reactive()
端點
從 5.5 版本開始,ConsumerEndpointSpec
提供了一個 reactive()
設定屬性,以及一個可選的自訂器 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
。 此選項將目標端點配置為 ReactiveStreamsConsumer
實例,與輸入通道類型無關,輸入通道類型透過 IntegrationReactiveUtils.messageChannelToFlux()
轉換為 Flux
。 提供的函數從 Flux.transform()
運算符使用,以自訂(publishOn()
、log()
、doOnNext()
等)來自輸入通道的反應式串流來源。
以下範例示範如何從輸入通道變更發布線程,與最終訂閱者和生產者到該 DirectChannel
無關
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
有關更多資訊,請參閱 反應式串流支援。