Apache Kafka Streams 支援

從 1.1.4 版本開始,Spring for Apache Kafka 為 Kafka Streams 提供一流的支援。若要從 Spring 應用程式中使用它,kafka-streams jar 必須存在於類別路徑中。它是 Spring for Apache Kafka 專案的可選相依性,不會以傳遞方式下載。

基礎知識

參考 Apache Kafka Streams 文件建議以下使用 API 的方式

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我們有兩個主要組件

  • StreamsBuilder:具有建構 KStream(或 KTable)實例的 API。

  • KafkaStreams:用於管理這些實例的生命週期。

由單一 StreamsBuilder 暴露給 KafkaStreams 實例的所有 KStream 實例都會同時啟動和停止,即使它們具有不同的邏輯。換句話說,由 StreamsBuilder 定義的所有串流都與單一生命週期控制繫結。一旦 KafkaStreams 實例被 streams.close() 關閉,就無法重新啟動。相反地,必須建立新的 KafkaStreams 實例才能重新啟動串流處理。

Spring 管理

為了簡化從 Spring 應用程式上下文角度使用 Kafka Streams,並透過容器使用生命週期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。這是 AbstractFactoryBean 實作,用於將 StreamsBuilder 單例實例公開為 bean。以下範例建立這樣的 bean

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
從 2.2 版本開始,串流組態現在以 KafkaStreamsConfiguration 物件而非 StreamsConfig 的形式提供。

StreamsBuilderFactoryBean 也實作了 SmartLifecycle 來管理內部 KafkaStreams 實例的生命週期。與 Kafka Streams API 類似,您必須在啟動 KafkaStreams 之前定義 KStream 實例。這也適用於 Kafka Streams 的 Spring API。因此,當您在 StreamsBuilderFactoryBean 上使用預設的 autoStartup = true 時,您必須在應用程式上下文重新整理之前在 StreamsBuilder 上宣告 KStream 實例。例如,KStream 可以是常規的 bean 定義,同時 Kafka Streams API 的使用不會產生任何影響。以下範例示範如何執行此操作

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果您想手動控制生命週期(例如,透過某些條件停止和啟動),您可以透過使用 factory bean (&) 前綴 直接參考 StreamsBuilderFactoryBean bean。由於 StreamsBuilderFactoryBean 使用其內部 KafkaStreams 實例,因此可以安全地再次停止和重新啟動它。每次 start() 都會建立新的 KafkaStreams。如果您想分別控制 KStream 實例的生命週期,您也可以考慮使用不同的 StreamsBuilderFactoryBean 實例。

您也可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener 選項,這些選項會委派給內部 KafkaStreams 實例。此外,除了間接在 StreamsBuilderFactoryBean 上設定這些選項之外,從版本 2.1.5 開始,您可以使用 KafkaStreamsCustomizer 回呼介面來組態內部 KafkaStreams 實例。請注意,KafkaStreamsCustomizer 會覆寫 StreamsBuilderFactoryBean 提供的選項。如果您需要直接執行某些 KafkaStreams 操作,您可以使用 StreamsBuilderFactoryBean.getKafkaStreams() 存取該內部 KafkaStreams 實例。您可以依類型自動裝配 StreamsBuilderFactoryBean bean,但您應確保在 bean 定義中使用完整類型,如下列範例所示

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,如果您使用介面 bean 定義,您可以新增 @Qualifier 以按名稱注入。以下範例示範如何執行此操作

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

從 2.4.1 版本開始,factory bean 有一個新的屬性 infrastructureCustomizer,其類型為 KafkaStreamsInfrastructureCustomizer;這允許在建立串流之前自訂 StreamsBuilder(例如,新增狀態儲存)和/或 Topology

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

提供預設的 no-op 實作,以避免在不需要時實作兩種方法。

提供 CompositeKafkaStreamsInfrastructureCustomizer,以便在需要套用多個自訂程式時使用。

KafkaStreams Micrometer 支援

在 2.5.3 版本中引入,您可以組態 KafkaStreamsMicrometerListener,以自動為 factory bean 管理的 KafkaStreams 物件註冊 micrometer 指標

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

串流 JSON 序列化和反序列化

為了在以 JSON 格式讀取或寫入主題或狀態儲存時序列化和反序列化資料,Spring for Apache Kafka 提供了 JsonSerde 實作,它使用 JSON,委派給 序列化、反序列化和訊息轉換 中描述的 JsonSerializerJsonDeserializerJsonSerde 實作透過其建構子(目標類型或 ObjectMapper)提供相同的組態選項。在以下範例中,我們使用 JsonSerde 來序列化和反序列化 Kafka 串流的 Cat 酬載(JsonSerde 可以以類似的方式在需要實例的任何地方使用)

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

當以程式方式建構序列化程式/反序列化程式以在生產者/消費者工廠中使用時,從 2.3 版本開始,您可以使用 fluent API,這簡化了組態。

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

使用 KafkaStreamBrancher

KafkaStreamBrancher 類別引入了一種更方便的方式,可以在 KStream 之上建構條件分支。

考慮以下未使用 KafkaStreamBrancher 的範例

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下範例使用 KafkaStreamBrancher

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

組態

若要組態 Kafka Streams 環境,StreamsBuilderFactoryBean 需要 KafkaStreamsConfiguration 實例。請參閱 Apache Kafka 文件,以取得所有可能的選項。

從 2.2 版本開始,串流組態現在以 KafkaStreamsConfiguration 物件而非 StreamsConfig 的形式提供。

為了避免在大多數情況下使用樣板程式碼,尤其是在您開發微服務時,Spring for Apache Kafka 提供了 @EnableKafkaStreams 註解,您應將其放置在 @Configuration 類別上。您只需要宣告一個名為 defaultKafkaStreamsConfigKafkaStreamsConfiguration bean 即可。名為 defaultKafkaStreamsBuilderStreamsBuilderFactoryBean bean 會自動在應用程式上下文中宣告。您也可以宣告和使用任何額外的 StreamsBuilderFactoryBean bean。您可以透過提供實作 StreamsBuilderFactoryBeanConfigurer 的 bean 來執行該 bean 的其他自訂。如果有多個這樣的 bean,它們將根據其 Ordered.order 屬性套用。

清除 & 停止組態

當工廠停止時,會使用 2 個參數呼叫 KafkaStreams.close()

  • closeTimeout:等待執行緒關閉的時間長度(預設為設定為 10 秒的 DEFAULT_CLOSE_TIMEOUT)。可以使用 StreamsBuilderFactoryBean.setCloseTimeout() 進行組態。

  • leaveGroupOnClose:觸發來自群組的消費者離開呼叫(預設為 false)。可以使用 StreamsBuilderFactoryBean.setLeaveGroupOnClose() 進行組態。

預設情況下,當 factory bean 停止時,會呼叫 KafkaStreams.cleanUp() 方法。從 2.1.2 版本開始,factory bean 具有額外的建構子,採用 CleanupConfig 物件,該物件具有屬性,可讓您控制是否在 start()stop() 期間或兩者都不呼叫 cleanUp() 方法。從 2.7 版本開始,預設情況下永遠不會清除本機狀態。

標頭豐富器

3.0 版本新增了 ContextualProcessorHeaderEnricherProcessor 擴充功能;提供與已棄用的 HeaderEnricher 相同的功能,後者實作了已棄用的 Transformer 介面。這可以用於在串流處理中新增標頭;標頭值是 SpEL 表達式;表達式評估的根物件具有 3 個屬性

  • record - org.apache.kafka.streams.processor.api.Record (keyvaluetimestampheaders)

  • key - 目前記錄的鍵

  • value - 目前記錄的值

  • context - ProcessorContext,允許存取目前記錄中繼資料

表達式必須傳回 byte[]String(將使用 UTF-8 轉換為 byte[])。

若要在串流中使用豐富器

.process(() -> new HeaderEnricherProcessor(expressions))

處理器不會變更 keyvalue;它只會新增標頭。

每個記錄都需要一個新實例。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

以下是一個簡單的範例,新增一個常值標頭和一個變數

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

3.0 版本新增了 ContextualProcessorMessagingProcessor 擴充功能,提供與已棄用的 MessagingTransformer 相同的功能,後者實作了已棄用的 Transformer 介面。這允許 Kafka Streams 拓撲與 Spring Messaging 組件互動,例如 Spring Integration flow。轉換器需要實作 MessagingFunction

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration 使用其 GatewayProxyFactoryBean 自動提供實作。它也需要 MessagingMessageConverter 將鍵、值和中繼資料(包括標頭)轉換為/從 Spring Messaging Message<?>。如需更多資訊,請參閱 [從 KStream 呼叫 Spring Integration Flow]。

從反序列化例外中復原

2.3 版本引入了 RecoveringDeserializationExceptionHandler,它可以在發生反序列化例外時採取一些動作。請參閱 Kafka 文件中關於 DeserializationExceptionHandler 的說明,其中 RecoveringDeserializationExceptionHandler 是實作之一。RecoveringDeserializationExceptionHandler 使用 ConsumerRecordRecoverer 實作進行組態。框架提供了 DeadLetterPublishingRecoverer,它會將失敗的記錄傳送到死信主題。如需有關此復原程式的更多資訊,請參閱 發佈死信記錄

若要組態復原程式,請將以下屬性新增至您的串流組態

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

當然,recoverer() bean 可以是您自己的 ConsumerRecordRecoverer 實作。

互動式查詢支援

從 3.2 版本開始,Spring for Apache Kafka 提供了 Kafka Streams 中互動式查詢所需的基本設施。互動式查詢在具狀態的 Kafka Streams 應用程式中非常有用,因為它們提供了一種持續查詢應用程式中具狀態儲存的方式。因此,如果應用程式想要具體化正在考慮的系統的目前檢視,互動式查詢提供了一種方法來做到這一點。若要瞭解有關互動式查詢的更多資訊,請參閱此文章。Spring for Apache Kafka 中的支援以名為 KafkaStreamsInteractiveQueryService 的 API 為中心,它是 Kafka Streams 程式庫中互動式查詢 API 的外觀模式。應用程式可以建立此服務的實例作為 bean,然後稍後使用它來依名稱檢索狀態儲存。

以下程式碼片段顯示了一個範例。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

假設 Kafka Streams 應用程式具有名為 app-store 的狀態儲存,則可以透過 KafkStreamsInteractiveQuery API 檢索該儲存,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

一旦應用程式獲得對狀態儲存的存取權,它就可以從中查詢鍵值資訊。

在本例中,應用程式使用的狀態儲存是唯讀鍵值儲存。Kafka Streams 應用程式可以使用其他類型的狀態儲存。例如,如果應用程式偏好查詢基於視窗的儲存,它可以在 Kafka Streams 應用程式業務邏輯中建構該儲存,然後稍後檢索它。基於這個原因,在 KafkaStreamsInteractiveQueryService 中檢索可查詢儲存的 API 具有泛型儲存類型簽章,以便最終使用者可以指派正確的類型。

以下是來自 API 的類型簽章。

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

呼叫此方法時,使用者可以特別要求正確的狀態儲存類型,就像我們在上面的範例中所做的那樣。

重試狀態儲存檢索

嘗試使用 KafkaStreamsInteractiveQueryService 檢索狀態儲存時,狀態儲存可能因各種原因而找不到。如果這些原因是暫時性的,KafkaStreamsInteractiveQueryService 提供了一個選項,透過允許注入自訂 RetryTemplate 來重試狀態儲存的檢索。預設情況下,KafkaStreamsInteractiveQueryService 中使用的 RetryTemmplate 使用最多三次嘗試,並固定退避一秒。

以下是如何將自訂 RetryTemmplate 注入到 KafkaStreamsInteractiveQueryService 中,最多嘗試十次。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

查詢遠端狀態儲存

上面顯示的用於檢索狀態儲存的 API - retrieveQueryableStore 適用於本機可用的鍵值狀態儲存。在生產環境設定中,Kafka Streams 應用程式最有可能根據分割區數量分佈。如果一個主題有四個分割區,並且有四個相同 Kafka Streams 處理器的實例正在執行,那麼每個實例可能負責處理來自該主題的單一分割區。在這種情況下,呼叫 retrieveQueryableStore 可能不會給出實例正在尋找的正確結果,儘管它可能會傳回有效的儲存。假設具有四個分割區的主題具有關於各種鍵的資料,並且單一分割區始終負責特定鍵。如果呼叫 retrieveQueryableStore 的實例正在尋找關於此實例未託管的鍵的資訊,那麼它將不會收到任何資料。這是因為目前的 Kafka Streams 實例對此鍵一無所知。為了修正此問題,呼叫實例首先需要確保它們具有託管特定鍵的 Kafka Streams 處理器實例的主機資訊。這可以從相同 application.id 下的任何 Kafka Streams 實例中檢索,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

在上面的範例程式碼中,呼叫實例正在從名為 app-store 的狀態儲存中查詢特定鍵 12345。API 也需要對應的鍵序列化程式,在本例中為 IntegerSerializer。Kafka Streams 會在相同 application.id 下的所有實例中尋找,並嘗試找出哪個實例託管此特定鍵,一旦找到,它會將該主機資訊作為 HostInfo 物件傳回。

這就是 API 的外觀

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

當以這種分散式方式使用相同 application.id 的 Kafka Streams 處理器的多個實例時,應用程式應提供 RPC 層,其中可以透過 RPC 端點(例如 REST 端點)查詢狀態儲存。如需更多詳細資訊,請參閱此文章。當使用 Spring for Apache Kafka 時,使用 spring-web 技術非常容易新增基於 Spring 的 REST 端點。一旦有了 REST 端點,那麼就可以使用它從任何 Kafka Streams 實例查詢狀態儲存,前提是實例知道鍵託管所在的 HostInfo

如果託管實例的鍵是目前的實例,那麼應用程式不需要呼叫 RPC 機制,而是進行 JVM 內呼叫。但是,問題在於應用程式可能不知道發出呼叫的實例是鍵託管的位置,因為特定伺服器可能會因消費者重新平衡而遺失分割區。為了修正此問題,KafkaStreamsInteractiveQueryService 提供了一個方便的 API,用於透過 API 方法 getCurrentKafkaStreamsApplicationHostInfo() 查詢目前的主機資訊,該方法會傳回目前的 HostInfo。想法是應用程式可以先取得有關鍵在哪裡持有的資訊,然後將 HostInfo 與有關目前實例的資訊進行比較。如果 HostInfo 資料符合,那麼它可以透過 retrieveQueryableStore 繼續進行簡單的 JVM 呼叫,否則使用 RPC 選項。

Kafka Streams 範例

以下範例結合了我們在本章中涵蓋的各種主題

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}