聚合器和重排序器
Aggregator
在概念上與 Splitter
相反。它將一系列個別訊息聚合為單一訊息,因此必然更複雜。預設情況下,聚合器會傳回一個訊息,其中包含來自傳入訊息的 Payload 集合。相同的規則適用於 Resequencer
。以下範例顯示了分割器-聚合器模式的典型範例
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
split()
方法將清單分割成個別訊息,並將它們傳送到 ExecutorChannel
。resequence()
方法依據訊息標頭中找到的順序詳細資訊重新排序訊息。aggregate()
方法收集這些訊息。
但是,您可以透過指定發佈策略和關聯策略等來變更預設行為。請考慮以下範例
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
先前的範例關聯具有 myCorrelationKey
標頭的訊息,並在累積至少十個訊息後發佈這些訊息。
resequence()
EIP 方法也提供類似的 Lambda 設定。