@KafkaListener 生命周期管理

@KafkaListener 註解建立的監聽器容器不是應用程式上下文中的 Bean。相反地,它們會向 KafkaListenerEndpointRegistry 類型的基礎結構 Bean 註冊。此 Bean 由框架自動宣告,並管理容器的生命週期;它將自動啟動任何將 autoStartup 設定為 true 的容器。所有容器工廠建立的所有容器都必須處於相同的 phase。請參閱監聽器容器自動啟動以取得更多資訊。您可以使用註冊表以程式化方式管理生命週期。啟動或停止註冊表將啟動或停止所有已註冊的容器。或者,您可以使用其 id 屬性取得對個別容器的參考。您可以在註解上設定 autoStartup,這會覆寫設定到容器工廠中的預設設定。您可以從應用程式上下文取得對 Bean 的參考(例如自動注入)以管理其已註冊的容器。以下範例示範如何執行此操作

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

註冊表僅維護其管理的容器的生命週期;宣告為 Bean 的容器不受註冊表管理,並且可以從應用程式上下文中取得。可以透過呼叫註冊表的 getListenerContainers() 方法取得受管理容器的集合。版本 2.2.5 新增了一個便利方法 getAllListenerContainers(),它會傳回所有容器的集合,包括註冊表管理的容器和宣告為 Bean 的容器。傳回的集合將包含任何已初始化的原型 Bean,但它不會初始化任何延遲 Bean 宣告。

在應用程式上下文重新整理後註冊的端點將立即啟動,無論其 autoStartup 屬性為何,以符合 SmartLifecycle 合約,其中 autoStartup 僅在應用程式上下文初始化期間考量。延遲註冊的一個範例是具有原型範圍 @KafkaListener 的 Bean,在上下文初始化後建立實例。從版本 2.8.7 開始,您可以將註冊表的 alwaysStartAfterRefresh 屬性設定為 false,然後容器的 autoStartup 屬性將定義是否啟動容器。

從 KafkaListenerEndpointRegistry 擷取 MessageListenerContainers

KafkaListenerEndpointRegistry 提供用於擷取 MessageListenerContainer 實例的方法,以適應各種管理情境

所有容器:對於涵蓋所有監聽器容器的操作,請使用 getListenerContainers() 擷取全面的集合。

Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();

依 ID 尋找特定容器:若要管理個別容器,getListenerContainer(String id) 可讓您依 ID 擷取。

MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");

動態容器篩選:在 3.2 版中引入,兩個多載的 getListenerContainersMatching 方法可實現精細的容器選擇。其中一個方法採用 Predicate<String> 作為基於 ID 篩選的參數,而另一個方法採用 BiPredicate<String, MessageListenerContainer> 作為更進階的條件,其中可能包含容器屬性或狀態作為參數。

// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
    registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));

// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
    registry.getListenerContainersMatching(myPattern::matches);

// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
    registry.getListenerContainersMatching(myIdSet::contains);

// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
    registry.getListenerContainersMatching(
        (id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
    );

利用這些方法有效率地管理和查詢應用程式中的 MessageListenerContainer 實例。