強制消費者重新平衡

Kafka 用戶端現在支援觸發強制重新平衡的選項。從 3.1.2 版本開始,Spring for Apache Kafka 提供一個選項,透過訊息監聽器容器在 Kafka 消費者上調用此 API。當調用此 API 時,它只是提醒 Kafka 消費者觸發強制重新平衡;實際的重新平衡只會在下一次 poll() 操作中發生。如果已經有重新平衡正在進行中,則調用強制重新平衡是無操作 (NO-OP) 的。調用者必須等待目前的重新平衡完成後才能調用另一個。請參閱 enforceRebalance 的 javadocs 以取得更多詳細資訊。

以下程式碼片段顯示使用訊息監聽器容器強制重新平衡的本質。

@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
    System.out.println("From KafkaListener: " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
    return args -> {
        final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
        System.out.println("Enforcing a rebalance");
        Thread.sleep(5_000);
        listenerContainer.enforceRebalance();
        Thread.sleep(5_000);
    };
}

如以上程式碼所示,應用程式使用 KafkaListenerEndpointRegistry 來存取訊息監聽器容器,然後在其上調用 enforceRebalnce API。當在監聽器容器上調用 enforceRebalance 時,它會將呼叫委派給底層的 Kafka 消費者。Kafka 消費者將在下一次 poll() 操作中觸發重新平衡。