聚合器和重排序器

Aggregator 在概念上與 Splitter 相反。它將一系列個別訊息聚合為單一訊息,因此必然更複雜。預設情況下,聚合器會傳回一個訊息,其中包含來自傳入訊息的 Payload 集合。相同的規則適用於 Resequencer。以下範例顯示了分割器-聚合器模式的典型範例

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split() 方法將清單分割成個別訊息,並將它們傳送到 ExecutorChannelresequence() 方法依據訊息標頭中找到的順序詳細資訊重新排序訊息。aggregate() 方法收集這些訊息。

但是,您可以透過指定發佈策略和關聯策略等來變更預設行為。請考慮以下範例

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

先前的範例關聯具有 myCorrelationKey 標頭的訊息,並在累積至少十個訊息後發佈這些訊息。

resequence() EIP 方法也提供類似的 Lambda 設定。