應用程式事件
下列 Spring 應用程式事件由監聽器容器及其消費者發布
-
ConsumerStartingEvent
:當消費者執行緒首次啟動時發布,在開始輪詢之前。 -
ConsumerStartedEvent
:當消費者即將開始輪詢時發布。 -
ConsumerFailedToStartEvent
:如果在consumerStartTimeout
容器屬性指定的時間內未發布ConsumerStartingEvent
,則會發布此事件。此事件可能表示設定的工作執行器沒有足夠的執行緒來支援其使用的容器及其並行性。當發生這種情況時,也會記錄錯誤訊息。 -
ListenerContainerIdleEvent
:如果在idleEventInterval
內沒有收到訊息(如果已設定),則會發布此事件。 -
ListenerContainerNoLongerIdleEvent
:在先前發布ListenerContainerIdleEvent
之後,當收到記錄時發布。 -
ListenerContainerPartitionIdleEvent
:如果在idlePartitionEventInterval
內沒有從該分割區收到訊息(如果已設定),則會發布此事件。 -
ListenerContainerPartitionNoLongerIdleEvent
:當從先前發布ListenerContainerPartitionIdleEvent
的分割區收到記錄時發布。 -
NonResponsiveConsumerEvent
:當消費者似乎在poll
方法中被封鎖時發布。 -
ConsumerPartitionPausedEvent
:當分割區暫停時,由每個消費者發布。 -
ConsumerPartitionResumedEvent
:當分割區恢復時,由每個消費者發布。 -
ConsumerPausedEvent
:當容器暫停時,由每個消費者發布。 -
ConsumerResumedEvent
:當容器恢復時,由每個消費者發布。 -
ConsumerStoppingEvent
:在停止之前,由每個消費者發布。 -
ConsumerStoppedEvent
:在消費者關閉後發布。請參閱 執行緒安全。 -
ConsumerRetryAuthEvent
:當消費者的身份驗證或授權失敗並正在重試時發布。 -
ConsumerRetryAuthSuccessfulEvent
:當身份驗證或授權已成功重試時發布。只能在之前有ConsumerRetryAuthEvent
的情況下發生。 -
ContainerStoppedEvent
:當所有消費者都已停止時發布。
預設情況下,應用程式內容的事件多播器在呼叫執行緒上調用事件監聽器。如果您將多播器變更為使用非同步執行器,則當事件包含對消費者的參考時,您不得調用任何 Consumer 方法。 |
ListenerContainerIdleEvent
具有以下屬性
-
source
:發布事件的監聽器容器實例。 -
container
:監聽器容器或父監聽器容器,如果來源容器是子容器。 -
id
:監聽器 ID(或容器 bean 名稱)。 -
idleTime
:事件發布時容器處於閒置狀態的時間。 -
topicPartitions
:事件產生時容器被分配的主題和分割區。 -
consumer
:對 KafkaConsumer
物件的參考。例如,如果先前調用了消費者的pause()
方法,則可以在收到事件時resume()
。 -
paused
:容器目前是否已暫停。有關更多資訊,請參閱 暫停和恢復監聽器容器。
ListenerContainerNoLongerIdleEvent
具有相同的屬性,除了 idleTime
和 paused
。
ListenerContainerPartitionIdleEvent
具有以下屬性
-
source
:發布事件的監聽器容器實例。 -
container
:監聽器容器或父監聽器容器,如果來源容器是子容器。 -
id
:監聽器 ID(或容器 bean 名稱)。 -
idleTime
:分割區消費處於閒置狀態的時間(事件發布時)。 -
topicPartition
:觸發事件的主題和分割區。 -
consumer
:對 KafkaConsumer
物件的參考。例如,如果先前調用了消費者的pause()
方法,則可以在收到事件時resume()
。 -
paused
:該分割區消費目前是否針對該消費者暫停。有關更多資訊,請參閱 暫停和恢復監聽器容器。
ListenerContainerPartitionNoLongerIdleEvent
具有相同的屬性,除了 idleTime
和 paused
。
NonResponsiveConsumerEvent
具有以下屬性
-
source
:發布事件的監聽器容器實例。 -
container
:監聽器容器或父監聽器容器,如果來源容器是子容器。 -
id
:監聽器 ID(或容器 bean 名稱)。 -
timeSinceLastPoll
:容器上次調用poll()
之前的時間。 -
topicPartitions
:事件產生時容器被分配的主題和分割區。 -
consumer
:對 KafkaConsumer
物件的參考。例如,如果先前調用了消費者的pause()
方法,則可以在收到事件時resume()
。 -
paused
:容器目前是否已暫停。有關更多資訊,請參閱 暫停和恢復監聽器容器。
ConsumerPausedEvent
、ConsumerResumedEvent
和 ConsumerStopping
事件具有以下屬性
-
source
:發布事件的監聽器容器實例。 -
container
:監聽器容器或父監聽器容器,如果來源容器是子容器。 -
partitions
:涉及的TopicPartition
實例。
ConsumerPartitionPausedEvent
、ConsumerPartitionResumedEvent
事件具有以下屬性
-
source
:發布事件的監聽器容器實例。 -
container
:監聽器容器或父監聽器容器,如果來源容器是子容器。 -
partition
:涉及的TopicPartition
實例。
ConsumerRetryAuthEvent
事件具有以下屬性
-
source
:發布事件的監聽器容器實例。 -
container
:監聽器容器或父監聽器容器,如果來源容器是子容器。 -
原因
:-
AUTHENTICATION
- 事件發布是因為身份驗證例外。 -
AUTHORIZATION
- 事件發布是因為授權例外。
-
ConsumerStartingEvent
、ConsumerStartedEvent
、ConsumerFailedToStartEvent
、ConsumerStoppedEvent
、ConsumerRetryAuthSuccessfulEvent
和 ContainerStoppedEvent
事件具有以下屬性
-
source
:發布事件的監聽器容器實例。 -
container
:監聽器容器或父監聽器容器,如果來源容器是子容器。
所有容器(無論是子容器還是父容器)都發布 ContainerStoppedEvent
。對於父容器,來源和容器屬性相同。
此外,ConsumerStoppedEvent
還具有以下附加屬性
-
原因
:-
NORMAL
- 消費者正常停止(容器已停止)。 -
ERROR
- 拋出java.lang.Error
。 -
FENCED
- 交易生產者被隔離,且stopContainerWhenFenced
容器屬性為true
。 -
AUTH
- 拋出AuthenticationException
或AuthorizationException
,且未設定authExceptionRetryInterval
。 -
NO_OFFSET
- 分割區沒有偏移量,且auto.offset.reset
策略為none
。
-
您可以使用此事件在這種情況後重新啟動容器
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
偵測閒置和無回應的消費者
雖然非同步消費者很有效率,但一個問題是如何偵測它們何時閒置。如果一段時間內沒有訊息到達,您可能想要採取一些動作。
您可以設定監聽器容器,以便在一段時間沒有訊息傳遞時發布 ListenerContainerIdleEvent
。當容器閒置時,每隔 idleEventInterval
毫秒就會發布一個事件。
若要設定此功能,請在容器上設定 idleEventInterval
。以下範例示範如何執行此操作
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下範例示範如何為 @KafkaListener
設定 idleEventInterval
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在每種情況下,當容器閒置時,每分鐘都會發布一次事件。
如果由於某些原因,消費者 poll()
方法沒有退出,則不會收到任何訊息,並且無法產生閒置事件(這在早期版本的 kafka-clients
中是一個問題,當時 Broker 無法連線)。在這種情況下,如果輪詢在 3x
pollTimeout
屬性內沒有傳回,則容器會發布 NonResponsiveConsumerEvent
。預設情況下,每個容器每 30 秒執行一次此檢查。您可以透過在設定監聽器容器時,在 ContainerProperties
中設定 monitorInterval
(預設為 30 秒)和 noPollThreshold
(預設為 3.0)屬性來修改此行為。noPollThreshold
應大於 1.0
,以避免由於競爭條件而產生虛假事件。收到此類事件可讓您停止容器,從而喚醒消費者以便它可以停止。
從 2.6.2 版開始,如果容器發布了 ListenerContainerIdleEvent
,則在隨後收到記錄時,它將發布 ListenerContainerNoLongerIdleEvent
。
事件消費
您可以透過實作 ApplicationListener
來捕獲這些事件 — 一般監聽器或僅接收此特定事件的監聽器。您也可以使用 Spring Framework 4.2 中引入的 @EventListener
。
下一個範例將 @KafkaListener
和 @EventListener
組合到一個類別中。您應該理解,應用程式監聽器會取得所有容器的事件,因此如果您想要根據哪個容器閒置來採取特定動作,則可能需要檢查監聽器 ID。您也可以使用 @EventListener
的 condition
來達到此目的。
請參閱 應用程式事件 以取得有關事件屬性的資訊。
事件通常在消費者執行緒上發布,因此可以安全地與 Consumer
物件互動。
以下範例同時使用 @KafkaListener
和 @EventListener
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件監聽器會看到所有容器的事件。因此,在前面的範例中,我們根據監聽器 ID 縮小了收到的事件範圍。由於為 @KafkaListener 建立的容器支援並行性,因此實際容器被命名為 id-n ,其中 n 是每個實例的唯一值,以支援並行性。這就是我們在條件中使用 startsWith 的原因。 |
如果您希望使用閒置事件來停止監聽器容器,則不應在調用監聽器的執行緒上調用 container.stop() 。這樣做會導致延遲和不必要的日誌訊息。相反地,您應該將事件交給不同的執行緒,然後該執行緒可以停止容器。此外,如果容器是子容器,則不應 stop() 容器實例。您應該停止並行容器。 |
閒置時的目前位置
請注意,您可以透過在監聽器中實作 ConsumerSeekAware
來取得偵測到閒置時的目前位置。請參閱 seek 中的 onIdleContainer()
。