應用程式事件

下列 Spring 應用程式事件由監聽器容器及其消費者發布

  • ConsumerStartingEvent:當消費者執行緒首次啟動時發布,在開始輪詢之前。

  • ConsumerStartedEvent:當消費者即將開始輪詢時發布。

  • ConsumerFailedToStartEvent:如果在 consumerStartTimeout 容器屬性指定的時間內未發布 ConsumerStartingEvent,則會發布此事件。此事件可能表示設定的工作執行器沒有足夠的執行緒來支援其使用的容器及其並行性。當發生這種情況時,也會記錄錯誤訊息。

  • ListenerContainerIdleEvent:如果在 idleEventInterval 內沒有收到訊息(如果已設定),則會發布此事件。

  • ListenerContainerNoLongerIdleEvent:在先前發布 ListenerContainerIdleEvent 之後,當收到記錄時發布。

  • ListenerContainerPartitionIdleEvent:如果在 idlePartitionEventInterval 內沒有從該分割區收到訊息(如果已設定),則會發布此事件。

  • ListenerContainerPartitionNoLongerIdleEvent:當從先前發布 ListenerContainerPartitionIdleEvent 的分割區收到記錄時發布。

  • NonResponsiveConsumerEvent:當消費者似乎在 poll 方法中被封鎖時發布。

  • ConsumerPartitionPausedEvent:當分割區暫停時,由每個消費者發布。

  • ConsumerPartitionResumedEvent:當分割區恢復時,由每個消費者發布。

  • ConsumerPausedEvent:當容器暫停時,由每個消費者發布。

  • ConsumerResumedEvent:當容器恢復時,由每個消費者發布。

  • ConsumerStoppingEvent:在停止之前,由每個消費者發布。

  • ConsumerStoppedEvent:在消費者關閉後發布。請參閱 執行緒安全

  • ConsumerRetryAuthEvent:當消費者的身份驗證或授權失敗並正在重試時發布。

  • ConsumerRetryAuthSuccessfulEvent:當身份驗證或授權已成功重試時發布。只能在之前有 ConsumerRetryAuthEvent 的情況下發生。

  • ContainerStoppedEvent:當所有消費者都已停止時發布。

預設情況下,應用程式內容的事件多播器在呼叫執行緒上調用事件監聽器。如果您將多播器變更為使用非同步執行器,則當事件包含對消費者的參考時,您不得調用任何 Consumer 方法。

ListenerContainerIdleEvent 具有以下屬性

  • source:發布事件的監聽器容器實例。

  • container:監聽器容器或父監聽器容器,如果來源容器是子容器。

  • id:監聽器 ID(或容器 bean 名稱)。

  • idleTime:事件發布時容器處於閒置狀態的時間。

  • topicPartitions:事件產生時容器被分配的主題和分割區。

  • consumer:對 Kafka Consumer 物件的參考。例如,如果先前調用了消費者的 pause() 方法,則可以在收到事件時 resume()

  • paused:容器目前是否已暫停。有關更多資訊,請參閱 暫停和恢復監聽器容器

ListenerContainerNoLongerIdleEvent 具有相同的屬性,除了 idleTimepaused

ListenerContainerPartitionIdleEvent 具有以下屬性

  • source:發布事件的監聽器容器實例。

  • container:監聽器容器或父監聽器容器,如果來源容器是子容器。

  • id:監聽器 ID(或容器 bean 名稱)。

  • idleTime:分割區消費處於閒置狀態的時間(事件發布時)。

  • topicPartition:觸發事件的主題和分割區。

  • consumer:對 Kafka Consumer 物件的參考。例如,如果先前調用了消費者的 pause() 方法,則可以在收到事件時 resume()

  • paused:該分割區消費目前是否針對該消費者暫停。有關更多資訊,請參閱 暫停和恢復監聽器容器

ListenerContainerPartitionNoLongerIdleEvent 具有相同的屬性,除了 idleTimepaused

NonResponsiveConsumerEvent 具有以下屬性

  • source:發布事件的監聽器容器實例。

  • container:監聽器容器或父監聽器容器,如果來源容器是子容器。

  • id:監聽器 ID(或容器 bean 名稱)。

  • timeSinceLastPoll:容器上次調用 poll() 之前的時間。

  • topicPartitions:事件產生時容器被分配的主題和分割區。

  • consumer:對 Kafka Consumer 物件的參考。例如,如果先前調用了消費者的 pause() 方法,則可以在收到事件時 resume()

  • paused:容器目前是否已暫停。有關更多資訊,請參閱 暫停和恢復監聽器容器

ConsumerPausedEventConsumerResumedEventConsumerStopping 事件具有以下屬性

  • source:發布事件的監聽器容器實例。

  • container:監聽器容器或父監聽器容器,如果來源容器是子容器。

  • partitions:涉及的 TopicPartition 實例。

ConsumerPartitionPausedEventConsumerPartitionResumedEvent 事件具有以下屬性

  • source:發布事件的監聽器容器實例。

  • container:監聽器容器或父監聽器容器,如果來源容器是子容器。

  • partition:涉及的 TopicPartition 實例。

ConsumerRetryAuthEvent 事件具有以下屬性

  • source:發布事件的監聽器容器實例。

  • container:監聽器容器或父監聽器容器,如果來源容器是子容器。

  • 原因:

    • AUTHENTICATION - 事件發布是因為身份驗證例外。

    • AUTHORIZATION - 事件發布是因為授權例外。

ConsumerStartingEventConsumerStartedEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEvent 事件具有以下屬性

  • source:發布事件的監聽器容器實例。

  • container:監聽器容器或父監聽器容器,如果來源容器是子容器。

所有容器(無論是子容器還是父容器)都發布 ContainerStoppedEvent。對於父容器,來源和容器屬性相同。

此外,ConsumerStoppedEvent 還具有以下附加屬性

  • 原因:

    • NORMAL - 消費者正常停止(容器已停止)。

    • ERROR - 拋出 java.lang.Error

    • FENCED - 交易生產者被隔離,且 stopContainerWhenFenced 容器屬性為 true

    • AUTH - 拋出 AuthenticationExceptionAuthorizationException,且未設定 authExceptionRetryInterval

    • NO_OFFSET - 分割區沒有偏移量,且 auto.offset.reset 策略為 none

您可以使用此事件在這種情況後重新啟動容器

if (event.getReason.equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}

偵測閒置和無回應的消費者

雖然非同步消費者很有效率,但一個問題是如何偵測它們何時閒置。如果一段時間內沒有訊息到達,您可能想要採取一些動作。

您可以設定監聽器容器,以便在一段時間沒有訊息傳遞時發布 ListenerContainerIdleEvent。當容器閒置時,每隔 idleEventInterval 毫秒就會發布一個事件。

若要設定此功能,請在容器上設定 idleEventInterval。以下範例示範如何執行此操作

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

以下範例示範如何為 @KafkaListener 設定 idleEventInterval

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在每種情況下,當容器閒置時,每分鐘都會發布一次事件。

如果由於某些原因,消費者 poll() 方法沒有退出,則不會收到任何訊息,並且無法產生閒置事件(這在早期版本的 kafka-clients 中是一個問題,當時 Broker 無法連線)。在這種情況下,如果輪詢在 3x pollTimeout 屬性內沒有傳回,則容器會發布 NonResponsiveConsumerEvent。預設情況下,每個容器每 30 秒執行一次此檢查。您可以透過在設定監聽器容器時,在 ContainerProperties 中設定 monitorInterval(預設為 30 秒)和 noPollThreshold(預設為 3.0)屬性來修改此行為。noPollThreshold 應大於 1.0,以避免由於競爭條件而產生虛假事件。收到此類事件可讓您停止容器,從而喚醒消費者以便它可以停止。

從 2.6.2 版開始,如果容器發布了 ListenerContainerIdleEvent,則在隨後收到記錄時,它將發布 ListenerContainerNoLongerIdleEvent

事件消費

您可以透過實作 ApplicationListener 來捕獲這些事件 — 一般監聽器或僅接收此特定事件的監聽器。您也可以使用 Spring Framework 4.2 中引入的 @EventListener

下一個範例將 @KafkaListener@EventListener 組合到一個類別中。您應該理解,應用程式監聽器會取得所有容器的事件,因此如果您想要根據哪個容器閒置來採取特定動作,則可能需要檢查監聽器 ID。您也可以使用 @EventListenercondition 來達到此目的。

請參閱 應用程式事件 以取得有關事件屬性的資訊。

事件通常在消費者執行緒上發布,因此可以安全地與 Consumer 物件互動。

以下範例同時使用 @KafkaListener@EventListener

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
事件監聽器會看到所有容器的事件。因此,在前面的範例中,我們根據監聽器 ID 縮小了收到的事件範圍。由於為 @KafkaListener 建立的容器支援並行性,因此實際容器被命名為 id-n,其中 n 是每個實例的唯一值,以支援並行性。這就是我們在條件中使用 startsWith 的原因。
如果您希望使用閒置事件來停止監聽器容器,則不應在調用監聽器的執行緒上調用 container.stop()。這樣做會導致延遲和不必要的日誌訊息。相反地,您應該將事件交給不同的執行緒,然後該執行緒可以停止容器。此外,如果容器是子容器,則不應 stop() 容器實例。您應該停止並行容器。

閒置時的目前位置

請注意,您可以透過在監聽器中實作 ConsumerSeekAware 來取得偵測到閒置時的目前位置。請參閱 seek 中的 onIdleContainer()