@KafkaListener
生命周期管理
為 @KafkaListener
註解建立的監聽器容器不是應用程式上下文中的 Bean。相反地,它們會向 KafkaListenerEndpointRegistry
類型的基礎結構 Bean 註冊。此 Bean 由框架自動宣告,並管理容器的生命週期;它將自動啟動任何將 autoStartup
設定為 true
的容器。所有容器工廠建立的所有容器都必須處於相同的 phase
。請參閱監聽器容器自動啟動以取得更多資訊。您可以使用註冊表以程式化方式管理生命週期。啟動或停止註冊表將啟動或停止所有已註冊的容器。或者,您可以使用其 id
屬性取得對個別容器的參考。您可以在註解上設定 autoStartup
,這會覆寫設定到容器工廠中的預設設定。您可以從應用程式上下文取得對 Bean 的參考(例如自動注入)以管理其已註冊的容器。以下範例示範如何執行此操作
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
註冊表僅維護其管理的容器的生命週期;宣告為 Bean 的容器不受註冊表管理,並且可以從應用程式上下文中取得。可以透過呼叫註冊表的 getListenerContainers()
方法取得受管理容器的集合。版本 2.2.5 新增了一個便利方法 getAllListenerContainers()
,它會傳回所有容器的集合,包括註冊表管理的容器和宣告為 Bean 的容器。傳回的集合將包含任何已初始化的原型 Bean,但它不會初始化任何延遲 Bean 宣告。
在應用程式上下文重新整理後註冊的端點將立即啟動,無論其 autoStartup 屬性為何,以符合 SmartLifecycle 合約,其中 autoStartup 僅在應用程式上下文初始化期間考量。延遲註冊的一個範例是具有原型範圍 @KafkaListener 的 Bean,在上下文初始化後建立實例。從版本 2.8.7 開始,您可以將註冊表的 alwaysStartAfterRefresh 屬性設定為 false ,然後容器的 autoStartup 屬性將定義是否啟動容器。 |
從 KafkaListenerEndpointRegistry 擷取 MessageListenerContainers
KafkaListenerEndpointRegistry
提供用於擷取 MessageListenerContainer
實例的方法,以適應各種管理情境
所有容器:對於涵蓋所有監聽器容器的操作,請使用 getListenerContainers()
擷取全面的集合。
Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();
依 ID 尋找特定容器:若要管理個別容器,getListenerContainer(String id)
可讓您依 ID 擷取。
MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");
動態容器篩選:在 3.2 版中引入,兩個多載的 getListenerContainersMatching
方法可實現精細的容器選擇。其中一個方法採用 Predicate<String>
作為基於 ID 篩選的參數,而另一個方法採用 BiPredicate<String, MessageListenerContainer>
作為更進階的條件,其中可能包含容器屬性或狀態作為參數。
// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));
// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
registry.getListenerContainersMatching(myPattern::matches);
// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
registry.getListenerContainersMatching(myIdSet::contains);
// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
registry.getListenerContainersMatching(
(id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
);
利用這些方法有效率地管理和查詢應用程式中的 MessageListenerContainer
實例。