時間戳記提取器
Kafka Streams 允許您根據時間戳記的各種概念來控制消費者記錄的處理。預設情況下,Kafka Streams 會提取嵌入在消費者記錄中的時間戳記元數據。您可以通過為每個輸入綁定提供不同的 TimestampExtractor
實作來更改此預設行為。以下是如何完成的一些詳細資訊。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然後,您為每個消費者綁定設定上述 TimestampExtractor
bean 名稱。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果您跳過輸入消費者綁定以設定自訂時間戳記提取器,則該消費者將使用預設設定。