WebFlux 支援

WebFlux Spring Integration 模組 (spring-integration-webflux) 允許以反應式方式執行 HTTP 請求和處理輸入 HTTP 請求。

您需要將此相依性包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.3.5"

在非基於 Servlet 的伺服器設定中,必須包含 io.projectreactor.netty:reactor-netty 相依性。

WebFlux 支援包含下列閘道器實作:WebFluxInboundEndpointWebFluxRequestExecutingMessageHandler。此支援完全基於 Spring WebFluxProject Reactor 基礎。如需更多資訊,請參閱 HTTP 支援,因為許多選項在反應式和常規 HTTP 元件之間是共用的。

WebFlux 命名空間支援

Spring Integration 提供 webflux 命名空間和對應的結構描述定義。若要將其包含在您的設定中,請在您的應用程式內容設定檔中新增下列命名空間宣告

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

WebFlux 輸入元件

從 5.0 版開始,提供 WebHandlerWebFluxInboundEndpoint 實作。此元件類似於基於 MVC 的 HttpRequestHandlingEndpointSupport,它透過新提取的 BaseHttpInboundEndpoint 與其共用一些常見選項。它用於 Spring WebFlux 反應式環境中(而不是 MVC)。下列範例顯示 WebFlux 端點的簡單實作

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}
@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            })
    {
        channel { queue("storeChannel") }
    }
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>

此設定類似於 HttpRequestHandlingEndpointSupport(在範例之前提到),除了我們使用 @EnableWebFlux 將 WebFlux 基礎架構新增至我們的整合應用程式。此外,WebFluxInboundEndpoint 透過使用背壓、基於需求的容量(由反應式 HTTP 伺服器實作提供)對下游流程執行 sendAndReceive 操作。

回覆部分也是非封鎖的,並且基於內部 FutureReplyChannel,它被平面映射到回覆 Mono 以進行基於需求的分辨率。

您可以使用自訂 ServerCodecConfigurerRequestedContentTypeResolver 甚至 ReactiveAdapterRegistry 來設定 WebFluxInboundEndpoint。後者提供了一種機制,您可以使用該機制將回覆作為任何反應式類型傳回:Reactor Flux、RxJava ObservableFlowable 和其他類型。這樣,我們可以使用 Spring Integration 元件實作 伺服器發送事件 情境,如下列範例所示

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}
@Bean
fun sseFlow() =
     integrationFlow(
            WebFlux.inboundGateway("/sse")
                       .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            {
                 handle { (p, h) -> Flux.just("foo", "bar", "baz") }
            }
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                               path="test1"
                               auto-startup="false"
                               phase="101"
                               request-payload-type="byte[]"
                               error-channel="errorChannel"
                               payload-expression="payload"
                               supported-methods="PUT"
                               status-code-expression="'202'"
                               header-mapper="headerMapper"
                               codec-configurer="codecConfigurer"
                               reactive-adapter-registry="reactiveAdapterRegistry"
                               requested-content-type-resolver="requestedContentTypeResolver">
            <int-webflux:request-mapping headers="foo"/>
            <int-webflux:cross-origin origin="foo" method="PUT"/>
            <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>

如需更多可能的設定選項,請參閱 請求對應支援跨來源資源共享 (CORS) 支援

當請求主體為空或 payloadExpression 傳回 null 時,請求參數 (MultiValueMap<String, String>) 用於要處理的目標訊息的 payload

酬載驗證

從 5.2 版開始,可以使用 Validator 設定 WebFluxInboundEndpoint。與 HTTP 支援 中的 MVC 驗證不同,它用於驗證 Publisher 中的元素,請求已由 HttpMessageReader 轉換為 Publisher,然後執行回退和 payloadExpression 函數。架構無法假設在建立最終酬載後 Publisher 物件可能有多複雜。如果需要限制最終酬載(或其 Publisher 元素)的驗證可見性,則驗證應在下游而不是 WebFlux 端點進行。如需更多資訊,請參閱 Spring WebFlux 文件。無效的酬載將被 IntegrationWebExchangeBindExceptionWebExchangeBindException 擴充)拒絕,其中包含所有驗證 Errors。如需更多資訊,請參閱 Spring Framework 參考手冊 關於驗證的章節。

WebFlux 輸出元件

WebFluxRequestExecutingMessageHandler(從 5.0 版開始)實作類似於 HttpRequestExecutingMessageHandler。它使用 Spring Framework WebFlux 模組中的 WebClient。若要設定它,請定義類似於下列的 bean

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

WebClient exchange() 操作傳回 Mono<ClientResponse>,它被映射(透過使用多個 Mono.map() 步驟)到 AbstractIntegrationMessageBuilder 作為 WebFluxRequestExecutingMessageHandler 的輸出。與作為 outputChannelReactiveChannel 一起,Mono<ClientResponse> 評估會延遲到進行下游訂閱為止。否則,它會被視為 async 模式,並且 Mono 回應會調整為 SettableListenableFuture,以從 WebFluxRequestExecutingMessageHandler 進行非同步回覆。輸出訊息的目標酬載取決於 WebFluxRequestExecutingMessageHandler 設定。setExpectedResponseType(Class<?>)setExpectedResponseTypeExpression(Expression) 識別回應主體元素轉換的目標類型。如果 replyPayloadToFlux 設定為 true,則回應主體會轉換為 Flux,其中包含為每個元素提供的 expectedResponseType,並且此 Flux 會作為酬載向下游傳送。之後,您可以使用 分割器 以反應式方式迭代此 Flux

此外,可以將 BodyExtractor<?, ClientHttpResponse> 注入到 WebFluxRequestExecutingMessageHandler 中,而不是 expectedResponseTypereplyPayloadToFlux 屬性。它可以用於低階存取 ClientHttpResponse,並更好地控制主體和 HTTP 標頭轉換。Spring Integration 提供 ClientHttpResponseBodyExtractor 作為身分函數,以產生(下游)整個 ClientHttpResponse 和任何其他可能的自訂邏輯。

從 5.2 版開始,WebFluxRequestExecutingMessageHandler 支援反應式 PublisherResourceMultiValueMap 類型作為請求訊息酬載。在內部使用各自的 BodyInserter 填充到 WebClient.RequestBodySpec 中。當酬載是反應式 Publisher 時,可以使用設定的 publisherElementTypepublisherElementTypeExpression 來判斷發佈者元素類型的類型。運算式必須解析為 Class<?>String(解析為目標 Class<?>)或 ParameterizedTypeReference

從 5.5 版開始,WebFluxRequestExecutingMessageHandler 公開 extractResponseBody 旗標(預設為 true),以僅傳回回應主體,或傳回整個 ResponseEntity 作為回覆訊息酬載,與提供的 expectedResponseTypereplyPayloadToFlux 無關。如果 ResponseEntity 中不存在主體,則會忽略此旗標,並傳回整個 ResponseEntity

如需更多可能的設定選項,請參閱 HTTP 輸出元件

WebFlux 標頭對應

由於 WebFlux 元件完全基於 HTTP 協定,因此 HTTP 標頭對應沒有差異。如需更多可能的選項和用於對應標頭的元件,請參閱 HTTP 標頭對應

WebFlux 請求屬性

從 6.0 版開始,可以設定 WebFluxRequestExecutingMessageHandler 以透過 setAttributeVariablesExpression() 評估請求屬性。此 SpEL 運算式必須在 Map 中評估。然後,此映射會傳播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP 請求設定回呼。如果需要以鍵值物件形式傳遞資訊從 Message 到請求,並且下游篩選器將存取這些屬性以進行進一步處理,這將很有幫助。