嚴格訊息排序
本節描述輸入和輸出訊息的訊息排序。
輸入
如果您需要嚴格的輸入訊息排序,則必須將輸入監聽器容器的 prefetchCount
屬性設定為 1
。 這是因為,如果訊息失敗並重新傳遞,它會在現有的預先提取訊息之後到達。 自 Spring AMQP 2.0 版本起,prefetchCount
預設為 250
,以提高效能。 嚴格的排序要求會以降低效能為代價。
輸出
考慮以下整合流程
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假設我們將訊息 A
、B
和 C
發送到閘道器。 雖然訊息 A
、B
、C
很可能依序發送,但無法保證。 這是因為範本會為每個發送操作從快取中「借用」一個通道,並且無法保證每個訊息都使用相同的通道。 一個解決方案是在分割器之前啟動交易,但交易在 RabbitMQ 中成本很高,並且可能會將效能降低數百倍。
為了以更有效率的方式解決此問題,從 5.1 版開始,Spring Integration 提供了 BoundRabbitChannelAdvice
,它是 HandleMessageAdvice
。 請參閱 處理訊息 Advice。 當在分割器之前應用時,它可以確保所有下游操作都在同一個通道上執行,並且可以選擇性地等待直到收到所有已發送訊息的發布者確認(如果連線工廠已設定為確認)。 以下範例示範如何使用 BoundRabbitChannelAdvice
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
請注意,相同的 RabbitTemplate
(實作 RabbitOperations
)用於 advice 和輸出适配器中。 Advice 在範本的 invoke
方法中執行下游流程,以便所有操作都在同一個通道上執行。 如果提供了選用的逾時,當流程完成時,advice 會呼叫 waitForConfirmsOrDie
方法,如果在指定時間內未收到確認,則會擲回例外狀況。
下游流程中不得有執行緒交接 (thread hands-off) (QueueChannel 、ExecutorChannel 和其他)。 |