入站通道适配器
通常,訊息流從入站通道适配器(例如 <int-jdbc:inbound-channel-adapter>
)開始。适配器配置了 <poller>
,並要求 MessageSource<?>
定期產生訊息。Java DSL 也允許從 MessageSource<?>
啟動 IntegrationFlow
。为此,IntegrationFlow
流暢 API 提供了重載的 IntegrationFlow.from(MessageSource<?> messageSource)
方法。您可以將 MessageSource<?>
配置為 bean,並將其作為該方法的參數提供。IntegrationFlow.from()
的第二個參數是 Consumer<SourcePollingChannelAdapterSpec>
lambda,可讓您為 SourcePollingChannelAdapter
提供選項(例如 PollerMetadata
或 SmartLifecycle
)。以下範例示範如何使用流暢 API 和 lambda 來建立 IntegrationFlow
@Bean
public MessageSource<Object> jdbcMessageSource() {
return new JdbcPollingChannelAdapter(this.dataSource, "SELECT * FROM something");
}
@Bean
public IntegrationFlow pollingFlow() {
return IntegrationFlow.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1)))
.transform(Transformers.toJson())
.channel("furtherProcessChannel")
.get();
}
對於那些沒有直接建立 Message
物件要求的案例,您可以使用基於 java.util.function.Supplier
的 IntegrationFlow.fromSupplier()
變體。Supplier.get()
的結果會自動包裝在 Message
中(如果它還不是 Message
)。