使用 Kafka Streams 基礎綁定器和常規 Kafka 綁定器的多重綁定器
您可以有一個應用程式,其中同時具有基於常規 Kafka 綁定器的函數/消費者/供應商和基於 Kafka Streams 的處理器。但是,您不能在單個函數或消費者中混合使用它們。
這是一個範例,其中在同一個應用程式中同時具有基於綁定器的組件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
這是配置中的相關部分
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您的應用程式與上述相同,但正在處理兩個不同的 Kafka 集群,情況會變得有點複雜,例如,常規 `process` 同時作用於 Kafka 集群 1 和集群 2(從集群 1 接收資料並發送到集群 2),而 Kafka Streams 處理器作用於 Kafka 集群 2。那麼您必須使用 Spring Cloud Stream 提供的多重綁定器功能。
以下是您的配置在該情境下可能發生的變化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
請注意以上配置。我們有兩種綁定器類型,但總共有 3 個綁定器,第一個是基於集群 1 的常規 Kafka 綁定器 (`kafka1`),然後是另一個基於集群 2 的 Kafka 綁定器 (`kafka2`),最後是 `kstream` 綁定器 (`kafka3`)。應用程式中的第一個處理器從 `kafka1` 接收資料並發布到 `kafka2`,其中兩個綁定器都基於常規 Kafka 綁定器,但集群不同。第二個處理器,即 Kafka Streams 處理器,從 `kafka3` 消費資料,`kafka3` 與 `kafka2` 是同一個集群,但綁定器類型不同。
由於 Kafka Streams 系列綁定器中有三種不同的綁定器類型 - `kstream`、`ktable` 和 `globalktable` - 如果您的應用程式有多個基於其中任何一種綁定器的綁定,則需要明確提供作為綁定器類型。
例如,如果您有如下處理器:
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
那麼,這必須在多重綁定器情境中配置如下。請注意,這僅在您擁有真正的多重綁定器情境時才需要,其中有多個處理器在單個應用程式中處理多個集群。在這種情況下,需要明確為綁定提供綁定器,以區分於其他處理器的綁定器類型和集群。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.