重新平衡監聽器
ContainerProperties
具有一個名為 consumerRebalanceListener
的屬性,它接受 Kafka 客户端 ConsumerRebalanceListener
介面的實作。如果未提供此屬性,容器會配置一個日誌記錄監聽器,以 INFO
層級記錄重新平衡事件。框架還新增了一個子介面 ConsumerAwareRebalanceListener
。以下清單顯示 ConsumerAwareRebalanceListener
介面定義
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
請注意,當分割區被撤銷時,有兩個回呼。第一個會立即呼叫。第二個會在提交任何待處理的偏移量後呼叫。如果您希望在某些外部儲存庫中維護偏移量,這會很有用,如下例所示
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
從 2.4 版開始,新增了一個新的方法 onPartitionsLost() (類似於 ConsumerRebalanceLister 中同名的方法)。ConsumerRebalanceLister 上的預設實作只會呼叫 onPartionsRevoked 。ConsumerAwareRebalanceListener 上的預設實作則不做任何事。當為監聽器容器提供自訂監聽器(任一類型)時,重要的是您的實作不要從 onPartitionsLost 呼叫 onPartitionsRevoked 。如果您實作 ConsumerRebalanceListener ,則應覆寫預設方法。這是因為監聽器容器將在呼叫您實作上的方法後,從其 onPartitionsLost 實作中呼叫自己的 onPartitionsRevoked 。如果您的實作委派給預設行為,則每次 Consumer 在容器的監聽器上呼叫該方法時,onPartitionsRevoked 都會被呼叫兩次。 |