分散-收集
從 4.1 版本開始,Spring Integration 提供了 scatter-gather 企業整合模式的實作。它是一個複合端點,其目標是將訊息傳送給接收者並彙總結果。正如 Enterprise Integration Patterns 中所述,它是一個適用於「最佳報價」等情境的元件,在這些情境中,我們需要向多個供應商請求資訊,並決定哪一個供應商為我們提供的請求項目提供最佳條款。
先前,此模式可以透過使用離散元件來設定。此增強功能帶來更方便的設定。
ScatterGatherHandler
是一個請求-回覆端點,它結合了 PublishSubscribeChannel
(或 RecipientListRouter
) 和 AggregatingMessageHandler
。請求訊息會傳送到 scatter
通道,而 ScatterGatherHandler
會等待彙總器傳送到 outputChannel
的回覆。
功能
分散-收集
模式建議兩種情境:「拍賣」和「分配」。在這兩種情況下,彙總
函數是相同的,並提供 AggregatingMessageHandler
的所有可用選項。(實際上,ScatterGatherHandler
只需要 AggregatingMessageHandler
作為建構子引數。) 有關更多資訊,請參閱 彙總器。
拍賣
拍賣 分散-收集
變體對請求訊息使用「發布-訂閱」邏輯,其中「scatter」通道是 PublishSubscribeChannel
,且 apply-sequence="true"
。但是,此通道可以是任何 MessageChannel
實作 (如同 ContentEnricher
中的 request-channel
情況一樣 — 請參閱 內容豐富器)。但是,在這種情況下,您應該為 彙總
函數建立自己的自訂 correlationStrategy
。
分配
分配 分散-收集
變體基於 RecipientListRouter
(請參閱 RecipientListRouter
),並具有 RecipientListRouter
的所有可用選項。這是第二個 ScatterGatherHandler
建構子引數。如果您只想依賴 recipient-list-router
和 aggregator
的預設 correlationStrategy
,則應指定 apply-sequence="true"
。否則,您應該為 aggregator
提供自訂 correlationStrategy
。與 PublishSubscribeChannel
變體 (拍賣變體) 不同,具有 recipient-list-router
selector
選項可以根據訊息篩選目標供應商。使用 apply-sequence="true"
,會提供預設的 sequenceSize
,並且 aggregator
可以正確釋放群組。分配選項與拍賣選項互斥。
applySequence=true 僅適用於基於 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 建構子設定的純 Java 設定,因為框架無法變更外部提供的元件。為了方便起見,分散-收集 的 XML 和 Java DSL 從 6.0 版本開始將 applySequence 設定為 true。 |
對於拍賣和分配變體,請求 (scatter) 訊息都使用 gatherResultChannel
標頭進行豐富,以等待來自 aggregator
的回覆訊息。
預設情況下,所有供應商都應將其結果傳送到 replyChannel
標頭 (通常是透過從最終端點省略 output-channel
)。但是,也提供了 gatherChannel
選項,讓供應商將其回覆傳送到該通道以進行彙總。
設定分散-收集端點
以下範例顯示了 分散-收集
的 Bean 定義的 Java 設定
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的範例中,我們使用 applySequence="true"
和接收者通道列表設定了 RecipientListRouter
distributor
Bean。下一個 Bean 用於 AggregatingMessageHandler
。最後,我們將這兩個 Bean 注入到 ScatterGatherHandler
Bean 定義中,並將其標記為 @ServiceActivator
,以將分散-收集元件連接到整合流程中。
以下範例顯示如何使用 XML 命名空間設定 <scatter-gather>
端點
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
1 | 端點的 ID。ScatterGatherHandler Bean 以 id + '.handler' 的別名註冊。RecipientListRouter Bean 以 id + '.scatterer' 的別名註冊。AggregatingMessageHandler Bean 以 id + '.gatherer' 的別名註冊。選用。(BeanFactory 會產生預設的 id 值。) |
2 | 生命週期屬性,表示是否應在應用程式內容初始化期間啟動端點。此外,ScatterGatherHandler 也實作了 Lifecycle ,並啟動和停止 gatherEndpoint ,如果提供了 gather-channel ,則會在內部建立該端點。選用。(預設值為 true 。) |
3 | 接收請求訊息以在 ScatterGatherHandler 中處理它們的通道。必填。 |
4 | ScatterGatherHandler 將彙總結果傳送到的通道。選用。(傳入訊息可以在 replyChannel 訊息標頭中自行指定回覆通道。) |
5 | 在拍賣情境中傳送 scatter 訊息的通道。選用。與 <scatterer> 子元素互斥。 |
6 | 接收來自每個供應商的回覆以進行彙總的通道。它在 scatter 訊息中用作 replyChannel 標頭。選用。預設情況下,會建立 FixedSubscriberChannel 。 |
7 | 當多個處理器訂閱同一個 DirectChannel 時,此元件的順序 (用於負載平衡目的)。選用。 |
8 | 指定應啟動和停止端點的階段。啟動順序從最低到最高,而關閉順序從最高到最低。預設情況下,此值為 Integer.MAX_VALUE ,表示此容器會盡可能晚啟動並盡可能早停止。選用。 |
9 | 將回覆 Message 傳送到 output-channel 時要等待的逾時間隔。預設情況下,send() 會封鎖一秒鐘。它僅在輸出通道有一些「傳送」限制時適用 — 例如,具有固定「容量」且已滿的 QueueChannel 。在這種情況下,會擲回 MessageDeliveryException 。send-timeout 對於 AbstractSubscribableChannel 實作會被忽略。對於 group-timeout(-expression) ,來自排定的過期任務的 MessageDeliveryException 會導致重新排定此任務。選用。 |
10 | 讓您指定分散-收集在傳回之前等待回覆訊息的時間長度。預設情況下,它會等待 30 秒。如果回覆逾時,則會傳回 'null'。選用。 |
11 | 指定分散-收集是否必須傳回非 null 值。此值預設為 true 。因此,當基礎彙總器在 gather-timeout 後傳回 null 值時,會擲回 ReplyRequiredException 。請注意,如果 null 是一種可能性,則應指定 gather-timeout 以避免無限期等待。 |
12 | <recipient-list-router> 選項。選用。與 scatter-channel 屬性互斥。 |
13 | <aggregator> 選項。必填。 |
錯誤處理
由於分散-收集是一個多請求-回覆元件,因此錯誤處理有一些額外的複雜性。在某些情況下,如果 ReleaseStrategy
允許流程以少於請求的回覆完成,則最好只捕獲並忽略下游異常。在其他情況下,當發生錯誤時,應考慮使用類似「補償訊息」的東西從子流程傳回。
每個非同步子流程都應使用 errorChannel
標頭進行設定,以便從 MessagePublishingErrorHandler
正確傳送錯誤訊息。否則,錯誤將會使用常見的錯誤處理邏輯傳送到全域 errorChannel
。有關非同步錯誤處理的更多資訊,請參閱 錯誤處理。
同步流程可以使用 ExpressionEvaluatingRequestHandlerAdvice
來忽略異常或傳回補償訊息。當從其中一個子流程向 ScatterGatherHandler
擲回異常時,它只會重新擲回上游。這樣,所有其他子流程都將白費力氣,並且它們的回覆將在 ScatterGatherHandler
中被忽略。這有時可能是預期的行為,但在大多數情況下,最好在特定的子流程中處理錯誤,而不會影響所有其他子流程和收集器中的預期。
從 5.1.3 版本開始,ScatterGatherHandler
提供了 errorChannelName
選項。它會填入 scatter 訊息的 errorChannel
標頭,並在發生非同步錯誤時使用,或者可以在常規同步子流程中使用,以直接傳送錯誤訊息。
下面的範例設定示範了透過傳回補償訊息來處理非同步錯誤
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
為了產生正確的回覆,我們必須從 MessagePublishingErrorHandler
傳送到 scatterGatherErrorChannel
的 MessagingException
的 failedMessage
中複製標頭 (包括 replyChannel
和 errorChannel
)。這樣,目標異常就會傳回到 ScatterGatherHandler
的收集器,以完成回覆訊息群組。這樣的異常 payload
可以在收集器的 MessageGroupProcessor
中篩選掉,或者在分散-收集端點之後以其他方式在下游處理。
在將分散結果傳送到收集器之前,ScatterGatherHandler 會恢復請求訊息標頭,包括回覆和錯誤通道 (如果有的話)。這樣,即使在分散接收者子流程中應用了非同步交接,來自 AggregatingMessageHandler 的錯誤也將傳播給呼叫者。為了成功運作,gatherResultChannel 、originalReplyChannel 和 originalErrorChannel 標頭必須傳回給來自 scatter 接收者子流程的回覆。在這種情況下,必須為 ScatterGatherHandler 設定合理的、有限的 gatherTimeout 。否則,預設情況下,它將被封鎖,永遠等待來自收集器的回覆。 |