重新平衡監聽器

ContainerProperties 具有一個名為 consumerRebalanceListener 的屬性,它接受 Kafka 客户端 ConsumerRebalanceListener 介面的實作。如果未提供此屬性,容器會配置一個日誌記錄監聽器,以 INFO 層級記錄重新平衡事件。框架還新增了一個子介面 ConsumerAwareRebalanceListener。以下清單顯示 ConsumerAwareRebalanceListener 介面定義

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

請注意,當分割區被撤銷時,有兩個回呼。第一個會立即呼叫。第二個會在提交任何待處理的偏移量後呼叫。如果您希望在某些外部儲存庫中維護偏移量,這會很有用,如下例所示

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
從 2.4 版開始,新增了一個新的方法 onPartitionsLost()(類似於 ConsumerRebalanceLister 中同名的方法)。ConsumerRebalanceLister 上的預設實作只會呼叫 onPartionsRevokedConsumerAwareRebalanceListener 上的預設實作則不做任何事。當為監聽器容器提供自訂監聽器(任一類型)時,重要的是您的實作不要從 onPartitionsLost 呼叫 onPartitionsRevoked。如果您實作 ConsumerRebalanceListener,則應覆寫預設方法。這是因為監聽器容器將在呼叫您實作上的方法後,從其 onPartitionsLost 實作中呼叫自己的 onPartitionsRevoked。如果您的實作委派給預設行為,則每次 Consumer 在容器的監聽器上呼叫該方法時,onPartitionsRevoked 都會被呼叫兩次。