STOMP 支援
Spring Integration 4.2 版引入了 STOMP (Simple Text Orientated Messaging Protocol) 用戶端支援。它基於 Spring Framework 訊息傳遞模組 stomp 套件的架構、基礎設施和 API。Spring Integration 使用了許多 Spring STOMP 元件(例如 StompSession
和 StompClientSupport
)。如需更多資訊,請參閱 Spring Framework 參考手冊中的 Spring Framework STOMP 支援 章節。
您需要將此相依性包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stomp</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.3.5"
對於伺服器端元件,您需要新增 org.springframework:spring-websocket
和/或 io.projectreactor.netty:reactor-netty
相依性。
概觀
若要設定 STOMP,您應該從 STOMP 用戶端物件開始。Spring Framework 提供了以下實作
-
WebSocketStompClient
:建立在 Spring WebSocket API 之上,支援標準 JSR-356 WebSocket、Jetty 9 和 SockJS,用於基於 HTTP 的 WebSocket 模擬,並搭配 SockJS Client。 -
ReactorNettyTcpStompClient
:建立在來自reactor-netty
專案的ReactorNettyTcpClient
之上。
您可以提供任何其他 StompClientSupport
實作。請參閱這些類別的 Javadoc。
StompClientSupport
類別設計為factory,用於為提供的 StompSessionHandler
產生 StompSession
,而所有剩餘的工作都是透過對該 StompSessionHandler
和 StompSession
抽象的callbacks 完成的。使用 Spring Integration adapter 抽象,我們需要提供一些受管理的共用物件,以將我們的應用程式表示為具有其唯一工作階段的 STOMP 用戶端。為此,Spring Integration 提供了 StompSessionManager
抽象,以管理任何提供的 StompSessionHandler
之間的single StompSession
。這允許針對特定的 STOMP Broker 使用輸入或輸出通道配接器(或兩者)。如需更多資訊,請參閱 StompSessionManager
(及其實作)JavaDocs。
STOMP 輸入通道配接器
StompInboundChannelAdapter
是一個一站式 MessageProducer
元件,可讓您的 Spring Integration 應用程式訂閱提供的 STOMP 目的地,並從中接收訊息(透過使用提供的 MessageConverter
在連線的 StompSession
上,從 STOMP 框架轉換而來)。您可以透過在 StompInboundChannelAdapter
上使用適當的 @ManagedOperation
註解,在執行期間變更目的地(以及 STOMP 訂閱)。
如需更多設定選項,請參閱 STOMP 命名空間支援 和 StompInboundChannelAdapter
Javadoc。
STOMP 輸出通道配接器
StompMessageHandler
是 <int-stomp:outbound-channel-adapter>
的 MessageHandler
,用於透過 StompSession
(由共用的 StompSessionManager
提供)將外寄 Message<?>
實例發送到 STOMP destination
(預先設定或在執行期間使用 SpEL 運算式判斷)。
如需更多設定選項,請參閱 STOMP 命名空間支援 和 StompMessageHandler
Javadoc。
STOMP 標頭對應
STOMP 協定提供標頭作為其框架的一部分。STOMP 框架的完整結構具有以下格式
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework 提供 StompHeaders
來表示這些標頭。如需更多詳細資訊,請參閱 Javadoc。STOMP 框架會轉換為 Message<?>
實例和從 Message<?>
實例轉換而來,這些標頭會對應到 MessageHeaders
實例和從 MessageHeaders
實例對應而來。Spring Integration 為 STOMP 配接器提供預設的 HeaderMapper
實作。該實作是 StompHeaderMapper
。它分別為輸入和輸出配接器提供 fromHeaders()
和 toHeaders()
操作。
與許多其他 Spring Integration 模組一樣,引入了 IntegrationStompHeaders
類別,以將標準 STOMP 標頭對應到 MessageHeaders
,並以 stomp_
作為標頭名稱前置詞。此外,當傳送到目的地時,所有具有該前置詞的 MessageHeaders
實例都會對應到 StompHeaders
。
如需更多資訊,請參閱這些類別的 Javadoc 和 STOMP 命名空間支援 中的 mapped-headers
屬性說明。
STOMP 整合事件
許多 STOMP 操作都是非同步的,包括錯誤處理。例如,STOMP 具有 RECEIPT
伺服器框架,當用戶端框架透過新增 RECEIPT
標頭請求一個框架時,伺服器框架會傳回它。為了提供對這些非同步事件的存取,Spring Integration 發出 StompIntegrationEvent
實例,您可以透過實作 ApplicationListener
或使用 <int-event:inbound-channel-adapter>
來取得這些實例(請參閱 接收 Spring 應用程式事件)。
具體而言,當 stompSessionListenableFuture
由於連線到 STOMP Broker 失敗而收到 onFailure()
時,會從 AbstractStompSessionManager
發出 StompExceptionEvent
。另一個範例是 StompMessageHandler
。它處理 ERROR
STOMP 框架,這些框架是伺服器對此 StompMessageHandler
發送的不正確(未接受)訊息的回應。
StompMessageHandler
會發出 StompReceiptEvent
,作為在非同步回應中針對發送到 StompSession
的訊息的 StompSession.Receiptable
回呼的一部分。StompReceiptEvent
可能是正面的或負面的,具體取決於是否在 receiptTimeLimit
期間內從伺服器收到 RECEIPT
框架,您可以在 StompClientSupport
實例上設定該期間。預設值為 15 * 1000
(以毫秒為單位,因此為 15 秒)。
只有在要發送的訊息的 RECEIPT STOMP 標頭不是 null 時,才會新增 StompSession.Receiptable 回呼。您可以透過 StompSession 的 autoReceipt 選項和 StompSessionManager 上的對應選項,在 StompSession 上啟用自動 RECEIPT 標頭產生。 |
如需有關如何設定 Spring Integration 以接受這些 ApplicationEvent
實例的更多資訊,請參閱 STOMP 配接器 Java 設定。
STOMP 配接器 Java 設定
以下範例顯示了 STOMP 配接器的完整 Java 設定
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP 命名空間支援
Spring Integration STOMP 命名空間實作了輸入和輸出通道配接器元件。若要將其包含在您的設定中,請在您的應用程式環境設定檔中提供以下命名空間宣告
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
了解 <int-stomp:outbound-channel-adapter>
元素
以下清單顯示了 STOMP 輸出通道配接器的可用屬性
<int-stomp:outbound-channel-adapter
id="" (1)
channel="" (2)
stomp-session-manager="" (3)
header-mapper="" (4)
mapped-headers="" (5)
destination="" (6)
destination-expression="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | 元件 Bean 名稱。MessageHandler 註冊時會使用 id 加上 .handler 的 Bean 別名。如果您未設定 channel 屬性,則會在應用程式環境中建立並註冊 DirectChannel ,並以 id 屬性的值作為 Bean 名稱。在這種情況下,端點註冊時會使用 Bean 名稱 id 加上 .adapter 。 |
2 | 如果 id 存在,則識別附加到此配接器的通道。請參閱 id 。選用。 |
3 | 對 StompSessionManager Bean 的參考,它封裝了低階連線和 StompSession 處理操作。必要。 |
4 | 對實作 HeaderMapper<StompHeaders> 的 Bean 的參考,它將 Spring Integration MessageHeaders 對應到 STOMP 框架標頭和從 STOMP 框架標頭對應而來。它與 mapped-headers 互斥。預設值為 StompHeaderMapper 。 |
5 | 要對應到 STOMP 框架標頭的 STOMP 標頭名稱的逗號分隔清單。只有在未設定 header-mapper 參考時才能提供。此清單中的值也可以是要與標頭名稱比對的簡單模式(例如 myheader* 或 *myheader )。特殊符記 (STOMP_OUTBOUND_HEADERS ) 代表所有標準 STOMP 標頭(content-length、receipt、heart-beat 等)。預設情況下會包含它們。如果您想要新增自己的標頭,並且也想要對應標準標頭,則必須包含此符記或使用 header-mapper 提供您自己的 HeaderMapper 實作。 |
6 | STOMP 訊息要發送到的目的地名稱。它與 destination-expression 互斥。 |
7 | 一個 SpEL 運算式,將針對每個 Spring Integration Message 在執行期間評估為根物件。它與 destination 互斥。 |
8 | 布林值,指示是否應自動啟動此端點。預設值為 true 。 |
9 | 此端點應在其生命週期階段內啟動和停止。值越低,此端點啟動得越早,停止得越晚。預設值為 Integer.MIN_VALUE 。值可以是負數。請參閱 SmartLifeCycle 。 |
了解 <int-stomp:inbound-channel-adapter>
元素
以下清單顯示了 STOMP 輸入通道配接器的可用屬性
<int-stomp:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
stomp-session-manager="" (4)
header-mapper="" (5)
mapped-headers="" (6)
destinations="" (7)
send-timeout="" (8)
payload-type="" (9)
auto-startup="" (10)
phase=""/> (11)
1 | 元件 Bean 名稱。如果您未設定 channel 屬性,則會在應用程式環境中建立並註冊 DirectChannel ,並以 id 屬性的值作為 Bean 名稱。在這種情況下,端點註冊時會使用 Bean 名稱 id 加上 .adapter 。 |
2 | 識別附加到此配接器的通道。 |
3 | MessageChannel Bean 參考,ErrorMessage 實例應發送到該參考。 |
4 | 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。 |
5 | 要從 STOMP 框架標頭對應而來的 STOMP 標頭名稱的逗號分隔清單。只有在未設定 header-mapper 參考時,您才能提供此清單。此清單中的值也可以是要與標頭名稱比對的簡單模式(例如,myheader* 或 *myheader )。特殊符記 (STOMP_INBOUND_HEADERS ) 代表所有標準 STOMP 標頭(content-length、receipt、heart-beat 等)。預設情況下會包含它們。如果您想要新增自己的標頭,並且也想要對應標準標頭,則也必須包含此符記或使用 header-mapper 提供您自己的 HeaderMapper 實作。 |
6 | 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。 |
7 | 要訂閱的 STOMP 目的地名稱的逗號分隔清單。目的地(以及訂閱)清單可以在執行期間透過 addDestination() 和 removeDestination() @ManagedOperation 註解進行修改。 |
8 | 如果通道可能會封鎖,則在將訊息發送到通道時要等待的最長時間(以毫秒為單位)。例如,如果 QueueChannel 的最大容量已達到,則它可以封鎖直到空間可用。 |
9 | 要從傳入的 STOMP 框架轉換的目標 payload 的 Java 類型完整名稱。預設值為 String.class 。 |
10 | 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。 |
11 | 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。 |