依序啟動 @KafkaListener
常見的使用案例是在另一個監聽器已消耗完主題中的所有記錄後啟動一個監聽器。例如,您可能希望在處理來自其他主題的記錄之前,將一個或多個壓縮主題的內容載入記憶體中。從 2.7.3 版本開始,引入了一個新的元件 ContainerGroupSequencer
。它使用 @KafkaListener
的 containerGroup
屬性將容器分組在一起,並在目前群組中的所有容器都閒置時,啟動下一個群組中的容器。
最好用一個範例來說明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
在這裡,我們有 4 個監聽器,分為 g1
和 g2
兩個群組。
在應用程式內容初始化期間,sequencer 會將所提供群組中所有容器的 autoStartup
屬性設定為 false
。它也會為任何容器(尚未設定的容器)將 idleEventInterval
設定為提供的值(在此範例中為 5000 毫秒)。然後,當 sequencer 由應用程式內容啟動時,會啟動第一個群組中的容器。當接收到 ListenerContainerIdleEvent
時,每個容器中的每個個別子容器都會停止。當 ConcurrentMessageListenerContainer
中的所有子容器都停止時,父容器也會停止。當一個群組中的所有容器都已停止時,就會啟動下一個群組中的容器。群組或群組中容器的數量沒有限制。
預設情況下,當最後一個群組(上面的 g2
)中的容器閒置時,它們不會停止。若要修改該行為,請在 sequencer 上將 stopLastGroupWhenIdle
設定為 true
。
順帶一提,先前每個群組中的容器都會新增到類型為 Collection<MessageListenerContainer>
的 bean 中,bean 名稱是 containerGroup
。這些集合現在已被棄用,改用類型為 ContainerGroup
的 bean,其 bean 名稱是群組名稱,並附加 .group
後綴;在上面的範例中,將會有 2 個 bean,分別是 g1.group
和 g2.group
。Collection
bean 將在未來的版本中移除。