RSocket

本節說明 Spring Framework 對於 RSocket 協定的支援。

概觀

RSocket 是一種應用程式協定,用於透過 TCP、WebSocket 和其他位元組串流傳輸進行多工、雙工通訊,並使用下列其中一種互動模型

  • 請求-回應 — 發送一則訊息並接收一則回應。

  • 請求-串流 — 發送一則訊息並接收一串訊息回應。

  • 通道 — 在兩個方向發送訊息串流。

  • 發送後不理 — 發送單向訊息。

一旦建立初始連線,「用戶端」與「伺服器」之間的區別就會消失,因為雙方都變得對稱,且每一方都可以啟動上述互動之一。這就是為什麼在協定呼叫中,參與方被稱為「請求者」和「回應者」,而上述互動則被稱為「請求串流」或簡稱為「請求」。

這些是 RSocket 協定的主要功能和優點

  • Reactive Streams 語意跨越網路邊界 — 對於諸如 請求-串流通道 等串流請求,背壓訊號會在請求者和回應者之間傳輸,允許請求者從源頭減慢回應者的速度,從而減少對網路層壅塞控制的依賴,並減少在網路層或任何層級進行緩衝的需求。

  • 請求節流 — 此功能在 LEASE 框架之後被命名為「租賃」,該框架可以從每一端發送,以限制另一端在給定時間內允許的請求總數。租賃會定期續訂。

  • 會話恢復 — 這是為連線中斷而設計,並且需要維護某些狀態。狀態管理對於應用程式是透明的,並且與背壓結合良好,背壓可以在可能時停止生產者並減少所需狀態的數量。

  • 大型訊息的片段化和重新組裝。

  • Keepalive (心跳)。

RSocket 在多種語言中都有實作。Java 程式庫建立在 Project Reactor 之上,而傳輸則建立在 Reactor Netty 之上。這表示來自您應用程式中 Reactive Streams Publisher 的訊號會透過 RSocket 透明地跨網路傳播。

協定

RSocket 的優點之一是它在線路上具有明確定義的行為,並且易於閱讀的 規格 以及一些協定 擴充功能。因此,最好閱讀規格,而無需考慮語言實作和更高等級的框架 API。本節提供簡潔的概述,以建立一些背景知識。

連線

最初,用戶端透過諸如 TCP 或 WebSocket 之類的低階串流傳輸連線到伺服器,並向伺服器發送 SETUP 框架以設定連線參數。

伺服器可能會拒絕 SETUP 框架,但通常在發送(對於用戶端)和接收(對於伺服器)之後,雙方都可以開始發出請求,除非 SETUP 指示使用租賃語意來限制請求數量,在這種情況下,雙方都必須等待來自另一端的 LEASE 框架以允許發出請求。

發出請求

一旦建立連線,雙方都可以透過框架 REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF 之一啟動請求。這些框架中的每一個都攜帶一則從請求者到回應者的訊息。

然後,回應者可能會傳回包含回應訊息的 PAYLOAD 框架,而在 REQUEST_CHANNEL 的情況下,請求者也可能會發送包含更多請求訊息的 PAYLOAD 框架。

當請求涉及訊息串流(例如 請求-串流通道)時,回應者必須尊重來自請求者的需求訊號。需求表示為訊息數量。初始需求在 REQUEST_STREAMREQUEST_CHANNEL 框架中指定。後續需求透過 REQUEST_N 框架發出訊號。

每一方也可以透過 METADATA_PUSH 框架發送元資料通知,這些通知不涉及任何個別請求,而是涉及整個連線。

訊息格式

RSocket 訊息包含資料和元資料。元資料可用於發送路由、安全 Token 等。資料和元資料可以採用不同的格式。每種格式的 Mime 類型都在 SETUP 框架中宣告,並適用於給定連線上的所有請求。

雖然所有訊息都可以包含元資料,但通常諸如路由之類的元資料是按請求劃分的,因此僅包含在請求的第一則訊息中,即使用框架 REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF 之一。

協定擴充功能定義了應用程式中使用的常見元資料格式

Java 實作

RSocket 的 Java 實作 建立在 Project Reactor 之上。TCP 和 WebSocket 的傳輸建立在 Reactor Netty 之上。作為 Reactive Streams 程式庫,Reactor 簡化了協定的實作工作。對於應用程式來說,使用 FluxMono 以及宣告式運算子和透明背壓支援是一種自然的選擇。

RSocket Java 中的 API 刻意保持最小化和基本。它專注於協定功能,並將應用程式程式設計模型(例如,RPC 代碼生成與其他)作為更高等級、獨立的考量。

主要契約 io.rsocket.RSocket 使用 Mono(表示單一訊息的 Promise)、Flux(訊息串流)和 io.rsocket.Payload(可存取資料和元資料作為位元組緩衝區的實際訊息)來模擬四種請求互動類型。RSocket 契約是對稱使用的。對於請求,應用程式會獲得一個 RSocket 以執行請求。對於回應,應用程式會實作 RSocket 以處理請求。

這並非旨在成為全面的介紹。在大多數情況下,Spring 應用程式不必直接使用其 API。但是,獨立於 Spring 查看或試驗 RSocket 可能很重要。RSocket Java 儲存庫包含許多 範例應用程式,這些應用程式示範了其 API 和協定功能。

Spring 支援

spring-messaging 模組包含下列項目

  • RSocketRequester — Fluent API,用於透過 io.rsocket.RSocket 發出請求,並進行資料和元資料編碼/解碼。

  • 註解回應器 — 用於回應的 @MessageMapping@RSocketExchange 註解處理器方法。

  • RSocket 介面 — RSocket 服務宣告,作為具有 @RSocketExchange 方法的 Java 介面,用於作為請求者或回應者。

spring-web 模組包含 EncoderDecoder 實作,例如 Jackson CBOR/JSON 和 Protobuf,RSocket 應用程式可能需要這些實作。它還包含 PathPatternParser,可以插入以實現高效的路由匹配。

Spring Boot 2.2 支援透過 TCP 或 WebSocket 啟動 RSocket 伺服器,包括在 WebFlux 伺服器中透過 WebSocket 公開 RSocket 的選項。此外,還提供用戶端支援以及 RSocketRequester.BuilderRSocketStrategies 的自動組態。有關更多詳細資訊,請參閱 Spring Boot 參考文件中的 RSocket 章節

Spring Security 5.2 提供 RSocket 支援。

Spring Integration 5.2 提供入站和出站閘道,用於與 RSocket 用戶端和伺服器互動。有關更多詳細資訊,請參閱 Spring Integration 參考手冊。

Spring Cloud Gateway 支援 RSocket 連線。

RSocketRequester

RSocketRequester 提供 Fluent API,用於執行 RSocket 請求,接受和傳回資料和元資料的物件,而不是低階資料緩衝區。它可以對稱地使用,以從用戶端發出請求以及從伺服器發出請求。

用戶端請求器

在用戶端取得 RSocketRequester 是為了連線到伺服器,這涉及發送包含連線設定的 RSocket SETUP 框架。RSocketRequester 提供了一個 Builder,可協助準備 io.rsocket.core.RSocketConnector,其中包括 SETUP 框架的連線設定。

這是使用預設設定連線的最基本方式

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

上述程式碼不會立即連線。當發出請求時,會透明地建立並使用共用連線。

連線設定

RSocketRequester.Builder 提供以下功能來自訂初始 SETUP 框架

  • dataMimeType(MimeType) — 設定連線上資料的 mime 類型。

  • metadataMimeType(MimeType) — 設定連線上元資料的 mime 類型。

  • setupData(Object) — 要包含在 SETUP 中的資料。

  • setupRoute(String, Object…) — 要包含在 SETUP 中的元資料中的路由。

  • setupMetadata(Object, MimeType) — 要包含在 SETUP 中的其他元資料。

對於資料,預設 mime 類型是從第一個設定的 Decoder 衍生而來。對於元資料,預設 mime 類型是複合元資料,它允許每個請求有多個元資料值和 mime 類型對。通常兩者都不需要更改。

SETUP 框架中的資料和元資料是選用的。在伺服器端,可以使用 @ConnectMapping 方法來處理連線的開始和 SETUP 框架的內容。元資料可以用於連線層級安全性。

策略

RSocketRequester.Builder 接受 RSocketStrategies 來設定請求器。您需要使用它來為資料和元資料值的 (反)序列化提供編碼器和解碼器。預設情況下,僅註冊了來自 spring-coreStringbyte[]ByteBuffer 的基本編解碼器。新增 spring-web 可以存取更多可以按如下方式註冊的編解碼器

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies 設計為可重複使用。在某些情況下,例如,同一應用程式中的用戶端和伺服器,最好在 Spring 組態中宣告它。

用戶端回應器

RSocketRequester.Builder 可用於設定回應器以回應來自伺服器的請求。

您可以使用註解處理器進行用戶端回應,這些處理器基於與伺服器上使用的相同基礎架構,但以程式設計方式註冊,如下所示

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 如果存在 spring-web,則使用 PathPatternRouteMatcher 以實現高效的路徑匹配。
2 從具有 @MessageMapping 和/或 @ConnectMapping 方法的類別建立回應器。
3 註冊回應器。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1 如果存在 spring-web,則使用 PathPatternRouteMatcher 以實現高效的路徑匹配。
2 從具有 @MessageMapping 和/或 @ConnectMapping 方法的類別建立回應器。
3 註冊回應器。

請注意,以上僅是為程式化註冊用戶端回應器而設計的快捷方式。對於其他情境,例如用戶端回應器位於 Spring 設定中,您仍然可以將 RSocketMessageHandler 宣告為 Spring Bean,然後按如下方式套用

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

對於上述情況,您可能還需要在 RSocketMessageHandler 中使用 setHandlerPredicate,以切換到不同的策略來偵測用戶端回應器,例如,基於自訂註解(例如 @RSocketClientResponder)而不是預設的 @Controller。這在具有用戶端和伺服器,或同一應用程式中有多個用戶端的情境中是必要的。

另請參閱 註解回應器,以取得有關程式設計模型的更多資訊。

進階

RSocketRequesterBuilder 提供一個回呼,以公開底層的 io.rsocket.core.RSocketConnector,以便進行更多組態選項,例如 keepalive 間隔、會話恢復、攔截器等等。您可以按如下方式在該層級設定選項

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

伺服器請求器

若要從伺服器向連線的用戶端發出請求,只需從伺服器取得連線用戶端的請求器即可。

註解回應器 中,@ConnectMapping@MessageMapping 方法支援 RSocketRequester 引數。使用它來存取連線的請求器。請記住,@ConnectMapping 方法本質上是 SETUP 訊框的處理常式,必須在請求開始之前處理。因此,一開始的請求必須與處理分離。例如

  • Java

  • Kotlin

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 非同步啟動請求,獨立於處理。
2 執行處理並傳回完成 Mono<Void>
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 非同步啟動請求,獨立於處理。
2 在暫停函數中執行處理。

請求

一旦您擁有 用戶端伺服器 請求器,您就可以按如下方式發出請求

  • Java

  • Kotlin

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 指定要包含在請求訊息中繼資料中的路由。
2 提供請求訊息的資料。
3 宣告預期的回應。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 指定要包含在請求訊息中繼資料中的路由。
2 提供請求訊息的資料。
3 宣告預期的回應。

互動類型是根據輸入和輸出的基數隱式決定的。上面的範例是 Request-Stream,因為傳送一個值並接收一個值串流。在大多數情況下,您不需要考慮這一點,只要輸入和輸出的選擇符合 RSocket 互動類型,並且輸入和輸出的類型符合回應器的預期即可。唯一無效的組合是多對一。

data(Object) 方法也接受任何 Reactive Streams Publisher,包括 FluxMono,以及在 ReactiveAdapterRegistry 中註冊的任何其他值產生器。對於產生相同類型值的多值 Publisher(例如 Flux),請考慮使用其中一個多載的 data 方法,以避免在每個元素上進行類型檢查和 Encoder 查找

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) 步驟是選用的。對於不傳送資料的請求,請跳過它

  • Java

  • Kotlin

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用 複合中繼資料(預設)並且值受已註冊的 Encoder 支援,則可以新增額外的中繼資料值。例如

  • Java

  • Kotlin

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

對於 Fire-and-Forget,請使用傳回 Mono<Void>send() 方法。請注意,Mono 僅指示訊息已成功傳送,而非已處理。

對於 Metadata-Push,請使用傳回值為 Mono<Void>sendMetadata() 方法。

註解回應器

RSocket 回應器可以實作為 @MessageMapping@ConnectMapping 方法。@MessageMapping 方法處理個別請求,而 @ConnectMapping 方法處理連線層級事件(設定和中繼資料推送)。註解回應器是對稱支援的,用於從伺服器端回應和從用戶端回應。

伺服器回應器

若要在伺服器端使用註解回應器,請將 RSocketMessageHandler 新增至您的 Spring 設定,以偵測具有 @MessageMapping@ConnectMapping 方法的 @Controller Bean

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然後透過 Java RSocket API 啟動 RSocket 伺服器,並將 RSocketMessageHandler 插入為回應器,如下所示

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler 預設支援 複合路由 中繼資料。如果您需要切換到不同的 mime 類型或註冊其他中繼資料 mime 類型,則可以設定其 MetadataExtractor

您需要設定支援中繼資料和資料格式所需的 EncoderDecoder 實例。您可能需要 spring-web 模組來實現編解碼器。

預設情況下,SimpleRouteMatcher 用於透過 AntPathMatcher 匹配路由。我們建議插入來自 spring-webPathPatternRouteMatcher 以實現高效的路徑匹配。RSocket 路由可以是階層式的,但不是 URL 路徑。路由匹配器都配置為預設使用 "." 作為分隔符號,並且沒有像 HTTP URL 那樣的 URL 解碼。

RSocketMessageHandler 可以透過 RSocketStrategies 進行配置,如果您需要在同一程序中的用戶端和伺服器之間共用組態,這可能會很有用

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

用戶端回應器

用戶端上的註解回應器需要在 RSocketRequester.Builder 中配置。如需詳細資訊,請參閱 用戶端回應器

@MessageMapping

一旦 伺服器用戶端 回應器組態就位,就可以按如下方式使用 @MessageMapping 方法

  • Java

  • Kotlin

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

上面的 @MessageMapping 方法回應具有路由 "locate.radars.within" 的 Request-Stream 互動。它支援彈性的方法簽名,並可選擇使用以下方法引數

方法引數 描述

@Payload

請求的酬載。這可以是 MonoFlux 等非同步類型的具體值。

注意: 註解的使用是選用的。如果方法引數不是簡單類型,也不是任何其他支援的引數,則假定為預期的酬載。

RSocketRequester

用於向遠端端點發出請求的請求器。

@DestinationVariable

根據對應模式中的變數從路由中提取的值,例如 @MessageMapping("find.radar.{id}")

@Header

為提取而註冊的中繼資料值,如 MetadataExtractor 中所述。

@Headers Map<String, Object>

為提取而註冊的所有中繼資料值,如 MetadataExtractor 中所述。

傳回值預期為一個或多個要序列化為回應酬載的物件。這可以是 MonoFlux 等非同步類型、具體值,或 void 或無值非同步類型(例如 Mono<Void>)。

@MessageMapping 方法支援的 RSocket 互動類型是根據輸入(即 @Payload 引數)和輸出的基數決定的,其中基數表示以下內容

基數 描述

1

明確值,或單值非同步類型,例如 Mono<T>

多個

多值非同步類型,例如 Flux<T>

0

對於輸入,這表示方法沒有 @Payload 引數。

對於輸出,這是 void 或無值非同步類型,例如 Mono<Void>

下表顯示了所有輸入和輸出基數組合以及對應的互動類型

輸入基數 輸出基數 互動類型

0, 1

0

Fire-and-Forget、Request-Response

0, 1

1

Request-Response

0, 1

多個

Request-Stream

多個

0、1、多個

Request-Channel

@RSocketExchange

作為 @MessageMapping 的替代方案,您也可以使用 @RSocketExchange 方法處理請求。此類方法在 RSocket 介面 上宣告,並且可以透過 RSocketServiceProxyFactory 用作請求器,或由回應器實作。

例如,將請求作為回應器處理

  • Java

  • Kotlin

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

@RSocketExhange@MessageMapping 之間存在一些差異,因為前者需要保持適用於請求器和回應器用途。例如,雖然可以宣告 @MessageMapping 來處理任意數量的路由,並且每個路由都可以是模式,但 @RSocketExchange 必須使用單個具體路由宣告。在與中繼資料相關的支援方法參數中也存在一些小的差異,請參閱 @MessageMappingRSocket 介面 以取得支援參數的列表。

@RSocketExchange 可以在類型層級使用,以指定給定 RSocket 服務介面的所有路由的通用前綴。

@ConnectMapping

@ConnectMapping 處理 RSocket 連線開始時的 SETUP 訊框,以及後續透過 METADATA_PUSH 訊框進行的任何中繼資料推送通知,即 io.rsocket.RSocket 中的 metadataPush(Payload)

@ConnectMapping 方法支援與 @MessageMapping 相同的引數,但基於來自 SETUPMETADATA_PUSH 訊框的中繼資料和資料。@ConnectMapping 可以具有模式,以將處理範圍縮小到中繼資料中具有路由的特定連線,如果未宣告任何模式,則所有連線都符合。

@ConnectMapping 方法無法傳回資料,並且必須宣告為以 voidMono<Void> 作為傳回值。如果新連線的處理傳回錯誤,則連線將被拒絕。處理不得被擱置以向連線的 RSocketRequester 發出請求。如需詳細資訊,請參閱 伺服器請求器

MetadataExtractor

回應器必須解釋中繼資料。複合中繼資料 允許獨立格式化的中繼資料值(例如,用於路由、安全性、追蹤),每個值都有自己的 mime 類型。應用程式需要一種方法來設定要支援的中繼資料 mime 類型,以及一種方法來存取提取的值。

MetadataExtractor 是一份合約,用於接收序列化的中繼資料並傳回解碼的名稱-值對,然後可以像標頭一樣按名稱存取這些值,例如透過註解處理常式方法中的 @Header

可以為 DefaultMetadataExtractor 提供 Decoder 實例以解碼中繼資料。它開箱即用地內建支援 "message/x.rsocket.routing.v0",它將其解碼為 String 並儲存在 "route" 鍵下。對於任何其他 mime 類型,您需要提供 Decoder 並按如下方式註冊 mime 類型

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

複合中繼資料非常適合組合獨立的中繼資料值。但是,請求器可能不支援複合中繼資料,或者可能選擇不使用它。為此,DefaultMetadataExtractor 可能需要自訂邏輯來將解碼的值對應到輸出映射。以下是一個使用 JSON 作為中繼資料的範例

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

透過 RSocketStrategies 設定 MetadataExtractor 時,您可以讓 RSocketStrategies.Builder 使用已配置的解碼器建立提取器,並簡單地使用回呼來自訂註冊,如下所示

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket 介面

Spring Framework 允許您將 RSocket 服務定義為具有 @RSocketExchange 方法的 Java 介面。您可以將此類介面傳遞給 RSocketServiceProxyFactory 以建立代理,該代理透過 RSocketRequester 執行請求。您也可以將介面實作為處理請求的回應器。

首先建立具有 @RSocketExchange 方法的介面

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

現在您可以建立一個代理,該代理在呼叫方法時執行請求

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

您也可以實作介面以將請求作為回應器處理。請參閱 註解回應器

方法參數

註解的 RSocket 交換方法支援彈性的方法簽名,其中包含以下方法參數

方法引數 描述

@DestinationVariable

新增路由變數以與來自 @RSocketExchange 註解的路由一起傳遞到 RSocketRequester,以便展開路由中的範本預留位置。此變數可以是字串或任何物件,然後透過 toString() 格式化。

@Payload

設定請求的輸入酬載。這可以是具體值,或任何可以透過 ReactiveAdapterRegistry 適配到 Reactive Streams Publisher 的值產生器。除非將 required 屬性設定為 false,或者參數被標記為選用(由 MethodParameter#isOptional 確定),否則必須提供酬載。

Object,如果後接 MimeType

輸入酬載中繼資料項目的值。這可以是任何 Object,只要下一個引數是中繼資料項目 MimeType。該值可以是具體值或任何可以透過 ReactiveAdapterRegistry 適配到 Reactive Streams Publisher 的單個值產生器。

MimeType

中繼資料項目的 MimeType。預期的前一個方法引數是中繼資料值。

傳回值

註解的 RSocket 交換方法支援作為具體值或可以透過 ReactiveAdapterRegistry 適配到 Reactive Streams Publisher 的值產生器的傳回值。

預設情況下,具有同步(阻塞)方法簽名的 RSocket 服務方法的行為取決於底層 RSocket ClientTransport 的回應逾時設定以及 RSocket keep-alive 設定。RSocketServiceProxyFactory.Builder 公開了一個 blockTimeout 選項,該選項也允許您設定阻塞以等待回應的最長時間,但我們建議在 RSocket 層級配置逾時值以獲得更多控制。