WebFlux 支援
WebFlux Spring Integration 模組 (spring-integration-webflux
) 允許以反應式方式執行 HTTP 請求和處理輸入 HTTP 請求。
您需要將此相依性包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.3.5"
在非基於 Servlet 的伺服器設定中,必須包含 io.projectreactor.netty:reactor-netty
相依性。
WebFlux 支援包含下列閘道器實作:WebFluxInboundEndpoint
和 WebFluxRequestExecutingMessageHandler
。此支援完全基於 Spring WebFlux 和 Project Reactor 基礎。如需更多資訊,請參閱 HTTP 支援,因為許多選項在反應式和常規 HTTP 元件之間是共用的。
WebFlux 命名空間支援
Spring Integration 提供 webflux
命名空間和對應的結構描述定義。若要將其包含在您的設定中,請在您的應用程式內容設定檔中新增下列命名空間宣告
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 輸入元件
從 5.0 版開始,提供 WebHandler
的 WebFluxInboundEndpoint
實作。此元件類似於基於 MVC 的 HttpRequestHandlingEndpointSupport
,它透過新提取的 BaseHttpInboundEndpoint
與其共用一些常見選項。它用於 Spring WebFlux 反應式環境中(而不是 MVC)。下列範例顯示 WebFlux 端點的簡單實作
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
此設定類似於 HttpRequestHandlingEndpointSupport
(在範例之前提到),除了我們使用 @EnableWebFlux
將 WebFlux 基礎架構新增至我們的整合應用程式。此外,WebFluxInboundEndpoint
透過使用背壓、基於需求的容量(由反應式 HTTP 伺服器實作提供)對下游流程執行 sendAndReceive
操作。
回覆部分也是非封鎖的,並且基於內部 FutureReplyChannel ,它被平面映射到回覆 Mono 以進行基於需求的分辨率。 |
您可以使用自訂 ServerCodecConfigurer
、RequestedContentTypeResolver
甚至 ReactiveAdapterRegistry
來設定 WebFluxInboundEndpoint
。後者提供了一種機制,您可以使用該機制將回覆作為任何反應式類型傳回:Reactor Flux
、RxJava Observable
、Flowable
和其他類型。這樣,我們可以使用 Spring Integration 元件實作 伺服器發送事件 情境,如下列範例所示
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
如需更多可能的設定選項,請參閱 請求對應支援 和 跨來源資源共享 (CORS) 支援。
當請求主體為空或 payloadExpression
傳回 null
時,請求參數 (MultiValueMap<String, String>
) 用於要處理的目標訊息的 payload
。
酬載驗證
從 5.2 版開始,可以使用 Validator
設定 WebFluxInboundEndpoint
。與 HTTP 支援 中的 MVC 驗證不同,它用於驗證 Publisher
中的元素,請求已由 HttpMessageReader
轉換為 Publisher
,然後執行回退和 payloadExpression
函數。架構無法假設在建立最終酬載後 Publisher
物件可能有多複雜。如果需要限制最終酬載(或其 Publisher
元素)的驗證可見性,則驗證應在下游而不是 WebFlux 端點進行。如需更多資訊,請參閱 Spring WebFlux 文件。無效的酬載將被 IntegrationWebExchangeBindException
(WebExchangeBindException
擴充)拒絕,其中包含所有驗證 Errors
。如需更多資訊,請參閱 Spring Framework 參考手冊 關於驗證的章節。
WebFlux 輸出元件
WebFluxRequestExecutingMessageHandler
(從 5.0 版開始)實作類似於 HttpRequestExecutingMessageHandler
。它使用 Spring Framework WebFlux 模組中的 WebClient
。若要設定它,請定義類似於下列的 bean
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="http://localhost/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="http://localhost/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient
exchange()
操作傳回 Mono<ClientResponse>
,它被映射(透過使用多個 Mono.map()
步驟)到 AbstractIntegrationMessageBuilder
作為 WebFluxRequestExecutingMessageHandler
的輸出。與作為 outputChannel
的 ReactiveChannel
一起,Mono<ClientResponse>
評估會延遲到進行下游訂閱為止。否則,它會被視為 async
模式,並且 Mono
回應會調整為 SettableListenableFuture
,以從 WebFluxRequestExecutingMessageHandler
進行非同步回覆。輸出訊息的目標酬載取決於 WebFluxRequestExecutingMessageHandler
設定。setExpectedResponseType(Class<?>)
或 setExpectedResponseTypeExpression(Expression)
識別回應主體元素轉換的目標類型。如果 replyPayloadToFlux
設定為 true
,則回應主體會轉換為 Flux
,其中包含為每個元素提供的 expectedResponseType
,並且此 Flux
會作為酬載向下游傳送。之後,您可以使用 分割器 以反應式方式迭代此 Flux
。
此外,可以將 BodyExtractor<?, ClientHttpResponse>
注入到 WebFluxRequestExecutingMessageHandler
中,而不是 expectedResponseType
和 replyPayloadToFlux
屬性。它可以用於低階存取 ClientHttpResponse
,並更好地控制主體和 HTTP 標頭轉換。Spring Integration 提供 ClientHttpResponseBodyExtractor
作為身分函數,以產生(下游)整個 ClientHttpResponse
和任何其他可能的自訂邏輯。
從 5.2 版開始,WebFluxRequestExecutingMessageHandler
支援反應式 Publisher
、Resource
和 MultiValueMap
類型作為請求訊息酬載。在內部使用各自的 BodyInserter
填充到 WebClient.RequestBodySpec
中。當酬載是反應式 Publisher
時,可以使用設定的 publisherElementType
或 publisherElementTypeExpression
來判斷發佈者元素類型的類型。運算式必須解析為 Class<?>
、String
(解析為目標 Class<?>
)或 ParameterizedTypeReference
。
從 5.5 版開始,WebFluxRequestExecutingMessageHandler
公開 extractResponseBody
旗標(預設為 true
),以僅傳回回應主體,或傳回整個 ResponseEntity
作為回覆訊息酬載,與提供的 expectedResponseType
或 replyPayloadToFlux
無關。如果 ResponseEntity
中不存在主體,則會忽略此旗標,並傳回整個 ResponseEntity
。
如需更多可能的設定選項,請參閱 HTTP 輸出元件。
WebFlux 標頭對應
由於 WebFlux 元件完全基於 HTTP 協定,因此 HTTP 標頭對應沒有差異。如需更多可能的選項和用於對應標頭的元件,請參閱 HTTP 標頭對應。
WebFlux 請求屬性
從 6.0 版開始,可以設定 WebFluxRequestExecutingMessageHandler
以透過 setAttributeVariablesExpression()
評估請求屬性。此 SpEL 運算式必須在 Map
中評估。然後,此映射會傳播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)
HTTP 請求設定回呼。如果需要以鍵值物件形式傳遞資訊從 Message
到請求,並且下游篩選器將存取這些屬性以進行進一步處理,這將很有幫助。