嚴格訊息排序

本節描述輸入和輸出訊息的訊息排序。

輸入

如果您需要嚴格的輸入訊息排序,則必須將輸入監聽器容器的 prefetchCount 屬性設定為 1。 這是因為,如果訊息失敗並重新傳遞,它會在現有的預先提取訊息之後到達。 自 Spring AMQP 2.0 版本起,prefetchCount 預設為 250,以提高效能。 嚴格的排序要求會以降低效能為代價。

輸出

考慮以下整合流程

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

假設我們將訊息 ABC 發送到閘道器。 雖然訊息 ABC 很可能依序發送,但無法保證。 這是因為範本會為每個發送操作從快取中「借用」一個通道,並且無法保證每個訊息都使用相同的通道。 一個解決方案是在分割器之前啟動交易,但交易在 RabbitMQ 中成本很高,並且可能會將效能降低數百倍。

為了以更有效率的方式解決此問題,從 5.1 版開始,Spring Integration 提供了 BoundRabbitChannelAdvice,它是 HandleMessageAdvice。 請參閱 處理訊息 Advice。 當在分割器之前應用時,它可以確保所有下游操作都在同一個通道上執行,並且可以選擇性地等待直到收到所有已發送訊息的發布者確認(如果連線工廠已設定為確認)。 以下範例示範如何使用 BoundRabbitChannelAdvice

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

請注意,相同的 RabbitTemplate(實作 RabbitOperations)用於 advice 和輸出适配器中。 Advice 在範本的 invoke 方法中執行下游流程,以便所有操作都在同一個通道上執行。 如果提供了選用的逾時,當流程完成時,advice 會呼叫 waitForConfirmsOrDie 方法,如果在指定時間內未收到確認,則會擲回例外狀況。

下游流程中不得有執行緒交接 (thread hands-off) (QueueChannelExecutorChannel 和其他)。