執行緒與非同步消費者
許多不同的執行緒與非同步消費者有關。
在 RabbitMQ Client
交付新訊息時,從 SimpleMessageListenerContainer
中設定的 TaskExecutor
來的執行緒會被用來調用 MessageListener
。如果未設定,則會使用 SimpleAsyncTaskExecutor
。如果您使用集區執行器,您需要確保集區大小足以處理已設定的並行。使用 DirectMessageListenerContainer
,MessageListener
會直接在 RabbitMQ Client
執行緒上調用。在這種情況下,taskExecutor
用於監視消費者的任務。
當使用預設的 SimpleAsyncTaskExecutor 時,對於在其上調用監聽器的執行緒,監聽器容器 beanName 會用於 threadNamePrefix 中。這對於日誌分析很有用。我們通常建議始終在記錄附加程式組態中包含執行緒名稱。當透過容器上的 taskExecutor 屬性明確提供 TaskExecutor 時,將按原樣使用,不做修改。建議您使用類似的技術來命名自訂 TaskExecutor Bean 定義所建立的執行緒,以協助在日誌訊息中識別執行緒。 |
在建立連線時,CachingConnectionFactory
中設定的 Executor
會傳遞到 RabbitMQ Client
,並且其執行緒會用於將新訊息交付到監聽器容器。如果未設定,用戶端會使用內部執行緒集區執行器,在撰寫本文時,每個連線的集區大小為 Runtime.getRuntime().availableProcessors() * 2
。
如果您有大量的工廠或正在使用 CacheMode.CONNECTION
,您可能希望考慮使用共用的 ThreadPoolTaskExecutor
,其具有足夠的執行緒來滿足您的工作負載。
使用 DirectMessageListenerContainer ,您需要確保連線工廠設定了具有足夠執行緒的任務執行器,以支援您在所有使用該工廠的監聽器容器中所需的並行。預設集區大小 (在撰寫本文時) 為 Runtime.getRuntime().availableProcessors() * 2 。 |
RabbitMQ client
使用 ThreadFactory
來建立用於低階 I/O (socket) 操作的執行緒。若要修改此工廠,您需要設定底層的 RabbitMQ ConnectionFactory
,如 設定底層用戶端連線工廠 中所述。