暫停和恢復監聽器容器

Version 2.1.3 在監聽器容器中新增了 pause()resume() 方法。先前,您可以在 ConsumerAwareMessageListener 中暫停消費者,並透過監聽 ListenerContainerIdleEvent 來恢復它,這提供了對 Consumer 物件的存取權。雖然您可以使用事件監聽器來暫停閒置容器中的消費者,但在某些情況下,這不是執行緒安全的,因為無法保證事件監聽器是在消費者執行緒上調用的。為了安全地暫停和恢復消費者,您應該在監聽器容器上使用 pauseresume 方法。pause() 在下一個 poll() 之前生效;resume() 在目前的 poll() 返回之後生效。當容器暫停時,它會繼續 poll() 消費者,如果正在使用群組管理,則避免重新平衡,但它不會檢索任何記錄。請參閱 Kafka 文件以獲取更多資訊。

從版本 2.1.5 開始,您可以呼叫 isPauseRequested() 來查看是否已呼叫 pause()。但是,消費者可能尚未實際暫停。如果所有 Consumer 實例都已實際暫停,則 isConsumerPaused() 返回 true。

此外(也從 2.1.5 開始),ConsumerPausedEventConsumerResumedEvent 實例會與容器一起發布,作為 source 屬性,而 TopicPartition 實例則包含在 partitions 屬性中。

從版本 2.9 開始,一個新的容器屬性 pauseImmediate,當設定為 true 時,會導致在處理完當前記錄後暫停生效。預設情況下,暫停會在處理完先前輪詢的所有記錄後生效。請參閱 pauseImmediate

以下簡單的 Spring Boot 應用程式示範了如何使用容器註冊表來取得對 @KafkaListener 方法容器的參考,並暫停或恢復其消費者,以及接收相應的事件

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下清單顯示了先前範例的結果

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2