執行緒安全

當使用並行訊息監聽器容器時,單個監聽器實例會在所有消費者執行緒上被調用。 因此,監聽器需要是執行緒安全的,並且最好使用無狀態監聽器。 如果無法使您的監聽器執行緒安全,或者添加同步會顯著降低添加並行的好處,您可以使用以下幾種技術之一

  • 使用 n 個容器,並將 concurrency=1 與原型作用域的 MessageListener Bean 一起使用,以便每個容器都獲得自己的實例(當使用 @KafkaListener 時,這是不可能的)。

  • 將狀態保存在 ThreadLocal<?> 實例中。

  • 讓單例監聽器委託給在 SimpleThreadScope(或類似作用域)中宣告的 Bean。

為了方便清理執行緒狀態(對於前面列表中的第二項和第三項),從版本 2.2 開始,當每個執行緒退出時,監聽器容器會發布 ConsumerStoppedEvent。 您可以使用 ApplicationListener@EventListener 方法來使用這些事件,以移除 ThreadLocal<?> 實例或從作用域中 remove() 執行緒作用域的 Bean。 請注意,SimpleThreadScope 不會銷毀具有銷毀介面(例如 DisposableBean)的 Bean,因此您應該自行 destroy() 實例。

預設情況下,應用程式上下文的事件多播器會在調用執行緒上調用事件監聽器。 如果您更改多播器以使用異步執行器,則執行緒清理將無效。

關於虛擬執行緒和並行訊息監聽器容器的特別注意事項

由於底層程式庫類別中仍然使用 synchronized 區塊進行執行緒協調的某些限制,因此在使用虛擬執行緒和並行訊息監聽器容器時,應用程式需要謹慎。 當啟用虛擬執行緒時,如果並行性超過可用平台執行緒的數量,則虛擬執行緒很可能被釘在平台執行緒上,並可能發生競爭條件。 因此,隨著 Spring for Apache Kafka 使用的第三方程式庫不斷發展以完全支援虛擬執行緒,建議將訊息監聽器容器上的並行性保持為等於或小於平台執行緒的數量。 這樣,應用程式可以避免執行緒之間的任何競爭條件以及虛擬執行緒被釘在平台執行緒上。