暫停和恢復監聽器容器
Version 2.1.3 在監聽器容器中新增了 pause()
和 resume()
方法。先前,您可以在 ConsumerAwareMessageListener
中暫停消費者,並透過監聽 ListenerContainerIdleEvent
來恢復它,這提供了對 Consumer
物件的存取權。雖然您可以使用事件監聽器來暫停閒置容器中的消費者,但在某些情況下,這不是執行緒安全的,因為無法保證事件監聽器是在消費者執行緒上調用的。為了安全地暫停和恢復消費者,您應該在監聽器容器上使用 pause
和 resume
方法。pause()
在下一個 poll()
之前生效;resume()
在目前的 poll()
返回之後生效。當容器暫停時,它會繼續 poll()
消費者,如果正在使用群組管理,則避免重新平衡,但它不會檢索任何記錄。請參閱 Kafka 文件以獲取更多資訊。
從版本 2.1.5 開始,您可以呼叫 isPauseRequested()
來查看是否已呼叫 pause()
。但是,消費者可能尚未實際暫停。如果所有 Consumer
實例都已實際暫停,則 isConsumerPaused()
返回 true。
此外(也從 2.1.5 開始),ConsumerPausedEvent
和 ConsumerResumedEvent
實例會與容器一起發布,作為 source
屬性,而 TopicPartition
實例則包含在 partitions
屬性中。
從版本 2.9 開始,一個新的容器屬性 pauseImmediate
,當設定為 true 時,會導致在處理完當前記錄後暫停生效。預設情況下,暫停會在處理完先前輪詢的所有記錄後生效。請參閱 pauseImmediate。
以下簡單的 Spring Boot 應用程式示範了如何使用容器註冊表來取得對 @KafkaListener
方法容器的參考,並暫停或恢復其消費者,以及接收相應的事件
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
以下清單顯示了先前範例的結果
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2