監聽器並行性

SimpleMessageListenerContainer

預設情況下,監聽器容器會啟動單一消費者,從佇列接收訊息。

檢視前一節的表格時,您可以看到許多控制並行性的屬性和特性。最簡單的是 `concurrentConsumers`,它會建立(固定)數量的消費者,同時處理訊息。

在 1.3.0 版本之前,這是唯一可用的設定,而且必須停止並重新啟動容器才能變更設定。

從 1.3.0 版本開始,您現在可以動態調整 `concurrentConsumers` 屬性。如果在容器執行時變更此屬性,則會根據需要新增或移除消費者,以調整為新設定。

此外,新增了一個名為 `maxConcurrentConsumers` 的新屬性,容器會根據工作負載動態調整並行性。這與四個額外屬性 `consecutiveActiveTrigger`、`startConsumerMinInterval`、`consecutiveIdleTrigger` 和 `stopConsumerMinInterval` 協同運作。使用預設設定,增加消費者的演算法如下:

如果尚未達到 `maxConcurrentConsumers`,且現有消費者已連續十個週期處於活動狀態,且自上次啟動消費者以來至少已過了 10 秒,則會啟動新的消費者。如果消費者在 `batchSize` * `receiveTimeout` 毫秒內收到至少一則訊息,則視為活動狀態。

使用預設設定,減少消費者的演算法如下:

如果運行的消費者數量超過 `concurrentConsumers`,且消費者偵測到連續十次逾時(閒置),且上次停止消費者至少在 60 秒前,則會停止消費者。逾時時間取決於 `receiveTimeout` 和 `batchSize` 屬性。如果消費者在 `batchSize` * `receiveTimeout` 毫秒內未收到任何訊息,則視為閒置狀態。因此,使用預設逾時時間(一秒)和 `batchSize` 為四,則在閒置時間 40 秒後(四次逾時對應於一次閒置偵測)才會考慮停止消費者。

實際上,只有在整個容器閒置一段時間後,才能停止消費者。這是因為 Broker 會在所有活動消費者之間共享其工作。

無論配置的佇列數量為何,每個消費者都使用單一通道。

從 2.0 版本開始,可以使用 `concurrency` 屬性設定 `concurrentConsumers` 和 `maxConcurrentConsumers` 屬性,例如 `2-4`。

使用 DirectMessageListenerContainer

使用此容器,並行性基於配置的佇列和 `consumersPerQueue`。每個佇列的每個消費者都使用單獨的通道,並且並行性由 Rabbit 用戶端程式庫控制。預設情況下,在撰寫本文時,它使用 `DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2` 個執行緒的執行緒池。

您可以配置 `taskExecutor` 以提供所需的最大並行性。