輸出綁定的分割區支援

Kafka Streams 處理器通常將處理後的輸出發送到出站 Kafka 主題。如果出站主題已分割,且處理器需要將傳出的資料發送到特定的分割區,則應用程式需要提供 StreamPartitioner 類型的 bean。詳情請參閱 StreamPartitioner。讓我們看一些範例。

這是我們已經看過多次的相同處理器,

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

這是輸出綁定目的地

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

如果主題 outputTopic 有 4 個分割區,如果您沒有提供分割策略,Kafka Streams 將使用預設分割策略,這可能不是您想要的結果,具體取決於特定的使用案例。假設您想要將任何符合 spring 的金鑰發送到分割區 0,將 cloud 發送到分割區 1,將 stream 發送到分割區 2,以及將其他所有內容發送到分割區 3。這是您需要在應用程式中執行的操作。

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

這是一個基本的實作,但是,您可以存取記錄的金鑰/值、主題名稱和分割區總數。因此,如果需要,您可以實作複雜的分割策略。

您還需要提供此 bean 名稱以及應用程式組態。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

應用程式中的每個輸出主題都需要像這樣單獨配置。