WebSockets 支援

從 4.1 版本開始,Spring Integration 開始支援 WebSocket。它是基於 Spring Framework 的 web-socket 模組的架構、基礎設施和 API。因此,Spring WebSocket 的許多元件(例如 SubProtocolHandlerWebSocketClient)和組態選項(例如 @EnableWebSocketMessageBroker)都可以在 Spring Integration 中重複使用。如需更多資訊,請參閱 Spring Framework 參考手冊中的 Spring Framework WebSocket 支援 章節。

您需要在您的專案中包含此相依性

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:6.3.5"

對於伺服器端,必須明確包含 org.springframework:spring-webmvc 相依性。

Spring Framework WebSocket 基礎設施是基於 Spring 訊息傳遞基礎,並提供基於 Spring Integration 使用的相同 MessageChannel 實作和 MessageHandler 實作(以及一些 POJO 方法註解映射)的基本訊息傳遞框架。因此,即使沒有 WebSocket 配接器,Spring Integration 也可以直接參與 WebSocket 流程。為此,您可以使用適當的註解設定 Spring Integration @MessagingGateway,如下列範例所示

@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}

概觀

由於 WebSocket 協定在定義上是串流的,而且我們可以同時向 WebSocket 發送和接收訊息,因此無論是在用戶端還是伺服器端,我們都可以處理適當的 WebSocketSession。為了封裝連線管理和 WebSocketSession 登錄,提供了 IntegrationWebSocketContainer 以及 ClientWebSocketContainerServerWebSocketContainer 實作。由於 WebSocket API 及其在 Spring Framework 中的實作(具有許多擴充功能),伺服器端和用戶端(從 Java 的角度來看,當然)都使用相同的類別。因此,大多數連線和 WebSocketSession 登錄選項在兩端都是相同的。這讓我們可以重複使用許多組態項目和基礎設施掛鉤,以在伺服器端和用戶端建置 WebSocket 應用程式。以下範例顯示了元件如何同時用於這兩種目的

//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}

IntegrationWebSocketContainer 旨在實現雙向訊息傳遞,並且可以在輸入和輸出通道配接器之間共用(請參閱下文),當使用單向(發送或接收)WebSocket 訊息傳遞時,可以僅從其中一個配接器參考它。它可以不使用任何通道配接器,但在這種情況下,IntegrationWebSocketContainer 僅作為 WebSocketSession 登錄表。

ServerWebSocketContainer 實作 WebSocketConfigurer 以將內部 IntegrationWebSocketContainer.IntegrationWebSocketHandler 註冊為 Endpoint。它在目標廠商 WebSocket 容器的 ServletWebSocketHandlerRegistry 中,在提供的 paths 和其他伺服器 WebSocket 選項(例如 HandshakeHandlerSockJS fallback)下執行此操作。此註冊是通過基礎設施 WebSocketIntegrationConfigurationInitializer 元件實現的,該元件的作用與 @EnableWebSocket 註解相同。這表示,通過使用 @EnableIntegration(或應用程式上下文中的任何 Spring Integration 命名空間),您可以省略 @EnableWebSocket 宣告,因為 Spring Integration 基礎設施會偵測所有 WebSocket 端點。

從 6.1 版本開始,ClientWebSocketContainer 可以使用提供的 URI 而不是 uriTemplateuriVariables 組合來設定。當 URI 的某些部分需要自訂編碼時,這非常有用。有關便利性,請參閱 UriComponentsBuilder API。

WebSocket 輸入通道配接器

WebSocketInboundChannelAdapter 實作 WebSocketSession 互動的接收部分。您必須為其提供 IntegrationWebSocketContainer,並且配接器會將自身註冊為 WebSocketListener 以處理傳入訊息和 WebSocketSession 事件。

只能在 IntegrationWebSocketContainer 中註冊一個 WebSocketListener

對於 WebSocket 子協定,可以使用 SubProtocolHandlerRegistry 作為第二個建構子引數來設定 WebSocketInboundChannelAdapter。配接器委派給 SubProtocolHandlerRegistry,以確定接受的 WebSocketSession 的適當 SubProtocolHandler,並根據子協定實作將 WebSocketMessage 轉換為 Message

預設情況下,WebSocketInboundChannelAdapter 僅依賴原始 PassThruSubProtocolHandler 實作,該實作將 WebSocketMessage 轉換為 Message

WebSocketInboundChannelAdapter 僅接受並發送到基礎整合流程具有 SimpMessageType.MESSAGE 或空 simpMessageType 標頭的 Message 實例。所有其他 Message 類型都通過從 SubProtocolHandler 實作(例如 StompSubProtocolHandler)發出的 ApplicationEvent 實例處理。

在伺服器端,如果存在 @EnableWebSocketMessageBroker 組態,您可以使用 useBroker = true 選項來設定 WebSocketInboundChannelAdapter。在這種情況下,所有 non-MESSAGE Message 類型都會委派給提供的 AbstractBrokerMessageHandler。此外,如果使用目的地前綴設定代理中繼,則與代理目的地匹配的那些訊息將路由到 AbstractBrokerMessageHandler,而不是 WebSocketInboundChannelAdapteroutputChannel

如果 useBroker = false 且接收到的訊息類型為 SimpMessageType.CONNECT,則 WebSocketInboundChannelAdapter 會立即將 SimpMessageType.CONNECT_ACK 訊息發送到 WebSocketSession,而不會將其發送到通道。

Spring 的 WebSocket 支援僅允許設定一個代理中繼。因此,我們不需要 AbstractBrokerMessageHandler 參考。它在應用程式上下文中偵測到。

有關更多組態選項,請參閱 WebSockets 命名空間支援

WebSocket 輸出通道配接器

WebSocketOutboundChannelAdapter

  1. 從其 MessageChannel 接受 Spring Integration 訊息

  2. MessageHeaders 確定 WebSocketSession id

  3. 從提供的 IntegrationWebSocketContainer 檢索 WebSocketSession

  4. WebSocketMessage 工作的轉換和發送委派給提供的 SubProtocolHandlerRegistry 中的適當 SubProtocolHandler

在用戶端,不需要 WebSocketSession id 訊息標頭,因為 ClientWebSocketContainer 僅處理單一連線及其各自的 WebSocketSession

若要使用 STOMP 子協定,您應該使用 StompSubProtocolHandler 設定此配接器。然後,您可以使用 StompHeaderAccessor.create(StompCommand…​)MessageBuilder,或僅使用 HeaderEnricher(請參閱 標頭擴充器)向此配接器發送任何 STOMP 訊息類型。

本章的其餘部分主要涵蓋其他組態選項。

WebSockets 命名空間支援

Spring Integration WebSocket 命名空間包含本章其餘部分中描述的幾個元件。若要將其包含在您的組態中,請在您的應用程式上下文組態檔案中使用以下命名空間宣告

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>

<int-websocket:client-container> 屬性

以下列表顯示了 <int-websocket:client-container> 元素可用的屬性

<int-websocket:client-container
                  id=""                             (1)
                  client=""                         (2)
                  uri=""                            (3)
                  uri-variables=""                  (4)
                  origin=""                         (5)
                  send-time-limit=""                (6)
                  send-buffer-size-limit=""         (7)
                  send-buffer-overflow-strategy=""  (8)
                  auto-startup=""                   (9)
                  phase="">                        (10)
                <int-websocket:http-headers>
                  <entry key="" value=""/>
                </int-websocket:http-headers>      (11)
</int-websocket:client-container>
1 元件 bean 名稱。
2 WebSocketClient bean 參考。
3 目標 WebSocket 服務的 uriuriTemplate。如果您將其用作具有 URI 變數佔位符的 uriTemplate,則需要 uri-variables 屬性。
4 uri 屬性值中 URI 變數佔位符的逗號分隔值。這些值會根據它們在 uri 中的順序替換到佔位符中。請參閱 UriComponents.expand(Object…​uriVariableValues)
5 Origin Handshake HTTP 標頭值。
6 WebSocket 會話 'send' 逾時限制。預設值為 10000
7 WebSocket 會話 'send' 訊息大小限制。預設值為 524288
8 WebSocket 會話發送緩衝區溢位策略決定了當會話的輸出訊息緩衝區達到 send-buffer-size-limit 時的行為。有關可能的值和更多詳細資訊,請參閱 ConcurrentWebSocketSessionDecorator.OverflowStrategy
9 布林值,指示此端點是否應自動啟動。預設值為 false,假設此容器是從 WebSocket 輸入配接器 啟動的。
10 此端點應在其內啟動和停止的生命週期階段。值越低,此端點啟動得越早,停止得越晚。預設值為 Integer.MAX_VALUE。值可以是負數。請參閱 SmartLifeCycle
11 要與 Handshake 請求一起使用的 HttpHeadersMap

<int-websocket:server-container> 屬性

以下列表顯示了 <int-websocket:server-container> 元素可用的屬性

<int-websocket:server-container
          id=""                             (1)
          path=""                           (2)
          handshake-handler=""              (3)
          handshake-interceptors=""         (4)
          decorator-factories=""            (5)
          send-time-limit=""                (6)
          send-buffer-size-limit=""         (7)
          send-buffer-overflow-strategy=""  (8)
          allowed-origins="">               (9)
          <int-websocket:sockjs
            client-library-url=""          (10)
            stream-bytes-limit=""          (11)
            session-cookie-needed=""       (12)
            heartbeat-time=""              (13)
            disconnect-delay=""            (14)
            message-cache-size=""          (15)
            websocket-enabled=""           (16)
            scheduler=""                   (17)
            message-codec=""               (18)
            transport-handlers=""          (19)
            suppress-cors="true" />        (20)
</int-websocket:server-container>
1 元件 bean 名稱。
2 將特定請求映射到 WebSocketHandler 的路徑(或逗號分隔的路徑)。支援精確路徑映射 URI(例如 /myPath)和 ant 樣式路徑模式(例如 /myPath/**)。
3 HandshakeHandler bean 參考。預設值為 DefaultHandshakeHandler
4 HandshakeInterceptor bean 參考列表。
5 裝飾用於處理 WebSocket 訊息的處理器的工廠 (WebSocketHandlerDecoratorFactory) 列表。這可能對某些進階用例很有用(例如,允許 Spring Security 在相應的 HTTP 會話過期時強制關閉 WebSocket 會話)。有關更多資訊,請參閱 Spring Session 專案
6 請參閱 <int-websocket:client-container> 上的相同選項。
7 請參閱 <int-websocket:client-container> 上的相同選項。
8 WebSocket 會話發送緩衝區溢位策略決定了當會話的輸出訊息緩衝區達到 send-buffer-size-limit 時的行為。有關可能的值和更多詳細資訊,請參閱 ConcurrentWebSocketSessionDecorator.OverflowStrategy
9 允許的 origin 標頭值。您可以將多個 origin 指定為逗號分隔的列表。此檢查主要針對瀏覽器用戶端而設計。沒有任何東西可以阻止其他類型的用戶端修改 origin 標頭值。當啟用 SockJS 且 origin 受到限制時,不使用 origin 標頭進行跨域請求的傳輸類型(jsonp-pollingiframe-xhr-pollingiframe-eventsourceiframe-htmlfile)會被停用。因此,不支援 IE6 和 IE7,並且僅在沒有 Cookie 的情況下支援 IE8 和 IE9。預設情況下,允許所有 origin。
10 沒有原生跨域通訊的傳輸(例如 eventsourcehtmlfile)必須從「外部」網域在不可見的 iframe 中取得一個簡單頁面,以便 iframe 中的程式碼可以從本機於 SockJS 伺服器的網域執行。由於 iframe 需要載入 SockJS javascript 用戶端程式庫,因此此屬性可讓您指定從中載入的位置。預設情況下,它指向 d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js。但是,您也可以將其設定為指向應用程式提供的 URL。請注意,可以指定相對 URL,在這種情況下,URL 必須相對於 iframe URL。例如,假設 SockJS 端點映射到 /sockjs,並且產生的 iframe URL 為 /sockjs/iframe.html,則相對 URL 必須以 "../../" 開頭,以向上遍歷到 SockJS 映射上方的位置。對於基於前綴的 servlet 映射,您可能需要再進行一次遍歷。
11 在單個 HTTP 串流請求關閉之前可以透過它發送的最小位元組數。預設值為 128K(即 128*1024 或 131072 位元組)。
12 來自 SockJs /info 端點的回應中的 cookie_needed 值。此屬性指示應用程式是否需要 JSESSIONID Cookie 才能正常運作(例如,用於負載平衡或在 Java Servlet 容器中用於 HTTP 會話)。
13 伺服器在未發送任何訊息後,應向用戶端發送心跳訊框以防止連線中斷的時間量(以毫秒為單位)。預設值為 25,000(25 秒)。
14 在沒有接收連線(即伺服器可以透過其向用戶端發送資料的活動連線)後,將用戶端視為斷線之前的時間量(以毫秒為單位)。預設值為 5000
15 在等待來自用戶端的下一個 HTTP 輪詢請求時,會話可以快取的伺服器到用戶端訊息的數量。預設大小為 100
16 某些負載平衡器不支援 WebSockets。將此選項設定為 false 以停用伺服器端的 WebSocket 傳輸。預設值為 true
17 TaskScheduler bean 參考。如果未提供任何值,則會建立新的 ThreadPoolTaskScheduler 實例。此排程器實例用於排程心跳訊息。
18 用於編碼和解碼 SockJS 訊息的 SockJsMessageCodec bean 參考。預設情況下,使用 Jackson2SockJsMessageCodec,這需要 Jackson 程式庫存在於類別路徑中。
19 TransportHandler bean 參考列表。
20 是否停用為 SockJS 請求自動新增 CORS 標頭。預設值為 false

<int-websocket:outbound-channel-adapter> 屬性

以下列表顯示了 <int-websocket:outbound-channel-adapter> 元素可用的屬性

<int-websocket:outbound-channel-adapter
                          id=""                             (1)
                          channel=""                        (2)
                          container=""                      (3)
                          default-protocol-handler=""       (4)
                          protocol-handlers=""              (5)
                          message-converters=""             (6)
                          merge-with-default-converters=""  (7)
                          auto-startup=""                   (8)
                          phase=""/>                        (9)
1 元件 bean 名稱。如果您未提供 channel 屬性,則會在應用程式上下文中建立並註冊 DirectChannel,並將此 id 屬性作為 bean 名稱。在這種情況下,端點會以 bean 名稱 id 加上 .adapter 註冊。並且 MessageHandler 會以 bean 別名 id 加上 .handler 註冊。
2 識別附加到此配接器的通道。
3 IntegrationWebSocketContainer bean 的參考,該 bean 封裝了低階連線和 WebSocketSession 處理操作。必要項。
4 SubProtocolHandler 實例的可選參考。當用戶端未請求子協定或它是單一協定處理器時使用。如果未提供此參考或 protocol-handlers 列表,則預設會使用 PassThruSubProtocolHandler
5 此通道配接器的 SubProtocolHandler bean 參考列表。如果您僅提供單個 bean 參考,並且未提供 default-protocol-handler,則該單個 SubProtocolHandler 將用作 default-protocol-handler。如果您未設定此屬性或 default-protocol-handler,則預設會使用 PassThruSubProtocolHandler
6 此通道配接器的 MessageConverter bean 參考列表。
7 布林值,指示是否應在任何自訂轉換器之後註冊預設轉換器。僅當提供了 message-converters 時才使用此旗標。否則,將註冊所有預設轉換器。預設值為 false。預設轉換器為(依序):StringMessageConverterByteArrayMessageConverterMappingJackson2MessageConverter(如果 Jackson 程式庫存在於類別路徑中)。
8 布林值,指示此端點是否應自動啟動。預設值為 true
9 此端點應在其內啟動和停止的生命週期階段。值越低,此端點啟動得越早,停止得越晚。預設值為 Integer.MIN_VALUE。值可以是負數。請參閱 SmartLifeCycle

<int-websocket:inbound-channel-adapter> 屬性

以下列表顯示了 <int-websocket:outbound-channel-adapter> 元素可用的屬性

<int-websocket:inbound-channel-adapter
                            id=""  (1)
                            channel=""  (2)
                            error-channel=""  (3)
                            container=""  (4)
                            default-protocol-handler=""  (5)
                            protocol-handlers=""  (6)
                            message-converters=""  (7)
                            merge-with-default-converters=""  (8)
                            send-timeout=""  (9)
                            payload-type=""  (10)
                            use-broker=""  (11)
                            auto-startup=""  (12)
                            phase=""/>  (13)
1 組件 Bean 名稱。如果您未設定 channel 屬性,則會建立 DirectChannel,並以這個 id 屬性作為 Bean 名稱,註冊到應用程式上下文中。在這種情況下,端點會以 Bean 名稱 id 加上 .adapter 註冊。
2 識別附加到此配接器的通道。
3 MessageChannel Bean 參考,錯誤訊息 ( ErrorMessage ) 實例應該傳送到的目標。
4 請參閱 <int-websocket:outbound-channel-adapter> 上相同的選項。
5 請參閱 <int-websocket:outbound-channel-adapter> 上相同的選項。
6 請參閱 <int-websocket:outbound-channel-adapter> 上相同的選項。
7 請參閱 <int-websocket:outbound-channel-adapter> 上相同的選項。
8 請參閱 <int-websocket:outbound-channel-adapter> 上相同的選項。
9 在傳送訊息到通道時,等待的最長時間(以毫秒為單位),如果通道可能會阻塞。例如,如果 QueueChannel 已達到其最大容量,則它可能會阻塞直到有可用空間。
10 目標 payload 的 Java 類型的完整限定名稱,從傳入的 WebSocketMessage 轉換而來。預設為 java.lang.String
11 指示此配接器是否將 non-MESSAGE WebSocketMessage 實例和具有代理目的地 (broker destination) 的訊息,從應用程式內容傳送到 AbstractBrokerMessageHandler。當此屬性為 true 時,需要 Broker Relay 配置。此屬性僅在伺服器端使用。在客戶端,它會被忽略。預設為 false
12 請參閱 <int-websocket:outbound-channel-adapter> 上相同的選項。
13 請參閱 <int-websocket:outbound-channel-adapter> 上相同的選項。

使用 ClientStompEncoder

從 4.3.13 版本開始,Spring Integration 提供了 ClientStompEncoder (作為標準 StompEncoder 的擴展),用於 WebSocket 通道配接器的客戶端。為了正確準備客戶端訊息,您必須將 ClientStompEncoder 的實例注入到 StompSubProtocolHandler 中。預設 StompSubProtocolHandler 的一個問題是,它是為伺服器端設計的,因此它會將 SEND stompCommand 標頭更新為 MESSAGE (這是 STOMP 協定對伺服器端的要求)。如果客戶端未在其訊息中發送正確的 SEND WebSocket 框架,則某些 STOMP 代理程式將不接受它們。在這種情況下,ClientStompEncoder 的目的是覆蓋 stompCommand 標頭,並在將訊息編碼為 byte[] 之前,將其設定為 SEND 值。

動態 WebSocket 端點註冊

從 5.5 版本開始,WebSocket 伺服器端點 (基於 ServerWebSocketContainer 的通道配接器) 現在可以在運行時註冊 (和移除) - ServerWebSocketContainer 對應的 paths 會透過 HandlerMapping 暴露到 DispatcherServlet 中,並可供 WebSocket 客戶端存取。動態和運行時整合流程 支援有助於以透明的方式註冊這些端點

@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
       new ServerWebSocketContainer("/dynamic")
               .setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
       new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
       IntegrationFlow.from(webSocketInboundChannelAdapter)
               .channel(dynamicRequestsChannel)
               .get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
       this.integrationFlowContext.registration(serverFlow)
               .addBean(serverWebSocketContainer)
               .register();
...
dynamicServerFlow.destroy();
重要的是,在動態流程註冊時呼叫 .addBean(serverWebSocketContainer),以將 ServerWebSocketContainer 的實例新增到 ApplicationContext 中,用於端點註冊。當動態流程註冊被銷毀時,相關聯的 ServerWebSocketContainer 實例也會被銷毀,以及相應的端點註冊,包括 URL 路徑映射。
動態 WebSocket 端點只能透過 Spring Integration 機制註冊:當使用常規 Spring @EnableWebsocket 時,Spring Integration 配置會退避,並且不會註冊動態端點的基礎架構。