reactive() 端點

從 5.5 版本開始,ConsumerEndpointSpec 提供了一個 reactive() 設定屬性,以及一個可選的自訂器 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>。 此選項將目標端點配置為 ReactiveStreamsConsumer 實例,與輸入通道類型無關,輸入通道類型透過 IntegrationReactiveUtils.messageChannelToFlux() 轉換為 Flux。 提供的函數從 Flux.transform() 運算符使用,以自訂(publishOn()log()doOnNext() 等)來自輸入通道的反應式串流來源。

以下範例示範如何從輸入通道變更發布線程,與最終訂閱者和生產者到該 DirectChannel 無關

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}

有關更多資訊,請參閱 反應式串流支援