尋求特定偏移量

為了執行尋求 (seek) 操作,您的監聽器必須實作 `ConsumerSeekAware`,它具有以下方法

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions)

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

`registerSeekCallback` 在容器啟動時以及每次分配分割區時被呼叫。您應該在初始化後在任意時間尋求時使用此回呼。您應該儲存對回呼的參考。如果您在多個容器(或在 `ConcurrentMessageListenerContainer` 中)使用相同的監聽器,則應將回呼儲存在 `ThreadLocal` 或其他以監聽器 `Thread` 為鍵的結構中。

當使用群組管理時,當分割區被分配時,會呼叫 `onPartitionsAssigned`。例如,您可以透過呼叫回呼,使用此方法來設定分割區的初始偏移量。您也可以使用此方法將此執行緒的回呼與分配的分割區關聯(請參閱以下範例)。您必須使用回呼引數,而不是傳遞到 `registerSeekCallback` 中的引數。從 2.5.5 版本開始,即使在使用手動分割區分配時,也會呼叫此方法。

當容器停止或 Kafka 撤銷分配時,會呼叫 `onPartitionsRevoked`。您應該捨棄此執行緒的回呼,並移除與撤銷分割區的任何關聯。

回呼具有以下方法

void seek(String topic, int partition, long offset);

void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection<TopicPartitions> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection<TopicPartitions> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

`seek` 方法的兩種不同變體提供了一種尋求任意偏移量的方法。以 `Function` 作為引數來計算偏移量的方法是在框架的 3.2 版本中新增的。此函數提供對目前偏移量的存取權(消費者傳回的目前位置,即將擷取的下一個偏移量)。使用者可以根據函數定義中消費者中的目前偏移量,決定要尋求的偏移量。

`seekRelative` 在 2.3 版本中新增,以執行相對尋求。

  • `offset` 為負數且 `toCurrent` 為 `false` - 相對於分割區的末尾尋求。

  • `offset` 為正數且 `toCurrent` 為 `false` - 相對於分割區的開頭尋求。

  • `offset` 為負數且 `toCurrent` 為 `true` - 相對於目前位置尋求(倒轉)。

  • `offset` 為正數且 `toCurrent` 為 `true` - 相對於目前位置尋求(快轉)。

`seekToTimestamp` 方法也在 2.3 版本中新增。

當在 `onIdleContainer` 或 `onPartitionsAssigned` 方法中尋求多個分割區的相同時間戳記時,第二種方法是較佳的選擇,因為在單次呼叫消費者的 `offsetsForTimes` 方法中尋找時間戳記的偏移量更有效率。從其他位置呼叫時,容器將收集所有時間戳記尋求請求,並單次呼叫 `offsetsForTimes`。

當偵測到閒置容器時,您也可以從 `onIdleContainer()` 執行尋求操作。請參閱偵測閒置和無回應的消費者,以了解如何啟用閒置容器偵測。

接受集合的 `seekToBeginning` 方法很有用,例如,當處理壓縮主題且您希望在每次應用程式啟動時尋求開頭時
public class MyListener implements ConsumerSeekAware {

    ...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

若要在執行時任意尋求,請使用來自 `registerSeekCallback` 的回呼參考,用於適當的執行緒。

這是一個簡單的 Spring Boot 應用程式,示範如何使用回呼;它傳送 10 筆紀錄到主題;在主控台中按下 `<Enter>` 會導致所有分割區尋求到開頭。

@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

為了簡化操作,2.3 版本新增了 `AbstractConsumerSeekAware` 類別,它可以追蹤哪個回呼要用於主題/分割區。以下範例示範如何在每次容器閒置時,尋求每個分割區中最後處理的紀錄。它也具有允許任意外部呼叫將分割區倒轉一個紀錄的方法。

public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

            assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind all partitions one record.
    */
    public void rewindAllOneRecord() {
        getSeekCallbacks()
            .forEach((tp, callback) ->
                callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind one partition one record.
    */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbackFor(new TopicPartition(topic, partition))
            .seekRelative(topic, partition, -1, true);
    }

}

2.6 版本在抽象類別中新增了便利方法

  • `seekToBeginning()` - 將所有分配的分割區尋求到開頭。

  • `seekToEnd()` - 將所有分配的分割區尋求到末尾。

  • `seekToTimestamp(long timestamp)` - 將所有分配的分割區尋求到該時間戳記表示的偏移量。

範例

public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listn(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
    }

}