程式設計模型的輔助功能

單一應用程式內的多個 Kafka Streams 處理器

Binder 允許在單一 Spring Cloud Stream 應用程式內擁有多個 Kafka Streams 處理器。您可以擁有如下的應用程式。

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在這種情況下,binder 將建立 3 個具有不同應用程式 ID 的獨立 Kafka Streams 物件(更多資訊如下)。但是,如果應用程式中有超過一個處理器,您必須告知 Spring Cloud Stream 哪些函數需要啟用。以下是如何啟用函數。

spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess

如果您希望某些函數不要立即啟用,您可以從此列表中移除該函數。

當您在同一個應用程式中擁有單一 Kafka Streams 處理器和其他類型的 Function Bean 時,情況也是如此,這些 Bean 是透過不同的 binder 處理的(例如,基於常規 Kafka 訊息通道 binder 的函數 Bean)

Kafka Streams 應用程式 ID

應用程式 ID 是您需要為 Kafka Streams 應用程式提供的強制屬性。Spring Cloud Stream Kafka Streams binder 允許您透過多種方式組態此應用程式 ID。

如果您的應用程式中只有一個處理器,那麼您可以使用以下屬性在 binder 層級設定此 ID

spring.cloud.stream.kafka.streams.binder.applicationId.

為了方便起見,如果您只有一個處理器,您也可以使用 spring.application.name 作為屬性來委派應用程式 ID。

如果您的應用程式中有多個 Kafka Streams 處理器,那麼您需要為每個處理器設定應用程式 ID。在函數式模型的情況下,您可以將其作為屬性附加到每個函數。

例如,假設您有以下函數。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然後,您可以使用以下 binder 層級屬性為每個函數設定應用程式 ID。

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

對於基於函數的模型,在綁定層級設定應用程式 ID 的這種方法也有效。但是,如果您使用函數式模型,則如我們在上面看到的在 binder 層級為每個函數設定應用程式 ID 會更容易得多。

對於生產部署,強烈建議透過組態明確指定應用程式 ID。如果您要自動擴展應用程式,這種做法尤其重要,在這種情況下,您需要確保使用相同的應用程式 ID 部署每個實例。

如果應用程式未提供應用程式 ID,則 binder 會自動為您產生靜態應用程式 ID。這在開發情境中很方便,因為它避免了明確提供應用程式 ID 的需要。以這種方式產生的應用程式 ID 在應用程式重新啟動後將保持靜態。在函數式模型的情況下,產生的應用程式 ID 將是函數 Bean 名稱,後跟字面文字 applicationID,例如,如果 process 是函數 Bean 名稱,則為 process-applicationID

設定應用程式 ID 摘要

  • 預設情況下,binder 將為每個函數方法自動產生應用程式 ID。

  • 如果您只有一個處理器,那麼您可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationId

  • 如果您有多個處理器,則可以使用屬性為每個函數設定應用程式 ID - spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId

使用函數式樣式覆寫 binder 產生的預設綁定名稱

預設情況下,當使用函數式樣式時,binder 使用上面討論的策略來產生綁定名稱,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果您想要覆寫這些綁定名稱,您可以透過指定以下屬性來實現。

spring.cloud.stream.function.bindings.<default binding name>。預設綁定名稱是 binder 產生的原始綁定名稱。

例如,假設您有以下函數。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 將產生名稱為 process-in-0process-in-1process-out-0 的綁定。現在,如果您想將它們完全更改為其他名稱,也許是更特定於網域的綁定名稱,那麼您可以如下操作。

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-0=regions

spring.cloud.stream.function.bindings.process-out-0=clicks

之後,您必須在這些新綁定名稱上設定所有綁定層級屬性。

請記住,使用上面描述的函數式程式設計模型,在大多數情況下,堅持預設綁定名稱是有意義的。您仍然可能想要進行此覆寫的唯一原因是當您有大量組態屬性並且想要將綁定對應到更易於網域理解的名稱時。

設定啟動伺服器組態

執行 Kafka Streams 應用程式時,您必須提供 Kafka broker 伺服器資訊。如果您未提供此資訊,binder 會預期您正在預設的 localhost:9092 上執行 broker。如果不是這種情況,那麼您需要覆寫它。有幾種方法可以做到這一點。

  • 使用啟動屬性 - spring.kafka.bootstrapServers

  • Binder 層級屬性 - spring.cloud.stream.kafka.streams.binder.brokers

就 binder 層級屬性而言,如果您使用透過常規 Kafka binder 提供的 broker 屬性 - spring.cloud.stream.kafka.binder.brokers,則沒有關係。Kafka Streams binder 將首先檢查是否設定了 Kafka Streams binder 特定的 broker 屬性 (spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,則會尋找 spring.cloud.stream.kafka.binder.brokers