執行緒與非同步消費者

許多不同的執行緒與非同步消費者有關。

RabbitMQ Client 交付新訊息時,從 SimpleMessageListenerContainer 中設定的 TaskExecutor 來的執行緒會被用來調用 MessageListener。如果未設定,則會使用 SimpleAsyncTaskExecutor。如果您使用集區執行器,您需要確保集區大小足以處理已設定的並行。使用 DirectMessageListenerContainerMessageListener 會直接在 RabbitMQ Client 執行緒上調用。在這種情況下,taskExecutor 用於監視消費者的任務。

當使用預設的 SimpleAsyncTaskExecutor 時,對於在其上調用監聽器的執行緒,監聽器容器 beanName 會用於 threadNamePrefix 中。這對於日誌分析很有用。我們通常建議始終在記錄附加程式組態中包含執行緒名稱。當透過容器上的 taskExecutor 屬性明確提供 TaskExecutor 時,將按原樣使用,不做修改。建議您使用類似的技術來命名自訂 TaskExecutor Bean 定義所建立的執行緒,以協助在日誌訊息中識別執行緒。

在建立連線時,CachingConnectionFactory 中設定的 Executor 會傳遞到 RabbitMQ Client,並且其執行緒會用於將新訊息交付到監聽器容器。如果未設定,用戶端會使用內部執行緒集區執行器,在撰寫本文時,每個連線的集區大小為 Runtime.getRuntime().availableProcessors() * 2

如果您有大量的工廠或正在使用 CacheMode.CONNECTION,您可能希望考慮使用共用的 ThreadPoolTaskExecutor,其具有足夠的執行緒來滿足您的工作負載。

使用 DirectMessageListenerContainer,您需要確保連線工廠設定了具有足夠執行緒的任務執行器,以支援您在所有使用該工廠的監聽器容器中所需的並行。預設集區大小 (在撰寫本文時) 為 Runtime.getRuntime().availableProcessors() * 2

RabbitMQ client 使用 ThreadFactory 來建立用於低階 I/O (socket) 操作的執行緒。若要修改此工廠,您需要設定底層的 RabbitMQ ConnectionFactory,如 設定底層用戶端連線工廠 中所述。