分散-收集

從 4.1 版本開始,Spring Integration 提供了 scatter-gather 企業整合模式的實作。它是一個複合端點,其目標是將訊息傳送給接收者並彙總結果。正如 Enterprise Integration Patterns 中所述,它是一個適用於「最佳報價」等情境的元件,在這些情境中,我們需要向多個供應商請求資訊,並決定哪一個供應商為我們提供的請求項目提供最佳條款。

先前,此模式可以透過使用離散元件來設定。此增強功能帶來更方便的設定。

ScatterGatherHandler 是一個請求-回覆端點,它結合了 PublishSubscribeChannel (或 RecipientListRouter) 和 AggregatingMessageHandler。請求訊息會傳送到 scatter 通道,而 ScatterGatherHandler 會等待彙總器傳送到 outputChannel 的回覆。

功能

分散-收集 模式建議兩種情境:「拍賣」和「分配」。在這兩種情況下,彙總 函數是相同的,並提供 AggregatingMessageHandler 的所有可用選項。(實際上,ScatterGatherHandler 只需要 AggregatingMessageHandler 作為建構子引數。) 有關更多資訊,請參閱 彙總器

拍賣

拍賣 分散-收集 變體對請求訊息使用「發布-訂閱」邏輯,其中「scatter」通道是 PublishSubscribeChannel,且 apply-sequence="true"。但是,此通道可以是任何 MessageChannel 實作 (如同 ContentEnricher 中的 request-channel 情況一樣 — 請參閱 內容豐富器)。但是,在這種情況下,您應該為 彙總 函數建立自己的自訂 correlationStrategy

分配

分配 分散-收集 變體基於 RecipientListRouter (請參閱 RecipientListRouter),並具有 RecipientListRouter 的所有可用選項。這是第二個 ScatterGatherHandler 建構子引數。如果您只想依賴 recipient-list-routeraggregator 的預設 correlationStrategy,則應指定 apply-sequence="true"。否則,您應該為 aggregator 提供自訂 correlationStrategy。與 PublishSubscribeChannel 變體 (拍賣變體) 不同,具有 recipient-list-router selector 選項可以根據訊息篩選目標供應商。使用 apply-sequence="true",會提供預設的 sequenceSize,並且 aggregator 可以正確釋放群組。分配選項與拍賣選項互斥。

applySequence=true 僅適用於基於 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 建構子設定的純 Java 設定,因為框架無法變更外部提供的元件。為了方便起見,分散-收集 的 XML 和 Java DSL 從 6.0 版本開始將 applySequence 設定為 true。

對於拍賣和分配變體,請求 (scatter) 訊息都使用 gatherResultChannel 標頭進行豐富,以等待來自 aggregator 的回覆訊息。

預設情況下,所有供應商都應將其結果傳送到 replyChannel 標頭 (通常是透過從最終端點省略 output-channel)。但是,也提供了 gatherChannel 選項,讓供應商將其回覆傳送到該通道以進行彙總。

設定分散-收集端點

以下範例顯示了 分散-收集 的 Bean 定義的 Java 設定

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

在前面的範例中,我們使用 applySequence="true" 和接收者通道列表設定了 RecipientListRouter distributor Bean。下一個 Bean 用於 AggregatingMessageHandler。最後,我們將這兩個 Bean 注入到 ScatterGatherHandler Bean 定義中,並將其標記為 @ServiceActivator,以將分散-收集元件連接到整合流程中。

以下範例顯示如何使用 XML 命名空間設定 <scatter-gather> 端點

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 端點的 ID。ScatterGatherHandler Bean 以 id + '.handler' 的別名註冊。RecipientListRouter Bean 以 id + '.scatterer' 的別名註冊。AggregatingMessageHandler Bean 以 id + '.gatherer' 的別名註冊。選用。(BeanFactory 會產生預設的 id 值。)
2 生命週期屬性,表示是否應在應用程式內容初始化期間啟動端點。此外,ScatterGatherHandler 也實作了 Lifecycle,並啟動和停止 gatherEndpoint,如果提供了 gather-channel,則會在內部建立該端點。選用。(預設值為 true。)
3 接收請求訊息以在 ScatterGatherHandler 中處理它們的通道。必填。
4 ScatterGatherHandler 將彙總結果傳送到的通道。選用。(傳入訊息可以在 replyChannel 訊息標頭中自行指定回覆通道。)
5 在拍賣情境中傳送 scatter 訊息的通道。選用。與 <scatterer> 子元素互斥。
6 接收來自每個供應商的回覆以進行彙總的通道。它在 scatter 訊息中用作 replyChannel 標頭。選用。預設情況下,會建立 FixedSubscriberChannel
7 當多個處理器訂閱同一個 DirectChannel 時,此元件的順序 (用於負載平衡目的)。選用。
8 指定應啟動和停止端點的階段。啟動順序從最低到最高,而關閉順序從最高到最低。預設情況下,此值為 Integer.MAX_VALUE,表示此容器會盡可能晚啟動並盡可能早停止。選用。
9 將回覆 Message 傳送到 output-channel 時要等待的逾時間隔。預設情況下,send() 會封鎖一秒鐘。它僅在輸出通道有一些「傳送」限制時適用 — 例如,具有固定「容量」且已滿的 QueueChannel。在這種情況下,會擲回 MessageDeliveryExceptionsend-timeout 對於 AbstractSubscribableChannel 實作會被忽略。對於 group-timeout(-expression),來自排定的過期任務的 MessageDeliveryException 會導致重新排定此任務。選用。
10 讓您指定分散-收集在傳回之前等待回覆訊息的時間長度。預設情況下,它會等待 30 秒。如果回覆逾時,則會傳回 'null'。選用。
11 指定分散-收集是否必須傳回非 null 值。此值預設為 true。因此,當基礎彙總器在 gather-timeout 後傳回 null 值時,會擲回 ReplyRequiredException。請注意,如果 null 是一種可能性,則應指定 gather-timeout 以避免無限期等待。
12 <recipient-list-router> 選項。選用。與 scatter-channel 屬性互斥。
13 <aggregator> 選項。必填。

錯誤處理

由於分散-收集是一個多請求-回覆元件,因此錯誤處理有一些額外的複雜性。在某些情況下,如果 ReleaseStrategy 允許流程以少於請求的回覆完成,則最好只捕獲並忽略下游異常。在其他情況下,當發生錯誤時,應考慮使用類似「補償訊息」的東西從子流程傳回。

每個非同步子流程都應使用 errorChannel 標頭進行設定,以便從 MessagePublishingErrorHandler 正確傳送錯誤訊息。否則,錯誤將會使用常見的錯誤處理邏輯傳送到全域 errorChannel。有關非同步錯誤處理的更多資訊,請參閱 錯誤處理

同步流程可以使用 ExpressionEvaluatingRequestHandlerAdvice 來忽略異常或傳回補償訊息。當從其中一個子流程向 ScatterGatherHandler 擲回異常時,它只會重新擲回上游。這樣,所有其他子流程都將白費力氣,並且它們的回覆將在 ScatterGatherHandler 中被忽略。這有時可能是預期的行為,但在大多數情況下,最好在特定的子流程中處理錯誤,而不會影響所有其他子流程和收集器中的預期。

從 5.1.3 版本開始,ScatterGatherHandler 提供了 errorChannelName 選項。它會填入 scatter 訊息的 errorChannel 標頭,並在發生非同步錯誤時使用,或者可以在常規同步子流程中使用,以直接傳送錯誤訊息。

下面的範例設定示範了透過傳回補償訊息來處理非同步錯誤

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

為了產生正確的回覆,我們必須從 MessagePublishingErrorHandler 傳送到 scatterGatherErrorChannelMessagingExceptionfailedMessage 中複製標頭 (包括 replyChannelerrorChannel)。這樣,目標異常就會傳回到 ScatterGatherHandler 的收集器,以完成回覆訊息群組。這樣的異常 payload 可以在收集器的 MessageGroupProcessor 中篩選掉,或者在分散-收集端點之後以其他方式在下游處理。

在將分散結果傳送到收集器之前,ScatterGatherHandler 會恢復請求訊息標頭,包括回覆和錯誤通道 (如果有的話)。這樣,即使在分散接收者子流程中應用了非同步交接,來自 AggregatingMessageHandler 的錯誤也將傳播給呼叫者。為了成功運作,gatherResultChanneloriginalReplyChanneloriginalErrorChannel 標頭必須傳回給來自 scatter 接收者子流程的回覆。在這種情況下,必須為 ScatterGatherHandler 設定合理的、有限的 gatherTimeout。否則,預設情況下,它將被封鎖,永遠等待來自收集器的回覆。