STOMP 支援

Spring Integration 4.2 版引入了 STOMP (Simple Text Orientated Messaging Protocol) 用戶端支援。它基於 Spring Framework 訊息傳遞模組 stomp 套件的架構、基礎設施和 API。Spring Integration 使用了許多 Spring STOMP 元件(例如 StompSessionStompClientSupport)。如需更多資訊,請參閱 Spring Framework 參考手冊中的 Spring Framework STOMP 支援 章節。

您需要將此相依性包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.3.5"

對於伺服器端元件,您需要新增 org.springframework:spring-websocket 和/或 io.projectreactor.netty:reactor-netty 相依性。

概觀

若要設定 STOMP,您應該從 STOMP 用戶端物件開始。Spring Framework 提供了以下實作

  • WebSocketStompClient:建立在 Spring WebSocket API 之上,支援標準 JSR-356 WebSocket、Jetty 9 和 SockJS,用於基於 HTTP 的 WebSocket 模擬,並搭配 SockJS Client。

  • ReactorNettyTcpStompClient:建立在來自 reactor-netty 專案的 ReactorNettyTcpClient 之上。

您可以提供任何其他 StompClientSupport 實作。請參閱這些類別的 Javadoc

StompClientSupport 類別設計為factory,用於為提供的 StompSessionHandler 產生 StompSession,而所有剩餘的工作都是透過對該 StompSessionHandlerStompSession 抽象的callbacks 完成的。使用 Spring Integration adapter 抽象,我們需要提供一些受管理的共用物件,以將我們的應用程式表示為具有其唯一工作階段的 STOMP 用戶端。為此,Spring Integration 提供了 StompSessionManager 抽象,以管理任何提供的 StompSessionHandler 之間的single StompSession。這允許針對特定的 STOMP Broker 使用輸入輸出通道配接器(或兩者)。如需更多資訊,請參閱 StompSessionManager(及其實作)JavaDocs。

STOMP 輸入通道配接器

StompInboundChannelAdapter 是一個一站式 MessageProducer 元件,可讓您的 Spring Integration 應用程式訂閱提供的 STOMP 目的地,並從中接收訊息(透過使用提供的 MessageConverter 在連線的 StompSession 上,從 STOMP 框架轉換而來)。您可以透過在 StompInboundChannelAdapter 上使用適當的 @ManagedOperation 註解,在執行期間變更目的地(以及 STOMP 訂閱)。

如需更多設定選項,請參閱 STOMP 命名空間支援StompInboundChannelAdapter Javadoc

STOMP 輸出通道配接器

StompMessageHandler<int-stomp:outbound-channel-adapter>MessageHandler,用於透過 StompSession(由共用的 StompSessionManager 提供)將外寄 Message<?> 實例發送到 STOMP destination(預先設定或在執行期間使用 SpEL 運算式判斷)。

如需更多設定選項,請參閱 STOMP 命名空間支援StompMessageHandler Javadoc

STOMP 標頭對應

STOMP 協定提供標頭作為其框架的一部分。STOMP 框架的完整結構具有以下格式

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework 提供 StompHeaders 來表示這些標頭。如需更多詳細資訊,請參閱 Javadoc。STOMP 框架會轉換為 Message<?> 實例和從 Message<?> 實例轉換而來,這些標頭會對應到 MessageHeaders 實例和從 MessageHeaders 實例對應而來。Spring Integration 為 STOMP 配接器提供預設的 HeaderMapper 實作。該實作是 StompHeaderMapper。它分別為輸入和輸出配接器提供 fromHeaders()toHeaders() 操作。

與許多其他 Spring Integration 模組一樣,引入了 IntegrationStompHeaders 類別,以將標準 STOMP 標頭對應到 MessageHeaders,並以 stomp_ 作為標頭名稱前置詞。此外,當傳送到目的地時,所有具有該前置詞的 MessageHeaders 實例都會對應到 StompHeaders

如需更多資訊,請參閱這些類別的 JavadocSTOMP 命名空間支援 中的 mapped-headers 屬性說明。

STOMP 整合事件

許多 STOMP 操作都是非同步的,包括錯誤處理。例如,STOMP 具有 RECEIPT 伺服器框架,當用戶端框架透過新增 RECEIPT 標頭請求一個框架時,伺服器框架會傳回它。為了提供對這些非同步事件的存取,Spring Integration 發出 StompIntegrationEvent 實例,您可以透過實作 ApplicationListener 或使用 <int-event:inbound-channel-adapter> 來取得這些實例(請參閱 接收 Spring 應用程式事件)。

具體而言,當 stompSessionListenableFuture 由於連線到 STOMP Broker 失敗而收到 onFailure() 時,會從 AbstractStompSessionManager 發出 StompExceptionEvent。另一個範例是 StompMessageHandler。它處理 ERROR STOMP 框架,這些框架是伺服器對此 StompMessageHandler 發送的不正確(未接受)訊息的回應。

StompMessageHandler 會發出 StompReceiptEvent,作為在非同步回應中針對發送到 StompSession 的訊息的 StompSession.Receiptable 回呼的一部分。StompReceiptEvent 可能是正面的或負面的,具體取決於是否在 receiptTimeLimit 期間內從伺服器收到 RECEIPT 框架,您可以在 StompClientSupport 實例上設定該期間。預設值為 15 * 1000(以毫秒為單位,因此為 15 秒)。

只有在要發送的訊息的 RECEIPT STOMP 標頭不是 null 時,才會新增 StompSession.Receiptable 回呼。您可以透過 StompSessionautoReceipt 選項和 StompSessionManager 上的對應選項,在 StompSession 上啟用自動 RECEIPT 標頭產生。

如需有關如何設定 Spring Integration 以接受這些 ApplicationEvent 實例的更多資訊,請參閱 STOMP 配接器 Java 設定

STOMP 配接器 Java 設定

以下範例顯示了 STOMP 配接器的完整 Java 設定

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP 命名空間支援

Spring Integration STOMP 命名空間實作了輸入和輸出通道配接器元件。若要將其包含在您的設定中,請在您的應用程式環境設定檔中提供以下命名空間宣告

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

了解 <int-stomp:outbound-channel-adapter> 元素

以下清單顯示了 STOMP 輸出通道配接器的可用屬性

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 元件 Bean 名稱。MessageHandler 註冊時會使用 id 加上 .handler 的 Bean 別名。如果您未設定 channel 屬性,則會在應用程式環境中建立並註冊 DirectChannel,並以 id 屬性的值作為 Bean 名稱。在這種情況下,端點註冊時會使用 Bean 名稱 id 加上 .adapter
2 如果 id 存在,則識別附加到此配接器的通道。請參閱 id。選用。
3 StompSessionManager Bean 的參考,它封裝了低階連線和 StompSession 處理操作。必要。
4 對實作 HeaderMapper<StompHeaders> 的 Bean 的參考,它將 Spring Integration MessageHeaders 對應到 STOMP 框架標頭和從 STOMP 框架標頭對應而來。它與 mapped-headers 互斥。預設值為 StompHeaderMapper
5 要對應到 STOMP 框架標頭的 STOMP 標頭名稱的逗號分隔清單。只有在未設定 header-mapper 參考時才能提供。此清單中的值也可以是要與標頭名稱比對的簡單模式(例如 myheader**myheader)。特殊符記 (STOMP_OUTBOUND_HEADERS) 代表所有標準 STOMP 標頭(content-length、receipt、heart-beat 等)。預設情況下會包含它們。如果您想要新增自己的標頭,並且也想要對應標準標頭,則必須包含此符記或使用 header-mapper 提供您自己的 HeaderMapper 實作。
6 STOMP 訊息要發送到的目的地名稱。它與 destination-expression 互斥。
7 一個 SpEL 運算式,將針對每個 Spring Integration Message 在執行期間評估為根物件。它與 destination 互斥。
8 布林值,指示是否應自動啟動此端點。預設值為 true
9 此端點應在其生命週期階段內啟動和停止。值越低,此端點啟動得越早,停止得越晚。預設值為 Integer.MIN_VALUE。值可以是負數。請參閱 SmartLifeCycle

了解 <int-stomp:inbound-channel-adapter> 元素

以下清單顯示了 STOMP 輸入通道配接器的可用屬性

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 元件 Bean 名稱。如果您未設定 channel 屬性,則會在應用程式環境中建立並註冊 DirectChannel,並以 id 屬性的值作為 Bean 名稱。在這種情況下,端點註冊時會使用 Bean 名稱 id 加上 .adapter
2 識別附加到此配接器的通道。
3 MessageChannel Bean 參考,ErrorMessage 實例應發送到該參考。
4 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。
5 要從 STOMP 框架標頭對應而來的 STOMP 標頭名稱的逗號分隔清單。只有在未設定 header-mapper 參考時,您才能提供此清單。此清單中的值也可以是要與標頭名稱比對的簡單模式(例如,myheader**myheader)。特殊符記 (STOMP_INBOUND_HEADERS) 代表所有標準 STOMP 標頭(content-length、receipt、heart-beat 等)。預設情況下會包含它們。如果您想要新增自己的標頭,並且也想要對應標準標頭,則也必須包含此符記或使用 header-mapper 提供您自己的 HeaderMapper 實作。
6 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。
7 要訂閱的 STOMP 目的地名稱的逗號分隔清單。目的地(以及訂閱)清單可以在執行期間透過 addDestination()removeDestination() @ManagedOperation 註解進行修改。
8 如果通道可能會封鎖,則在將訊息發送到通道時要等待的最長時間(以毫秒為單位)。例如,如果 QueueChannel 的最大容量已達到,則它可以封鎖直到空間可用。
9 要從傳入的 STOMP 框架轉換的目標 payload 的 Java 類型完整名稱。預設值為 String.class
10 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。
11 請參閱 <int-stomp:outbound-channel-adapter> 上的相同選項。