Apache Kafka Streams 支援
從 1.1.4 版本開始,Spring for Apache Kafka 為 Kafka Streams 提供一流的支援。若要從 Spring 應用程式中使用它,kafka-streams
jar 必須存在於類別路徑中。它是 Spring for Apache Kafka 專案的可選相依性,不會以傳遞方式下載。
基礎知識
參考 Apache Kafka Streams 文件建議以下使用 API 的方式
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
因此,我們有兩個主要組件
-
StreamsBuilder
:具有建構KStream
(或KTable
)實例的 API。 -
KafkaStreams
:用於管理這些實例的生命週期。
由單一 StreamsBuilder 暴露給 KafkaStreams 實例的所有 KStream 實例都會同時啟動和停止,即使它們具有不同的邏輯。換句話說,由 StreamsBuilder 定義的所有串流都與單一生命週期控制繫結。一旦 KafkaStreams 實例被 streams.close() 關閉,就無法重新啟動。相反地,必須建立新的 KafkaStreams 實例才能重新啟動串流處理。 |
Spring 管理
為了簡化從 Spring 應用程式上下文角度使用 Kafka Streams,並透過容器使用生命週期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean
。這是 AbstractFactoryBean
實作,用於將 StreamsBuilder
單例實例公開為 bean。以下範例建立這樣的 bean
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
從 2.2 版本開始,串流組態現在以 KafkaStreamsConfiguration 物件而非 StreamsConfig 的形式提供。 |
StreamsBuilderFactoryBean
也實作了 SmartLifecycle
來管理內部 KafkaStreams
實例的生命週期。與 Kafka Streams API 類似,您必須在啟動 KafkaStreams
之前定義 KStream
實例。這也適用於 Kafka Streams 的 Spring API。因此,當您在 StreamsBuilderFactoryBean
上使用預設的 autoStartup = true
時,您必須在應用程式上下文重新整理之前在 StreamsBuilder
上宣告 KStream
實例。例如,KStream
可以是常規的 bean 定義,同時 Kafka Streams API 的使用不會產生任何影響。以下範例示範如何執行此操作
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果您想手動控制生命週期(例如,透過某些條件停止和啟動),您可以透過使用 factory bean (&
) 前綴 直接參考 StreamsBuilderFactoryBean
bean。由於 StreamsBuilderFactoryBean
使用其內部 KafkaStreams
實例,因此可以安全地再次停止和重新啟動它。每次 start()
都會建立新的 KafkaStreams
。如果您想分別控制 KStream
實例的生命週期,您也可以考慮使用不同的 StreamsBuilderFactoryBean
實例。
您也可以在 StreamsBuilderFactoryBean
上指定 KafkaStreams.StateListener
、Thread.UncaughtExceptionHandler
和 StateRestoreListener
選項,這些選項會委派給內部 KafkaStreams
實例。此外,除了間接在 StreamsBuilderFactoryBean
上設定這些選項之外,從版本 2.1.5 開始,您可以使用 KafkaStreamsCustomizer
回呼介面來組態內部 KafkaStreams
實例。請注意,KafkaStreamsCustomizer
會覆寫 StreamsBuilderFactoryBean
提供的選項。如果您需要直接執行某些 KafkaStreams
操作,您可以使用 StreamsBuilderFactoryBean.getKafkaStreams()
存取該內部 KafkaStreams
實例。您可以依類型自動裝配 StreamsBuilderFactoryBean
bean,但您應確保在 bean 定義中使用完整類型,如下列範例所示
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,如果您使用介面 bean 定義,您可以新增 @Qualifier
以按名稱注入。以下範例示範如何執行此操作
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
從 2.4.1 版本開始,factory bean 有一個新的屬性 infrastructureCustomizer
,其類型為 KafkaStreamsInfrastructureCustomizer
;這允許在建立串流之前自訂 StreamsBuilder
(例如,新增狀態儲存)和/或 Topology
。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供預設的 no-op 實作,以避免在不需要時實作兩種方法。
提供 CompositeKafkaStreamsInfrastructureCustomizer
,以便在需要套用多個自訂程式時使用。
KafkaStreams Micrometer 支援
在 2.5.3 版本中引入,您可以組態 KafkaStreamsMicrometerListener
,以自動為 factory bean 管理的 KafkaStreams
物件註冊 micrometer 指標
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
串流 JSON 序列化和反序列化
為了在以 JSON 格式讀取或寫入主題或狀態儲存時序列化和反序列化資料,Spring for Apache Kafka 提供了 JsonSerde
實作,它使用 JSON,委派給 序列化、反序列化和訊息轉換 中描述的 JsonSerializer
和 JsonDeserializer
。JsonSerde
實作透過其建構子(目標類型或 ObjectMapper
)提供相同的組態選項。在以下範例中,我們使用 JsonSerde
來序列化和反序列化 Kafka 串流的 Cat
酬載(JsonSerde
可以以類似的方式在需要實例的任何地方使用)
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
當以程式方式建構序列化程式/反序列化程式以在生產者/消費者工廠中使用時,從 2.3 版本開始,您可以使用 fluent API,這簡化了組態。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
使用 KafkaStreamBrancher
KafkaStreamBrancher
類別引入了一種更方便的方式,可以在 KStream
之上建構條件分支。
考慮以下未使用 KafkaStreamBrancher
的範例
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下範例使用 KafkaStreamBrancher
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
組態
若要組態 Kafka Streams 環境,StreamsBuilderFactoryBean
需要 KafkaStreamsConfiguration
實例。請參閱 Apache Kafka 文件,以取得所有可能的選項。
從 2.2 版本開始,串流組態現在以 KafkaStreamsConfiguration 物件而非 StreamsConfig 的形式提供。 |
為了避免在大多數情況下使用樣板程式碼,尤其是在您開發微服務時,Spring for Apache Kafka 提供了 @EnableKafkaStreams
註解,您應將其放置在 @Configuration
類別上。您只需要宣告一個名為 defaultKafkaStreamsConfig
的 KafkaStreamsConfiguration
bean 即可。名為 defaultKafkaStreamsBuilder
的 StreamsBuilderFactoryBean
bean 會自動在應用程式上下文中宣告。您也可以宣告和使用任何額外的 StreamsBuilderFactoryBean
bean。您可以透過提供實作 StreamsBuilderFactoryBeanConfigurer
的 bean 來執行該 bean 的其他自訂。如果有多個這樣的 bean,它們將根據其 Ordered.order
屬性套用。
清除 & 停止組態
當工廠停止時,會使用 2 個參數呼叫 KafkaStreams.close()
-
closeTimeout:等待執行緒關閉的時間長度(預設為設定為 10 秒的
DEFAULT_CLOSE_TIMEOUT
)。可以使用StreamsBuilderFactoryBean.setCloseTimeout()
進行組態。 -
leaveGroupOnClose:觸發來自群組的消費者離開呼叫(預設為
false
)。可以使用StreamsBuilderFactoryBean.setLeaveGroupOnClose()
進行組態。
預設情況下,當 factory bean 停止時,會呼叫 KafkaStreams.cleanUp()
方法。從 2.1.2 版本開始,factory bean 具有額外的建構子,採用 CleanupConfig
物件,該物件具有屬性,可讓您控制是否在 start()
或 stop()
期間或兩者都不呼叫 cleanUp()
方法。從 2.7 版本開始,預設情況下永遠不會清除本機狀態。
標頭豐富器
3.0 版本新增了 ContextualProcessor
的 HeaderEnricherProcessor
擴充功能;提供與已棄用的 HeaderEnricher
相同的功能,後者實作了已棄用的 Transformer
介面。這可以用於在串流處理中新增標頭;標頭值是 SpEL 表達式;表達式評估的根物件具有 3 個屬性
-
record
-org.apache.kafka.streams.processor.api.Record
(key
、value
、timestamp
、headers
) -
key
- 目前記錄的鍵 -
value
- 目前記錄的值 -
context
-ProcessorContext
,允許存取目前記錄中繼資料
表達式必須傳回 byte[]
或 String
(將使用 UTF-8
轉換為 byte[]
)。
若要在串流中使用豐富器
.process(() -> new HeaderEnricherProcessor(expressions))
處理器不會變更 key
或 value
;它只會新增標頭。
每個記錄都需要一個新實例。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
以下是一個簡單的範例,新增一個常值標頭和一個變數
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
3.0 版本新增了 ContextualProcessor
的 MessagingProcessor
擴充功能,提供與已棄用的 MessagingTransformer
相同的功能,後者實作了已棄用的 Transformer
介面。這允許 Kafka Streams 拓撲與 Spring Messaging 組件互動,例如 Spring Integration flow。轉換器需要實作 MessagingFunction
。
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 使用其 GatewayProxyFactoryBean
自動提供實作。它也需要 MessagingMessageConverter
將鍵、值和中繼資料(包括標頭)轉換為/從 Spring Messaging Message<?>
。如需更多資訊,請參閱 [從 KStream
呼叫 Spring Integration Flow]。
從反序列化例外中復原
2.3 版本引入了 RecoveringDeserializationExceptionHandler
,它可以在發生反序列化例外時採取一些動作。請參閱 Kafka 文件中關於 DeserializationExceptionHandler
的說明,其中 RecoveringDeserializationExceptionHandler
是實作之一。RecoveringDeserializationExceptionHandler
使用 ConsumerRecordRecoverer
實作進行組態。框架提供了 DeadLetterPublishingRecoverer
,它會將失敗的記錄傳送到死信主題。如需有關此復原程式的更多資訊,請參閱 發佈死信記錄。
若要組態復原程式,請將以下屬性新增至您的串流組態
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
當然,recoverer()
bean 可以是您自己的 ConsumerRecordRecoverer
實作。
互動式查詢支援
從 3.2 版本開始,Spring for Apache Kafka 提供了 Kafka Streams 中互動式查詢所需的基本設施。互動式查詢在具狀態的 Kafka Streams 應用程式中非常有用,因為它們提供了一種持續查詢應用程式中具狀態儲存的方式。因此,如果應用程式想要具體化正在考慮的系統的目前檢視,互動式查詢提供了一種方法來做到這一點。若要瞭解有關互動式查詢的更多資訊,請參閱此文章。Spring for Apache Kafka 中的支援以名為 KafkaStreamsInteractiveQueryService
的 API 為中心,它是 Kafka Streams 程式庫中互動式查詢 API 的外觀模式。應用程式可以建立此服務的實例作為 bean,然後稍後使用它來依名稱檢索狀態儲存。
以下程式碼片段顯示了一個範例。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
假設 Kafka Streams 應用程式具有名為 app-store
的狀態儲存,則可以透過 KafkStreamsInteractiveQuery
API 檢索該儲存,如下所示。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
一旦應用程式獲得對狀態儲存的存取權,它就可以從中查詢鍵值資訊。
在本例中,應用程式使用的狀態儲存是唯讀鍵值儲存。Kafka Streams 應用程式可以使用其他類型的狀態儲存。例如,如果應用程式偏好查詢基於視窗的儲存,它可以在 Kafka Streams 應用程式業務邏輯中建構該儲存,然後稍後檢索它。基於這個原因,在 KafkaStreamsInteractiveQueryService
中檢索可查詢儲存的 API 具有泛型儲存類型簽章,以便最終使用者可以指派正確的類型。
以下是來自 API 的類型簽章。
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
呼叫此方法時,使用者可以特別要求正確的狀態儲存類型,就像我們在上面的範例中所做的那樣。
重試狀態儲存檢索
嘗試使用 KafkaStreamsInteractiveQueryService
檢索狀態儲存時,狀態儲存可能因各種原因而找不到。如果這些原因是暫時性的,KafkaStreamsInteractiveQueryService
提供了一個選項,透過允許注入自訂 RetryTemplate
來重試狀態儲存的檢索。預設情況下,KafkaStreamsInteractiveQueryService
中使用的 RetryTemmplate
使用最多三次嘗試,並固定退避一秒。
以下是如何將自訂 RetryTemmplate
注入到 KafkaStreamsInteractiveQueryService
中,最多嘗試十次。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
查詢遠端狀態儲存
上面顯示的用於檢索狀態儲存的 API - retrieveQueryableStore
適用於本機可用的鍵值狀態儲存。在生產環境設定中,Kafka Streams 應用程式最有可能根據分割區數量分佈。如果一個主題有四個分割區,並且有四個相同 Kafka Streams 處理器的實例正在執行,那麼每個實例可能負責處理來自該主題的單一分割區。在這種情況下,呼叫 retrieveQueryableStore
可能不會給出實例正在尋找的正確結果,儘管它可能會傳回有效的儲存。假設具有四個分割區的主題具有關於各種鍵的資料,並且單一分割區始終負責特定鍵。如果呼叫 retrieveQueryableStore
的實例正在尋找關於此實例未託管的鍵的資訊,那麼它將不會收到任何資料。這是因為目前的 Kafka Streams 實例對此鍵一無所知。為了修正此問題,呼叫實例首先需要確保它們具有託管特定鍵的 Kafka Streams 處理器實例的主機資訊。這可以從相同 application.id
下的任何 Kafka Streams 實例中檢索,如下所示。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
在上面的範例程式碼中,呼叫實例正在從名為 app-store
的狀態儲存中查詢特定鍵 12345
。API 也需要對應的鍵序列化程式,在本例中為 IntegerSerializer
。Kafka Streams 會在相同 application.id
下的所有實例中尋找,並嘗試找出哪個實例託管此特定鍵,一旦找到,它會將該主機資訊作為 HostInfo
物件傳回。
這就是 API 的外觀
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
當以這種分散式方式使用相同 application.id
的 Kafka Streams 處理器的多個實例時,應用程式應提供 RPC 層,其中可以透過 RPC 端點(例如 REST 端點)查詢狀態儲存。如需更多詳細資訊,請參閱此文章。當使用 Spring for Apache Kafka 時,使用 spring-web 技術非常容易新增基於 Spring 的 REST 端點。一旦有了 REST 端點,那麼就可以使用它從任何 Kafka Streams 實例查詢狀態儲存,前提是實例知道鍵託管所在的 HostInfo
。
如果託管實例的鍵是目前的實例,那麼應用程式不需要呼叫 RPC 機制,而是進行 JVM 內呼叫。但是,問題在於應用程式可能不知道發出呼叫的實例是鍵託管的位置,因為特定伺服器可能會因消費者重新平衡而遺失分割區。為了修正此問題,KafkaStreamsInteractiveQueryService
提供了一個方便的 API,用於透過 API 方法 getCurrentKafkaStreamsApplicationHostInfo()
查詢目前的主機資訊,該方法會傳回目前的 HostInfo
。想法是應用程式可以先取得有關鍵在哪裡持有的資訊,然後將 HostInfo
與有關目前實例的資訊進行比較。如果 HostInfo
資料符合,那麼它可以透過 retrieveQueryableStore
繼續進行簡單的 JVM 呼叫,否則使用 RPC 選項。
Kafka Streams 範例
以下範例結合了我們在本章中涵蓋的各種主題
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}