RSocket 支援
RSocket Spring Integration 模組 (spring-integration-rsocket
) 允許執行 RSocket 應用程式協定。
您需要將此相依性包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.3.5"
此模組從 5.2 版開始提供,並且基於 Spring Messaging 基礎,及其 RSocket 元件實作,例如 RSocketRequester
、RSocketMessageHandler
和 RSocketStrategies
。請參閱 Spring Framework RSocket 支援 以取得關於 RSocket 協定、術語和元件的更多資訊。
在透過通道配接器開始整合流程處理之前,我們需要建立伺服器和客戶端之間的 RSocket 連線。為此,Spring Integration RSocket 支援提供了 ServerRSocketConnector
和 ClientRSocketConnector
的 AbstractRSocketConnector
實作。
ServerRSocketConnector
根據提供的 io.rsocket.transport.ServerTransport
在主機和連接埠上公開一個監聽器,以接受來自客戶端的連線。可以使用 setServerConfigurer()
以及其他可以配置的選項(例如 RSocketStrategies
和酬載資料和標頭中繼資料的 MimeType
)來自訂內部 RSocketServer
實例。當從客戶端請求者提供 setupRoute
時(請參閱下方的 ClientRSocketConnector
),已連線的客戶端會以 clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
決定的金鑰儲存為 RSocketRequester
。預設情況下,連線資料會用作金鑰,並轉換為 UTF-8 字元集的字串值。此類 RSocketRequester
登錄可以用於應用程式邏輯中,以決定與其互動的特定客戶端連線,或將相同的訊息發佈到所有已連線的客戶端。當從客戶端建立連線時,會從 ServerRSocketConnector
發出 RSocketConnectedEvent
。這與 Spring Messaging 模組中的 @ConnectMapping
註解提供的功能類似。對應模式 *
表示接受所有客戶端路由。RSocketConnectedEvent
可用於透過 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
標頭區分不同的路由。
典型的伺服器配置可能如下所示
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有選項,包括 RSocketStrategies
bean 和 RSocketConnectedEvent
的 @EventListener
,都是可選的。請參閱 ServerRSocketConnector
JavaDocs 以取得更多資訊。
從 5.2.1 版開始,ServerRSocketMessageHandler
被提取到一個公開的頂層類別,以便可能與現有的 RSocket 伺服器連線。當 ServerRSocketConnector
提供 ServerRSocketMessageHandler
的外部實例時,它不會在內部建立 RSocket 伺服器,而只是將所有處理邏輯委派給提供的實例。此外,可以配置 ServerRSocketMessageHandler
的 messageMappingCompatible
標誌,以同時處理 RSocket 控制器的 @MessageMapping
,完全取代標準 RSocketMessageHandler
提供的功能。當經典的 @MessageMapping
方法與 RSocket 通道配接器一起存在於同一個應用程式中,並且應用程式中存在外部配置的 RSocket 伺服器時,這可能很有用。
ClientRSocketConnector
作為 RSocketRequester
的持有者,基於透過提供的 ClientTransport
連線的 RSocket
。可以使用提供的 RSocketConnectorConfigurer
自訂 RSocketConnector
。也可以在此元件上配置 setupRoute
(帶有可選的範本變數)和帶有中繼資料的 setupData
。
典型的客戶端配置可能如下所示
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
大多數這些選項(包括 RSocketStrategies
bean)都是可選的。請注意我們如何連線到任意連接埠上本地啟動的 RSocket 伺服器。請參閱 ServerRSocketConnector.clientRSocketKeyStrategy
以取得 setupData
使用案例。另請參閱 ClientRSocketConnector
及其 AbstractRSocketConnector
超類別 JavaDocs 以取得更多資訊。
ClientRSocketConnector
和 ServerRSocketConnector
都負責將輸入通道配接器對應到其 path
配置,以路由傳入的 RSocket 請求。請參閱下一節以取得更多資訊。
RSocket 輸入閘道器
RSocketInboundGateway
負責接收 RSocket 請求並產生回應(如果有的話)。它需要一個 path
對應陣列,該陣列可以是類似於 MVC 請求對應或 @MessageMapping
語義的模式。此外,(自 5.2.2 版起),可以在 RSocketInboundGateway
上配置一組互動模型(請參閱 RSocketInteractionModel
),以將 RSocket 請求限制為此端點,並根據特定的訊框類型。預設情況下,支援所有互動模型。此類 bean,根據其 IntegrationRSocketEndpoint
實作(ReactiveMessageHandler
的擴充功能),由 ServerRSocketConnector
或 ClientRSocketConnector
自動偵測,以在內部 IntegrationRSocketMessageHandler
中進行傳入請求的路由邏輯。可以將 AbstractRSocketConnector
提供給 RSocketInboundGateway
以進行明確的端點註冊。這樣,在該 AbstractRSocketConnector
上會停用自動偵測選項。RSocketStrategies
也可以注入到 RSocketInboundGateway
中,或者從提供的 AbstractRSocketConnector
取得它們,從而覆蓋任何明確的注入。解碼器從這些 RSocketStrategies
中使用,以根據提供的 requestElementType
解碼請求酬載。如果傳入的 Message
中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
標頭,則 RSocketInboundGateway
將請求視為 fireAndForget
RSocket 互動模型。在這種情況下,RSocketInboundGateway
會執行純粹的 send
操作到 outputChannel
中。否則,來自 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
標頭的 MonoProcessor
值會用於將回覆傳送給 RSocket。為此,RSocketInboundGateway
在 outputChannel
上執行 sendAndReceiveMessageReactive
操作。要向下游傳送的訊息的 payload
始終是 Flux
,根據 MessagingRSocket
邏輯。當在 fireAndForget
RSocket 互動模型中時,訊息具有純粹轉換的 payload
。回覆 payload
可以是純粹的物件或 Publisher
- RSocketInboundGateway
會根據 RSocketStrategies
中提供的編碼器將它們正確地轉換為 RSocket 回應。
從 5.3 版開始,將 decodeFluxAsUnit
選項(預設為 false
)新增至 RSocketInboundGateway
。預設情況下,傳入的 Flux
會以這樣的方式轉換,即其每個事件都會被單獨解碼。這是目前 @MessageMapping
語義中存在的精確行為。要恢復先前的行為或根據應用程式需求將整個 Flux
解碼為單個單元,則必須將 decodeFluxAsUnit
設定為 true
。但是,目標解碼邏輯取決於選定的 Decoder
,例如,StringDecoder
需要在串流中存在換行符分隔符(預設情況下)以指示位元組緩衝區結束。
請參閱 使用 Java 配置 RSocket 端點 以取得關於如何配置 RSocketInboundGateway
端點以及如何處理下游酬載的範例。
RSocket 輸出閘道器
RSocketOutboundGateway
是一個 AbstractReplyProducingMessageHandler
,用於執行 RSocket 請求並根據 RSocket 回覆(如果有的話)產生回覆。低階 RSocket 協定互動委派給從提供的 ClientRSocketConnector
或來自伺服器端請求訊息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
標頭解析的 RSocketRequester
。伺服器端上的目標 RSocketRequester
可以從 RSocketConnectedEvent
或使用 ServerRSocketConnector.getClientRSocketRequester()
API 根據為透過 ServerRSocketConnector.setClientRSocketKeyStrategy()
的連線請求對應選取的一些業務金鑰來解析。請參閱 ServerRSocketConnector
JavaDocs 以取得更多資訊。
要傳送請求的 route
必須明確配置(連同路徑變數)或透過 SpEL 運算式配置,該運算式會針對請求訊息進行評估。
RSocket 互動模型可以透過 RSocketInteractionModel
選項或各自的運算式設定來提供。預設情況下,requestResponse
用於常見的閘道器使用案例。
當請求訊息酬載為 Publisher
時,可以提供 publisherElementType
選項,以根據目標 RSocketRequester
中提供的 RSocketStrategies
對其元素進行編碼。此選項的運算式可以評估為 ParameterizedTypeReference
。請參閱 RSocketRequester.RequestSpec.data()
JavaDocs 以取得關於資料及其類型的更多資訊。
RSocket 請求也可以使用 metadata
進行增強。為此,可以在 RSocketOutboundGateway
上配置針對請求訊息的 metadataExpression
。此類運算式必須評估為 Map<Object, MimeType>
。
當 interactionModel
不是 fireAndForget
時,必須提供 expectedResponseType
。預設情況下,它是 String.class
。此選項的運算式可以評估為 ParameterizedTypeReference
。請參閱 RSocketRequester.RetrieveSpec.retrieveMono()
和 RSocketRequester.RetrieveSpec.retrieveFlux()
JavaDocs 以取得關於回覆資料及其類型的更多資訊。
來自 RSocketOutboundGateway
的回覆 payload
是一個 Mono
(即使對於 fireAndForget
互動模型,它也是 Mono<Void>
),始終使此元件成為 async
。此類 Mono
在產生到常規通道的 outputChannel
之前訂閱,或由 FluxMessageChannel
按需處理。requestStream
或 requestChannel
互動模型的 Flux
回應也包裝在回覆 Mono
中。它可以透過帶有直通服務啟動器的 FluxMessageChannel
在下游展平
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目標應用程式邏輯中明確訂閱。
預期的回應類型也可以配置(或透過運算式評估)為 void
,從而將此閘道器視為輸出通道配接器。但是,仍然必須配置 outputChannel
(即使它只是一個 NullChannel
)以啟動對傳回的 Mono
的訂閱。
請參閱 使用 Java 配置 RSocket 端點 以取得關於如何配置 RSocketOutboundGateway
端點以及如何處理下游酬載的範例。
RSocket 命名空間支援
Spring Integration 提供了 rsocket
命名空間和對應的結構描述定義。若要在您的配置中包含它,請在您的應用程式上下文配置文件中新增下列命名空間宣告
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
輸入
若要使用 XML 配置 Spring Integration RSocket 輸入通道配接器,您需要使用來自 int-rsocket
命名空間的適當 inbound-gateway
元件。下列範例顯示如何配置它
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
ClientRSocketConnector
和 ServerRSocketConnector
應配置為通用 <bean>
定義。
輸出
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
請參閱 spring-integration-rsocket.xsd
以取得所有這些 XML 屬性的描述。
使用 Java 配置 RSocket 端點
下列範例顯示如何使用 Java 配置 RSocket 輸入端點
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
在此配置中假定存在 ClientRSocketConnector
或 ServerRSocketConnector
,其意義在於自動偵測「echo」路徑上的此類端點。請注意 @Transformer
簽名,及其對 RSocket 請求的完全反應式處理和產生反應式回覆。
下列範例顯示如何使用 Java DSL 配置 RSocket 輸入閘道器
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
在此配置中假定存在 ClientRSocketConnector
或 ServerRSocketConnector
,其意義在於自動偵測「/uppercase」路徑上的此類端點,以及預期的互動模型為「請求通道」。
下列範例顯示如何使用 Java 配置 RSocket 輸出閘道器
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
setClientRSocketConnector()
僅在客戶端需要。在伺服器端,必須在請求訊息中提供帶有 RSocketRequester
值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
標頭。
下列範例顯示如何使用 Java DSL 配置 RSocket 輸出閘道器
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
請參閱 IntegrationFlow
作為閘道器 以取得關於如何在上述流程開頭使用提及的 Function
介面的更多資訊。