反應式串流支援

Spring Integration 在框架的某些地方和不同方面提供對 反應式串流 互動的支援。我們將在此處討論其中大部分,並在必要時提供目標章節的適當連結以了解詳細資訊。

前言

總而言之,Spring Integration 擴展了 Spring 程式設計模型,以支援知名的企業整合模式。Spring Integration 在基於 Spring 的應用程式中實現輕量級訊息傳遞,並透過宣告式配接器支援與外部系統的整合。Spring Integration 的主要目標是提供一個簡單的模型來建構企業整合解決方案,同時保持關注點分離,這對於產生可維護、可測試的程式碼至關重要。此目標在目標應用程式中使用一流的元件(如 messagechannelendpoint)來實現,這些元件允許我們建構整合流程(管線),其中(在大多數情況下)一個端點將訊息產生到通道中,以供另一個端點使用。透過這種方式,我們將整合互動模型與目標業務邏輯區分開來。這裡的關鍵部分是中間的通道:流程行為取決於其實作,而端點保持不變。

另一方面,反應式串流是非同步串流處理的標準,具有非阻塞背壓。反應式串流的主要目標是管理跨非同步邊界(例如,將元素傳遞到另一個執行緒或執行緒池)的串流資料交換,同時確保接收端不會被迫緩衝任意數量資料。換句話說,背壓是此模型不可或缺的一部分,以便允許執行緒之間的中介佇列受到限制。反應式串流實作(例如 Project Reactor)的意圖是在串流應用程式的整個處理圖中保留這些優點和特性。反應式串流程式庫的最終目標是以盡可能透明和平滑的方式,為目標應用程式提供型別、運算子集合和支援 API,這與可用的程式語言結構一樣,但最終解決方案不像正常的函數鏈調用那樣命令式。它分為兩個階段:定義和執行,執行發生在稍後訂閱最終反應式發布者的期間,並且資料需求從定義的底部推送到頂部,並根據需要應用背壓 - 我們請求盡可能多的事件,以便我們目前可以處理。反應式應用程式看起來像一個 "stream",或者像我們在 Spring Integration 術語中習慣的那樣 - "flow"。事實上,自 Java 9 以來,反應式串流 SPI 已在 java.util.concurrent.Flow 類別中呈現。

從這裡來看,當我們在端點上應用一些反應式框架運算子時,Spring Integration 流程似乎非常適合編寫反應式串流應用程式,但實際上問題要廣泛得多,我們需要記住,並非所有端點(例如 JdbcMessageHandler)都可以以反應式串流透明地處理。當然,Spring Integration 中反應式串流支援的主要目標是允許整個流程完全反應式、按需啟動和準備好背壓。在目標協定和通道配接器的系統提供反應式串流互動模型之前,這是不可能實現的。在下面的章節中,我們將描述 Spring Integration 中為開發反應式應用程式而提供的組件和方法,以保留整合流程結構。

Spring Integration 中的所有反應式串流互動均使用 Project Reactor 型別(例如 MonoFlux)實作。

訊息傳遞閘道器

與反應式串流互動的最簡單點是 @MessagingGateway,我們只需將閘道器方法的返回型別設為 Mono<?> - 並且當訂閱發生在返回的 Mono 實例上時,將執行閘道器方法調用背後的整個整合流程。請參閱 Reactor Mono 以取得更多資訊。類似的 Mono-回覆方法在框架內部用於完全基於反應式串流相容協定的輸入閘道器(請參閱下面的 反應式通道配接器 以取得更多資訊)。發送和接收操作封裝在 Mono.deffer() 中,並從 replyChannel 標頭鏈接回覆評估(如果可用)。透過這種方式,特定反應式協定(例如 Netty)的輸入元件將作為訂閱者和 Spring Integration 上執行的反應式流程的啟動器。如果請求 Payload 是反應式型別,則最好在反應式串流定義中處理它,將流程延遲到啟動器訂閱。為此,處理器方法也必須返回反應式型別。請參閱下一節以取得更多資訊。

反應式回覆 Payload

當產生回覆的 MessageHandler 為回覆訊息返回反應式型別 Payload 時,它會以非同步方式處理,並為 outputChannel 提供常規 MessageChannel 實作(async 必須設為 true),並且當輸出通道是 ReactiveStreamsSubscribableChannel 實作(例如 FluxMessageChannel)時,會按需訂閱展平。對於標準命令式 MessageChannel 用例,並且如果回覆 Payload 是多值發布者(請參閱 ReactiveAdapter.isMultiValue() 以取得更多資訊),則它會封裝到 Mono.just() 中。因此,Mono 必須在下游顯式訂閱或由下游的 FluxMessageChannel 展平。對於 outputChannelReactiveStreamsSubscribableChannel,無需擔心返回型別和訂閱;一切都由框架在內部順利處理。

請參閱 非同步服務啟動器 以取得更多資訊。

另請參閱 Kotlin 協程 以取得更多資訊。

FluxMessageChannelReactiveStreamsConsumer

FluxMessageChannelMessageChannelPublisher<Message<?>> 的組合實作。在內部為從 send() 實作接收的訊息建立 Flux 作為熱源。Publisher.subscribe() 實作委派給該內部 Flux。此外,對於按需上游使用,FluxMessageChannelReactiveStreamsSubscribableChannel 契約提供實作。為此通道提供的任何上游 Publisher(例如,請參閱下面的來源輪詢通道配接器和分割器)在準備好訂閱此通道時會自動訂閱。來自此委派發布者的事件會沉入上述內部 Flux 中。

FluxMessageChannel 的消費者必須是 org.reactivestreams.Subscriber 實例,以遵守反應式串流契約。幸運的是,Spring Integration 中的所有 MessageHandler 實作也實作了來自 project Reactor 的 CoreSubscriber。並且由於中間的 ReactiveStreamsConsumer 實作,整個整合流程設定對於目標開發人員來說是透明的。在這種情況下,流程行為從命令式推送模型變更為反應式拉取模型。ReactiveStreamsConsumer 也可用於使用 IntegrationReactiveUtils 將任何 MessageChannel 轉換為反應式來源,從而使整合流程部分反應式。

請參閱 FluxMessageChannel 以取得更多資訊。

從 5.5 版開始,ConsumerEndpointSpec 引入了 reactive() 選項,使流程中的端點作為 ReactiveStreamsConsumer,而與輸入通道無關。可以提供選用的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>,以透過 Flux.transform() 操作(例如,使用 publishOn()doOnNext()retry() 等)自訂來自輸入通道的來源 Flux。此功能表示為所有訊息傳遞註解(@ServiceActivator@Splitter 等)的 @Reactive 子註解,透過它們的 reactive() 屬性。

來源輪詢通道配接器

通常,SourcePollingChannelAdapter 依賴於由 TaskScheduler 啟動的任務。輪詢觸發器是從提供的選項建構的,並用於定期排程任務以輪詢資料或事件的目標來源。當 outputChannelReactiveStreamsSubscribableChannel 時,相同的 Trigger 用於確定下一次執行時間,但 SourcePollingChannelAdapter 不是排程任務,而是基於 Flux.generate()nextExecutionTime 值和 Mono.delay() 為前一個步驟的持續時間建立 Flux<Message<?>>。然後使用 Flux.flatMapMany() 輪詢 maxMessagesPerPoll 並將它們沉入輸出 Flux 中。此產生器 Flux 由提供的 ReactiveStreamsSubscribableChannel 訂閱,以遵守下游背壓。從 5.5 版開始,當 maxMessagesPerPoll == 0 時,根本不會調用來源,並且 flatMapMany() 會透過 Mono.empty() 結果立即完成,直到稍後時間將 maxMessagesPerPoll 變更為非零值,例如透過控制匯流排。透過這種方式,任何 MessageSource 實作都可以轉換為反應式熱源。

請參閱 輪詢消費者 以取得更多資訊。

事件驅動通道配接器

MessageProducerSupport 是事件驅動通道配接器的基類,通常,其 sendMessage(Message<?>) 用作產生驅動程式 API 中的接聽器回呼。當訊息產生者實作建構訊息 Flux 而不是基於接聽器的功能時,此回呼也可以輕鬆插入 doOnNext() Reactor 運算子中。事實上,當訊息產生者的 outputChannel 不是 ReactiveStreamsSubscribableChannel 時,這會在框架中完成。但是,為了改善最終使用者體驗並允許更多準備好背壓的功能,MessageProducerSupport 提供 subscribeToPublisher(Publisher<? extends Message<?>>) API,以在目標實作中使用,當 Publisher<Message<?>>> 是來自目標系統的資料來源時。通常,當為來源資料的 Publisher 調用目標驅動程式 API 時,會從 doStart() 實作中使用它。建議將反應式 MessageProducerSupport 實作與 FluxMessageChannel 結合作為 outputChannel,以便按需訂閱和下游事件使用。當取消訂閱 Publisher 時,通道配接器會進入停止狀態。在此類通道配接器上調用 stop() 會完成從來源 Publisher 的產生。可以使用自動訂閱新建立的來源 Publisher 重新啟動通道配接器。

訊息來源到反應式串流

從 5.3 版開始,提供了 ReactiveMessageSourceProducer。它是提供的 MessageSource 和事件驅動產生到設定的 outputChannel 中的組合。在內部,它將 MessageSource 包裝到重複重新訂閱的 Mono 中,產生 Flux<Message<?>> 以在上述 subscribeToPublisher(Publisher<? extends Message<?>>) 中訂閱。此 Mono 的訂閱是使用 Schedulers.boundedElastic() 完成的,以避免目標 MessageSource 中可能發生的阻塞。當訊息來源返回 null(沒有資料可拉取)時,Mono 會轉換為 repeatWhenEmpty() 狀態,並根據訂閱者內容中的 IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration 條目延遲後續重新訂閱。預設情況下,它是 1 秒。如果訊息來源在標頭中產生具有 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 資訊的訊息,則會在原始 MonodoOnSuccess() 中確認(如果需要),如果在下游流程中擲回帶有失敗訊息以拒絕的 MessagingException,則會在 doOnError() 中拒絕。當輪詢通道配接器的功能應轉換為任何現有 MessageSource<?> 實作的反應式、按需解決方案時,可以使用此 ReactiveMessageSourceProducer 於任何用例。

分割器和聚合器

AbstractMessageSplitter 取得其邏輯的 Publisher 時,流程自然會在 Publisher 中的項目上進行,以將它們對應到要發送到 outputChannel 的訊息。如果此通道是 ReactiveStreamsSubscribableChannel,則 PublisherFlux 包裝器會從該通道按需訂閱,並且此分割器行為看起來更像 flatMap Reactor 運算子,當我們將輸入事件對應到多值輸出 Publisher 時。當整個整合流程是使用分割器之前和之後的 FluxMessageChannel 建構時,這最有意義,使 Spring Integration 設定與反應式串流需求及其事件處理運算子對齊。使用常規通道,Publisher 會轉換為 Iterable 以進行標準迭代和產生分割邏輯。

FluxAggregatorMessageHandler 是特定反應式串流邏輯實作的另一個範例,可以將其視為 Project Reactor 術語中的 "reactive operator"。它基於 Flux.groupBy()Flux.window()(或 buffer())運算子。輸入訊息會沉入在建立 FluxAggregatorMessageHandler 時啟動的 Flux.create() 中,使其成為熱源。此 Flux 會根據需求由 ReactiveStreamsSubscribableChannel 訂閱,或者在 outputChannel 不是反應式時直接在 FluxAggregatorMessageHandler.start() 中訂閱。當整個整合流程是使用此組件之前和之後的 FluxMessageChannel 建構時,此 MessageHandler 具有其強大的功能,使整個邏輯準備好背壓。

請參閱 串流和 Flux 分割Flux 聚合器 以取得更多資訊。

Java DSL

Java DSL 中的 IntegrationFlow 可以從任何 Publisher 實例開始(請參閱 IntegrationFlow.from(Publisher<Message<T>>))。此外,使用 IntegrationFlowBuilder.toReactivePublisher() 運算子,IntegrationFlow 可以轉換為反應式熱源。FluxMessageChannel 在這兩種情況下都在內部使用;它可以根據其 ReactiveStreamsSubscribableChannel 契約訂閱輸入 Publisher,並且它本身是下游訂閱者的 Publisher<Message<?>>。透過動態 IntegrationFlow 註冊,我們可以實作強大的邏輯,將反應式串流與此整合流程結合,以橋接往/返 Publisher

從 5.5.6 版開始,提供了 toReactivePublisher(boolean autoStartOnSubscribe) 運算子變體,以控制返回的 Publisher<Message<?>> 背後的整個 IntegrationFlow 的生命週期。通常,反應式發布者的訂閱和使用發生在稍後的執行階段階段,而不是在反應式串流組合期間,甚至在 ApplicationContext 啟動期間。為了避免在 Publisher<Message<?>> 訂閱點進行 IntegrationFlow 的生命週期管理的樣板程式碼,並為了更好的最終使用者體驗,引入了具有 autoStartOnSubscribe 標誌的這個新運算子。它將 IntegrationFlow 及其組件標記為 autoStartup = false(如果為 true),因此 ApplicationContext 不會自動啟動流程中訊息的產生和使用。相反,IntegrationFlowstart() 是從內部 Flux.doOnSubscribe() 啟動的。獨立於 autoStartOnSubscribe 值,流程會從 Flux.doOnCancel()Flux.doOnTerminate() 停止 - 如果沒有任何東西可以使用它們,則產生訊息是沒有意義的。

對於完全相反的用例,當 IntegrationFlow 應調用反應式串流並在完成後繼續時,IntegrationFlowDefinition 中提供了 fluxTransform() 運算子。此時的流程會轉換為 FluxMessageChannel,它會傳播到提供的 fluxFunction 中,並在 Flux.transform() 運算子中執行。函數的結果會封裝到 Mono<Message<?>> 中,以平面對應到由另一個 FluxMessageChannel 訂閱的輸出 Flux 以進行下游流程。

請參閱 Java DSL 章節 以取得更多資訊。

ReactiveMessageHandler

從 5.3 版開始,框架原生支援 ReactiveMessageHandler。此型別的訊息處理器專為反應式用戶端而設計,這些用戶端返回反應式型別,以便按需訂閱低階操作執行,並且不提供任何回覆資料以繼續反應式串流組合。當 ReactiveMessageHandler 用於命令式整合流程時,handleMessage() 結果會在返回後立即訂閱,只是因為在這樣的流程中沒有反應式串流組合來遵守背壓。在這種情況下,框架會將此 ReactiveMessageHandler 包裝到 ReactiveMessageHandlerAdapter 中 - MessageHandler 的簡單實作。但是,當流程中涉及 ReactiveStreamsConsumer 時(例如,當要使用的通道是 FluxMessageChannel 時),此類 ReactiveMessageHandler 會與 flatMap() Reactor 運算子組合到整個反應式串流,以在使用期間遵守背壓。

開箱即用的 ReactiveMessageHandler 實作之一是用於輸出通道配接器的 ReactiveMongoDbStoringMessageHandler。請參閱 MongoDB 反應式通道配接器 以取得更多資訊。

從 6.1 版開始,IntegrationFlowDefinition 公開了一個方便的 handleReactive(ReactiveMessageHandler) 終端運算子。任何 ReactiveMessageHandler 實作(甚至只是使用 Mono API 的簡單 Lambda)都可以用於此運算子。框架會自動訂閱返回的 Mono<Void>。以下是此運算子的可能設定的簡單範例

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

此運算子的多載版本接受 Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>,以自訂圍繞提供的 ReactiveMessageHandler 的消費者端點。

此外,也提供了基於 ReactiveMessageHandlerSpec 的變體。在大多數情況下,它們用於協定特定的通道配接器實作。請參閱下一節,其中包含目標技術的連結以及各自的反應式通道配接器。

反應式通道配接器

當整合的目標協定提供反應式串流解決方案時,在 Spring Integration 中實作通道配接器會變得非常簡單。

入站、事件驅動的通道适配器實作是關於將請求(如有必要)封裝到延遲的 MonoFlux 中,並且僅在協議組件啟動對從監聽器方法返回的 Mono 的訂閱時才執行發送(並產生回覆,如果有的話)。 這樣,我們就能在這個組件中精確地封裝一個響應式流解決方案。 當然,訂閱輸出通道的下游集成流程應遵循響應式流規範,並以按需、具備背壓能力的方式執行。

在集成流程中使用的 MessageHandler 處理器的本質(或目前的實作)並非總是能做到這一點。 當沒有響應式實作時,可以使用線程池和隊列或 FluxMessageChannel(見上文)來處理此限制,在集成端點之前和之後。

響應式事件驅動入站通道适配器的範例

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

用法如下所示

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

或以宣告式方式

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

或者即使沒有通道适配器,我們也始終可以透過以下方式使用 Java DSL

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

響應式出站通道适配器實作是關於啟動(或延續)響應式流,以根據目標協議提供的響應式 API 與外部系統互動。 入站負載本身可以是響應式類型,或者作為整個集成流程的事件,而整個集成流程是頂層響應式流的一部分。 如果我們處於單向、即發即忘的場景中,則可以立即訂閱返回的響應式類型;或者,它可以向下游傳播(請求-回覆場景),以進行進一步的集成流程或目標業務邏輯中的顯式訂閱,但仍然是下游保留響應式流語義。

響應式出站通道适配器的範例

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

我們將能夠同時使用這兩種通道适配器

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

目前,Spring Integration 為 WebFluxRSocketMongoDbR2DBCZeroMQGraphQLApache Cassandra 提供了通道适配器(或閘道器)實作。 Redis Stream 通道适配器也是響應式的,並且使用來自 Spring Data 的 ReactiveStreamOperations。 更多響應式通道适配器即將推出,例如基於來自 Spring for Apache KafkaReactiveKafkaProducerTemplateReactiveKafkaConsumerTemplateKafka 中的 Apache Kafka 适配器等。 對於許多其他非響應式通道适配器,建議使用線程池以避免在響應式流處理期間發生阻塞。

從響應式到命令式上下文傳播

Context Propagation 庫在類路徑上時,Project Reactor 可以獲取 ThreadLocal 值(例如 Micrometer ObservationSecurityContextHolder)並將它們存儲到 Subscriber 上下文中。 當我們需要為追蹤填充日誌 MDC 或讓從響應式流中調用的服務從作用域恢復觀察時,也可以進行相反的操作。 有關上下文傳播的更多資訊,請參閱 Project Reactor 文件中關於其用於上下文傳播的特殊運算符。 如果我們的整個解決方案是單個響應式流組合,則存儲和恢復上下文可以順利進行,因為 Subscriber 上下文從下游到組合的開始處(FluxMono)都是可見的。 但是,如果應用程式在不同的 Flux 實例之間切換或切換到命令式處理再切換回來,則綁定到 Subscriber 的上下文可能不可用。 對於這種使用情境,Spring Integration 提供了一個額外功能(從 6.0.5 版本開始),用於將 Reactor ContextView 存儲到從響應式流產生的 IntegrationMessageHeaderAccessor.REACTOR_CONTEXT 消息頭中,例如,當我們執行直接 send() 操作時。 然後在 FluxMessageChannel.subscribeTo() 中使用此標頭來恢復此通道將要發出的 Message 的 Reactor 上下文。 目前,此標頭是從 WebFluxInboundEndpointRSocketInboundGateway 組件填充的,但可以用於執行從響應式到命令式集成的任何解決方案中。 填充此標頭的邏輯如下所示

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

請注意,我們仍然需要使用 handle() 運算符,以使 Reactor 從上下文中恢復 ThreadLocal 值。 即使它是作為標頭發送的,框架也無法假設它是否將恢復到下游的 ThreadLocal 值。

若要從另一個 FluxMono 組合上的 Message 中恢復上下文,可以執行以下邏輯

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));