Thread Barrier

有時,我們需要暫停訊息流執行緒,直到發生其他非同步事件。例如,考慮一個發布訊息到 RabbitMQ 的 HTTP 請求。我們可能希望在 RabbitMQ Broker 發出訊息已接收的確認訊息後,才回覆使用者。

在 4.2 版本中,Spring Integration 為了此目的引入了 <barrier/> 元件。底層的 MessageHandlerBarrierMessageHandler。此類別也實作了 MessageTriggerAction,其中傳遞給 trigger() 方法的訊息會在 handleRequestMessage() 方法(如果存在)中釋放對應的執行緒。

暫停的執行緒和觸發執行緒透過在訊息上調用 CorrelationStrategy 進行關聯。當訊息被發送到 input-channel 時,執行緒將被暫停最多 requestTimeout 毫秒,等待對應的觸發訊息。預設的關聯策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 標頭。當觸發訊息到達且具有相同的關聯時,執行緒將被釋放。釋放後發送到 output-channel 的訊息是透過使用 MessageGroupProcessor 建構的。預設情況下,訊息是兩個 payload 的 Collection<?>,並且標頭使用 DefaultAggregatingMessageGroupProcessor 進行合併。

如果先調用 trigger() 方法(或在主執行緒逾時後),則它將暫停最多 triggerTimeout,等待暫停訊息到達。如果您不想暫停觸發執行緒,請考慮移交給 TaskExecutor,以便暫停其執行緒。
在 5.4 之前的版本中,request 和 trigger 訊息只有一個 timeout 選項,但在某些使用案例中,最好為這些動作設定不同的逾時時間。因此,引入了 requestTimeouttriggerTimeout 選項。

requires-reply 屬性決定了如果暫停的執行緒在觸發訊息到達之前逾時時要採取的動作。預設情況下,它是 false,這表示端點返回 null,流程結束,並且執行緒返回到調用者。當為 true 時,將拋出 ReplyRequiredException

您可以程式化地調用 trigger() 方法(透過名稱取得 bean 參考,barrier.handler — 其中 barrier 是 barrier 端點的 bean 名稱)。或者,您可以配置 <outbound-channel-adapter/> 來觸發釋放。

只有一個執行緒可以與相同的關聯暫停。相同的關聯可以多次使用,但一次只能並行使用一次。如果第二個執行緒以相同的關聯到達,則會拋出例外。

以下範例示範如何使用自訂標頭進行關聯

  • Java

  • XML

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根據哪個訊息先到達,發送訊息到 in 的執行緒或發送訊息到 release 的執行緒都會等待最多十秒鐘,直到另一個訊息到達。當訊息被釋放時,out 通道會發送一個訊息,該訊息結合了調用名為 myOutputProcessor 的自訂 MessageGroupProcessor bean 的結果。如果主執行緒逾時且觸發器稍後到達,您可以配置一個丟棄通道,延遲的觸發器將被發送到該通道。如果請求訊息未及時到達,觸發訊息也會被丟棄。

有關此元件的範例,請參閱 barrier 範例應用程式