RSocket 支援

RSocket Spring Integration 模組 (spring-integration-rsocket) 允許執行 RSocket 應用程式協定

您需要將此相依性包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.3.5"

此模組從 5.2 版開始提供,並且基於 Spring Messaging 基礎,及其 RSocket 元件實作,例如 RSocketRequesterRSocketMessageHandlerRSocketStrategies。請參閱 Spring Framework RSocket 支援 以取得關於 RSocket 協定、術語和元件的更多資訊。

在透過通道配接器開始整合流程處理之前,我們需要建立伺服器和客戶端之間的 RSocket 連線。為此,Spring Integration RSocket 支援提供了 ServerRSocketConnectorClientRSocketConnectorAbstractRSocketConnector 實作。

ServerRSocketConnector 根據提供的 io.rsocket.transport.ServerTransport 在主機和連接埠上公開一個監聽器,以接受來自客戶端的連線。可以使用 setServerConfigurer() 以及其他可以配置的選項(例如 RSocketStrategies 和酬載資料和標頭中繼資料的 MimeType)來自訂內部 RSocketServer 實例。當從客戶端請求者提供 setupRoute 時(請參閱下方的 ClientRSocketConnector),已連線的客戶端會以 clientRSocketKeyStrategy BiFunction<Map<String, Object>, DataBuffer, Object> 決定的金鑰儲存為 RSocketRequester。預設情況下,連線資料會用作金鑰,並轉換為 UTF-8 字元集的字串值。此類 RSocketRequester 登錄可以用於應用程式邏輯中,以決定與其互動的特定客戶端連線,或將相同的訊息發佈到所有已連線的客戶端。當從客戶端建立連線時,會從 ServerRSocketConnector 發出 RSocketConnectedEvent。這與 Spring Messaging 模組中的 @ConnectMapping 註解提供的功能類似。對應模式 * 表示接受所有客戶端路由。RSocketConnectedEvent 可用於透過 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER 標頭區分不同的路由。

典型的伺服器配置可能如下所示

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有選項,包括 RSocketStrategies bean 和 RSocketConnectedEvent@EventListener,都是可選的。請參閱 ServerRSocketConnector JavaDocs 以取得更多資訊。

從 5.2.1 版開始,ServerRSocketMessageHandler 被提取到一個公開的頂層類別,以便可能與現有的 RSocket 伺服器連線。當 ServerRSocketConnector 提供 ServerRSocketMessageHandler 的外部實例時,它不會在內部建立 RSocket 伺服器,而只是將所有處理邏輯委派給提供的實例。此外,可以配置 ServerRSocketMessageHandlermessageMappingCompatible 標誌,以同時處理 RSocket 控制器的 @MessageMapping,完全取代標準 RSocketMessageHandler 提供的功能。當經典的 @MessageMapping 方法與 RSocket 通道配接器一起存在於同一個應用程式中,並且應用程式中存在外部配置的 RSocket 伺服器時,這可能很有用。

ClientRSocketConnector 作為 RSocketRequester 的持有者,基於透過提供的 ClientTransport 連線的 RSocket。可以使用提供的 RSocketConnectorConfigurer 自訂 RSocketConnector。也可以在此元件上配置 setupRoute(帶有可選的範本變數)和帶有中繼資料的 setupData

典型的客戶端配置可能如下所示

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

大多數這些選項(包括 RSocketStrategies bean)都是可選的。請注意我們如何連線到任意連接埠上本地啟動的 RSocket 伺服器。請參閱 ServerRSocketConnector.clientRSocketKeyStrategy 以取得 setupData 使用案例。另請參閱 ClientRSocketConnector 及其 AbstractRSocketConnector 超類別 JavaDocs 以取得更多資訊。

ClientRSocketConnectorServerRSocketConnector 都負責將輸入通道配接器對應到其 path 配置,以路由傳入的 RSocket 請求。請參閱下一節以取得更多資訊。

RSocket 輸入閘道器

RSocketInboundGateway 負責接收 RSocket 請求並產生回應(如果有的話)。它需要一個 path 對應陣列,該陣列可以是類似於 MVC 請求對應或 @MessageMapping 語義的模式。此外,(自 5.2.2 版起),可以在 RSocketInboundGateway 上配置一組互動模型(請參閱 RSocketInteractionModel),以將 RSocket 請求限制為此端點,並根據特定的訊框類型。預設情況下,支援所有互動模型。此類 bean,根據其 IntegrationRSocketEndpoint 實作(ReactiveMessageHandler 的擴充功能),由 ServerRSocketConnectorClientRSocketConnector 自動偵測,以在內部 IntegrationRSocketMessageHandler 中進行傳入請求的路由邏輯。可以將 AbstractRSocketConnector 提供給 RSocketInboundGateway 以進行明確的端點註冊。這樣,在該 AbstractRSocketConnector 上會停用自動偵測選項。RSocketStrategies 也可以注入到 RSocketInboundGateway 中,或者從提供的 AbstractRSocketConnector 取得它們,從而覆蓋任何明確的注入。解碼器從這些 RSocketStrategies 中使用,以根據提供的 requestElementType 解碼請求酬載。如果傳入的 Message 中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 標頭,則 RSocketInboundGateway 將請求視為 fireAndForget RSocket 互動模型。在這種情況下,RSocketInboundGateway 會執行純粹的 send 操作到 outputChannel 中。否則,來自 RSocketPayloadReturnValueHandler.RESPONSE_HEADER 標頭的 MonoProcessor 值會用於將回覆傳送給 RSocket。為此,RSocketInboundGatewayoutputChannel 上執行 sendAndReceiveMessageReactive 操作。要向下游傳送的訊息的 payload 始終是 Flux,根據 MessagingRSocket 邏輯。當在 fireAndForget RSocket 互動模型中時,訊息具有純粹轉換的 payload。回覆 payload 可以是純粹的物件或 Publisher - RSocketInboundGateway 會根據 RSocketStrategies 中提供的編碼器將它們正確地轉換為 RSocket 回應。

從 5.3 版開始,將 decodeFluxAsUnit 選項(預設為 false)新增至 RSocketInboundGateway。預設情況下,傳入的 Flux 會以這樣的方式轉換,即其每個事件都會被單獨解碼。這是目前 @MessageMapping 語義中存在的精確行為。要恢復先前的行為或根據應用程式需求將整個 Flux 解碼為單個單元,則必須將 decodeFluxAsUnit 設定為 true。但是,目標解碼邏輯取決於選定的 Decoder,例如,StringDecoder 需要在串流中存在換行符分隔符(預設情況下)以指示位元組緩衝區結束。

請參閱 使用 Java 配置 RSocket 端點 以取得關於如何配置 RSocketInboundGateway 端點以及如何處理下游酬載的範例。

RSocket 輸出閘道器

RSocketOutboundGateway 是一個 AbstractReplyProducingMessageHandler,用於執行 RSocket 請求並根據 RSocket 回覆(如果有的話)產生回覆。低階 RSocket 協定互動委派給從提供的 ClientRSocketConnector 或來自伺服器端請求訊息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 標頭解析的 RSocketRequester。伺服器端上的目標 RSocketRequester 可以從 RSocketConnectedEvent 或使用 ServerRSocketConnector.getClientRSocketRequester() API 根據為透過 ServerRSocketConnector.setClientRSocketKeyStrategy() 的連線請求對應選取的一些業務金鑰來解析。請參閱 ServerRSocketConnector JavaDocs 以取得更多資訊。

要傳送請求的 route 必須明確配置(連同路徑變數)或透過 SpEL 運算式配置,該運算式會針對請求訊息進行評估。

RSocket 互動模型可以透過 RSocketInteractionModel 選項或各自的運算式設定來提供。預設情況下,requestResponse 用於常見的閘道器使用案例。

當請求訊息酬載為 Publisher 時,可以提供 publisherElementType 選項,以根據目標 RSocketRequester 中提供的 RSocketStrategies 對其元素進行編碼。此選項的運算式可以評估為 ParameterizedTypeReference。請參閱 RSocketRequester.RequestSpec.data() JavaDocs 以取得關於資料及其類型的更多資訊。

RSocket 請求也可以使用 metadata 進行增強。為此,可以在 RSocketOutboundGateway 上配置針對請求訊息的 metadataExpression。此類運算式必須評估為 Map<Object, MimeType>

interactionModel 不是 fireAndForget 時,必須提供 expectedResponseType。預設情況下,它是 String.class。此選項的運算式可以評估為 ParameterizedTypeReference。請參閱 RSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux() JavaDocs 以取得關於回覆資料及其類型的更多資訊。

來自 RSocketOutboundGateway 的回覆 payload 是一個 Mono(即使對於 fireAndForget 互動模型,它也是 Mono<Void>),始終使此元件成為 async。此類 Mono 在產生到常規通道的 outputChannel 之前訂閱,或由 FluxMessageChannel 按需處理。requestStreamrequestChannel 互動模型的 Flux 回應也包裝在回覆 Mono 中。它可以透過帶有直通服務啟動器的 FluxMessageChannel 在下游展平

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或在目標應用程式邏輯中明確訂閱。

預期的回應類型也可以配置(或透過運算式評估)為 void,從而將此閘道器視為輸出通道配接器。但是,仍然必須配置 outputChannel(即使它只是一個 NullChannel)以啟動對傳回的 Mono 的訂閱。

請參閱 使用 Java 配置 RSocket 端點 以取得關於如何配置 RSocketOutboundGateway 端點以及如何處理下游酬載的範例。

RSocket 命名空間支援

Spring Integration 提供了 rsocket 命名空間和對應的結構描述定義。若要在您的配置中包含它,請在您的應用程式上下文配置文件中新增下列命名空間宣告

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

輸入

若要使用 XML 配置 Spring Integration RSocket 輸入通道配接器,您需要使用來自 int-rsocket 命名空間的適當 inbound-gateway 元件。下列範例顯示如何配置它

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

ClientRSocketConnectorServerRSocketConnector 應配置為通用 <bean> 定義。

輸出

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

請參閱 spring-integration-rsocket.xsd 以取得所有這些 XML 屬性的描述。

使用 Java 配置 RSocket 端點

下列範例顯示如何使用 Java 配置 RSocket 輸入端點

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

在此配置中假定存在 ClientRSocketConnectorServerRSocketConnector,其意義在於自動偵測「echo」路徑上的此類端點。請注意 @Transformer 簽名,及其對 RSocket 請求的完全反應式處理和產生反應式回覆。

下列範例顯示如何使用 Java DSL 配置 RSocket 輸入閘道器

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlow
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

在此配置中假定存在 ClientRSocketConnectorServerRSocketConnector,其意義在於自動偵測「/uppercase」路徑上的此類端點,以及預期的互動模型為「請求通道」。

下列範例顯示如何使用 Java 配置 RSocket 輸出閘道器

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

setClientRSocketConnector() 僅在客戶端需要。在伺服器端,必須在請求訊息中提供帶有 RSocketRequester 值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER 標頭。

下列範例顯示如何使用 Java DSL 配置 RSocket 輸出閘道器

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlow
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

請參閱 IntegrationFlow 作為閘道器 以取得關於如何在上述流程開頭使用提及的 Function 介面的更多資訊。