WebSockets

參考文件的這部分涵蓋了對反應式堆疊 WebSocket 訊息傳遞的支援。

WebSocket 簡介

WebSocket 協定,RFC 6455,提供了一種標準化的方法,可以在用戶端和伺服器之間透過單一 TCP 連線建立全雙工、雙向通訊通道。它是一種與 HTTP 不同的 TCP 協定,但設計為可透過 HTTP 工作,使用埠 80 和 443,並允許重複使用現有的防火牆規則。

WebSocket 互動開始於 HTTP 請求,該請求使用 HTTP Upgrade 標頭來升級,或者在這種情況下,切換到 WebSocket 協定。以下範例顯示了這樣的互動

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: https://127.0.0.1:8080
1 Upgrade 標頭。
2 使用 Upgrade 連線。

具有 WebSocket 支援的伺服器,不會傳回通常的 200 狀態碼,而是傳回類似於以下的輸出

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 協定切換

成功握手後,HTTP 升級請求的底層 TCP Socket 保持開啟,以便用戶端和伺服器繼續傳送和接收訊息。

完整介紹 WebSocket 的運作方式超出了本文檔的範圍。請參閱 RFC 6455、HTML5 的 WebSocket 章節,或網路上眾多的簡介和教學。

請注意,如果 WebSocket 伺服器在 Web 伺服器(例如 nginx)後面執行,您可能需要將其組態為將 WebSocket 升級請求傳遞到 WebSocket 伺服器。同樣地,如果應用程式在雲端環境中執行,請查看雲端供應商關於 WebSocket 支援的說明。

HTTP 與 WebSocket

即使 WebSocket 設計為與 HTTP 相容,並且從 HTTP 請求開始,但重要的是要了解這兩種協定會導致非常不同的架構和應用程式程式設計模型。

在 HTTP 和 REST 中,應用程式被建模為許多 URL。為了與應用程式互動,用戶端存取這些 URL,採用請求-回應樣式。伺服器根據 HTTP URL、方法和標頭將請求路由到適當的處理常式。

相比之下,在 WebSocket 中,通常只有一個 URL 用於初始連線。隨後,所有應用程式訊息都在同一 TCP 連線上流動。這指向完全不同的非同步、事件驅動、訊息傳遞架構。

WebSocket 也是一種低階傳輸協定,與 HTTP 不同,它沒有規定訊息內容的任何語意。這表示除非用戶端和伺服器就訊息語意達成一致,否則無法路由或處理訊息。

WebSocket 用戶端和伺服器可以協商使用更高等級的訊息傳遞協定(例如 STOMP),透過 HTTP 握手請求上的 Sec-WebSocket-Protocol 標頭。在沒有該標頭的情況下,它們需要提出自己的慣例。

何時使用 WebSockets

WebSockets 可以使網頁變得動態且互動性更強。但是,在許多情況下,AJAX 和 HTTP 串流或長輪詢的組合可以提供簡單而有效的解決方案。

例如,新聞、郵件和社群摘要需要動態更新,但每隔幾分鐘執行一次可能完全可以接受。另一方面,協作、遊戲和金融應用程式需要更接近即時。

延遲本身並不是決定性因素。如果訊息量相對較低(例如,監控網路故障),HTTP 串流或輪詢可以提供有效的解決方案。低延遲、高頻率和高容量的組合是使用 WebSocket 的最佳理由。

還要記住,在網際網路上,您無法控制的限制性代理可能會排除 WebSocket 互動,原因可能是它們未組態為傳遞 Upgrade 標頭,或者因為它們關閉了看似閒置的長期連線。這表示在防火牆內部的內部應用程式中使用 WebSocket 比在面向公眾的應用程式中更直接。

WebSocket API

Spring Framework 提供了 WebSocket API,您可以使用它來編寫處理 WebSocket 訊息的用戶端和伺服器端應用程式。

伺服器

若要建立 WebSocket 伺服器,您可以先建立 WebSocketHandler。以下範例顯示如何執行此操作

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

然後您可以將其對應到 URL

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

如果使用 WebFlux Config,則無需執行其他操作,否則,如果不使用 WebFlux 組態,您需要宣告一個 WebSocketHandlerAdapter,如下所示

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

WebSocketHandlerhandle 方法接受 WebSocketSession 並傳回 Mono<Void> 以指示應用程式何時完成對 Session 的處理。Session 是透過兩個串流處理的,一個用於輸入訊息,另一個用於輸出訊息。下表描述了處理串流的兩種方法

WebSocketSession 方法 描述

Flux<WebSocketMessage> receive()

提供對輸入訊息串流的存取,並在連線關閉時完成。

Mono<Void> send(Publisher<WebSocketMessage>)

取得輸出訊息的來源,寫入訊息,並傳回一個 Mono<Void>,該 Mono<Void> 在來源完成且寫入完成時完成。

WebSocketHandler 必須將輸入和輸出串流組合為統一的流程,並傳回一個 Mono<Void>,以反映該流程的完成。根據應用程式需求,統一流程在以下情況下完成

  • 輸入或輸出訊息串流完成。

  • 輸入串流完成(即,連線已關閉),而輸出串流是無限的。

  • 在選定的點,透過 WebSocketSessionclose 方法。

當輸入和輸出訊息串流組合在一起時,無需檢查連線是否開啟,因為 Reactive Streams 會發出結束活動的訊號。輸入串流接收到完成或錯誤訊號,而輸出串流接收到取消訊號。

處理常式的最基本實作是處理輸入串流的實作。以下範例顯示了這樣的實作

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 存取輸入訊息串流。
2 對每個訊息執行某些操作。
3 執行使用訊息內容的巢狀非同步操作。
4 傳回一個 Mono<Void>,該 Mono<Void> 在接收完成時完成。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()            (1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()                     (4)
	}
}
1 存取輸入訊息串流。
2 對每個訊息執行某些操作。
3 執行使用訊息內容的巢狀非同步操作。
4 傳回一個 Mono<Void>,該 Mono<Void> 在接收完成時完成。
對於巢狀非同步操作,您可能需要在使用集區資料緩衝區的底層伺服器(例如 Netty)上呼叫 message.retain()。否則,資料緩衝區可能會在您有機會讀取資料之前釋放。如需更多背景資訊,請參閱 資料緩衝區與編解碼器

以下實作組合了輸入和輸出串流

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 處理輸入訊息串流。
2 建立輸出訊息,產生組合流程。
3 傳回一個 Mono<Void>,該 Mono<Void> 在我們繼續接收時不會完成。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()                      (1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }    (2)

		return session.send(output)                         (3)
	}
}
1 處理輸入訊息串流。
2 建立輸出訊息,產生組合流程。
3 傳回一個 Mono<Void>,該 Mono<Void> 在我們繼續接收時不會完成。

輸入和輸出串流可以是獨立的,並且僅為了完成而加入,如下列範例所示

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return Mono.zip(input, output).then();								(3)
	}
}
1 處理輸入訊息串流。
2 傳送輸出訊息。
3 加入串流並傳回一個 Mono<Void>,該 Mono<Void> 在任一串流結束時完成。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
1 處理輸入訊息串流。
2 傳送輸出訊息。
3 加入串流並傳回一個 Mono<Void>,該 Mono<Void> 在任一串流結束時完成。

DataBuffer

DataBuffer 是 WebFlux 中位元組緩衝區的表示形式。參考文件的 Spring Core 部分在關於 資料緩衝區與編解碼器 的章節中有更多資訊。要了解的重點是,在某些伺服器(如 Netty)上,位元組緩衝區是集區化和參考計數的,並且必須在使用後釋放,以避免記憶體洩漏。

在 Netty 上執行時,如果應用程式希望保留輸入資料緩衝區以確保它們不會被釋放,則必須使用 DataBufferUtils.retain(dataBuffer),然後在使用緩衝區後使用 DataBufferUtils.release(dataBuffer)

握手

WebSocketHandlerAdapter 委派給 WebSocketService。預設情況下,它是 HandshakeWebSocketService 的實例,它對 WebSocket 請求執行基本檢查,然後使用 RequestUpgradeStrategy 用於使用的伺服器。目前,內建支援 Reactor Netty、Tomcat、Jetty 和 Undertow。

HandshakeWebSocketService 公開一個 sessionAttributePredicate 屬性,該屬性允許設定一個 Predicate<String>,以從 WebSession 中提取屬性並將其插入到 WebSocketSession 的屬性中。

伺服器組態

每個伺服器的 RequestUpgradeStrategy 都公開了特定於底層 WebSocket 伺服器引擎的組態。當使用 WebFlux Java 組態時,您可以自訂這些屬性,如 WebFlux Config 的相應章節所示,否則,如果不使用 WebFlux 組態,請使用以下內容

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

檢查伺服器的升級策略,以查看有哪些選項可用。目前,只有 Tomcat 和 Jetty 公開了此類選項。

CORS

組態 CORS 並限制對 WebSocket 端點存取的最簡單方法是讓您的 WebSocketHandler 實作 CorsConfigurationSource 並傳回具有允許的來源、標頭和其他詳細資訊的 CorsConfiguration。如果您無法執行此操作,您也可以在 SimpleUrlHandler 上設定 corsConfigurations 屬性,以按 URL 模式指定 CORS 設定。如果兩者都指定,則使用 CorsConfiguration 上的 combine 方法將它們組合在一起。

客户端

Spring WebFlux 提供了一個 WebSocketClient 抽象,其中包含 Reactor Netty、Tomcat、Jetty、Undertow 和標準 Java(即 JSR-356)的實作。

Tomcat 客户端實際上是標準 Java 客户端的擴充功能,在 WebSocketSession 處理中具有一些額外功能,可以利用 Tomcat 特定的 API 來暫停接收訊息以進行背壓。

若要啟動 WebSocket Session,您可以建立客户端的實例並使用其 execute 方法

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://127.0.0.1:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://127.0.0.1:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

某些客户端(例如 Jetty)實作了 Lifecycle,需要在您可以使用它們之前停止和啟動它們。所有客户端都有與底層 WebSocket 客户端組態相關的建構子選項。