尋求特定偏移量
為了執行尋求 (seek) 操作,您的監聽器必須實作 `ConsumerSeekAware`,它具有以下方法
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
`registerSeekCallback` 在容器啟動時以及每次分配分割區時被呼叫。您應該在初始化後在任意時間尋求時使用此回呼。您應該儲存對回呼的參考。如果您在多個容器(或在 `ConcurrentMessageListenerContainer` 中)使用相同的監聽器,則應將回呼儲存在 `ThreadLocal` 或其他以監聽器 `Thread` 為鍵的結構中。
當使用群組管理時,當分割區被分配時,會呼叫 `onPartitionsAssigned`。例如,您可以透過呼叫回呼,使用此方法來設定分割區的初始偏移量。您也可以使用此方法將此執行緒的回呼與分配的分割區關聯(請參閱以下範例)。您必須使用回呼引數,而不是傳遞到 `registerSeekCallback` 中的引數。從 2.5.5 版本開始,即使在使用手動分割區分配時,也會呼叫此方法。
當容器停止或 Kafka 撤銷分配時,會呼叫 `onPartitionsRevoked`。您應該捨棄此執行緒的回呼,並移除與撤銷分割區的任何關聯。
回呼具有以下方法
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
`seek` 方法的兩種不同變體提供了一種尋求任意偏移量的方法。以 `Function` 作為引數來計算偏移量的方法是在框架的 3.2 版本中新增的。此函數提供對目前偏移量的存取權(消費者傳回的目前位置,即將擷取的下一個偏移量)。使用者可以根據函數定義中消費者中的目前偏移量,決定要尋求的偏移量。
`seekRelative` 在 2.3 版本中新增,以執行相對尋求。
-
`offset` 為負數且 `toCurrent` 為 `false` - 相對於分割區的末尾尋求。
-
`offset` 為正數且 `toCurrent` 為 `false` - 相對於分割區的開頭尋求。
-
`offset` 為負數且 `toCurrent` 為 `true` - 相對於目前位置尋求(倒轉)。
-
`offset` 為正數且 `toCurrent` 為 `true` - 相對於目前位置尋求(快轉)。
`seekToTimestamp` 方法也在 2.3 版本中新增。
當在 `onIdleContainer` 或 `onPartitionsAssigned` 方法中尋求多個分割區的相同時間戳記時,第二種方法是較佳的選擇,因為在單次呼叫消費者的 `offsetsForTimes` 方法中尋找時間戳記的偏移量更有效率。從其他位置呼叫時,容器將收集所有時間戳記尋求請求,並單次呼叫 `offsetsForTimes`。 |
當偵測到閒置容器時,您也可以從 `onIdleContainer()` 執行尋求操作。請參閱偵測閒置和無回應的消費者,以了解如何啟用閒置容器偵測。
接受集合的 `seekToBeginning` 方法很有用,例如,當處理壓縮主題且您希望在每次應用程式啟動時尋求開頭時 |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
若要在執行時任意尋求,請使用來自 `registerSeekCallback` 的回呼參考,用於適當的執行緒。
這是一個簡單的 Spring Boot 應用程式,示範如何使用回呼;它傳送 10 筆紀錄到主題;在主控台中按下 `<Enter>` 會導致所有分割區尋求到開頭。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
為了簡化操作,2.3 版本新增了 `AbstractConsumerSeekAware` 類別,它可以追蹤哪個回呼要用於主題/分割區。以下範例示範如何在每次容器閒置時,尋求每個分割區中最後處理的紀錄。它也具有允許任意外部呼叫將分割區倒轉一個紀錄的方法。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
2.6 版本在抽象類別中新增了便利方法
-
`seekToBeginning()` - 將所有分配的分割區尋求到開頭。
-
`seekToEnd()` - 將所有分配的分割區尋求到末尾。
-
`seekToTimestamp(long timestamp)` - 將所有分配的分割區尋求到該時間戳記表示的偏移量。
範例
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}