組態選項

本節包含 Kafka Streams binder 使用的組態選項。

如需與 binder 相關的通用組態選項和屬性,請參閱核心文件

Kafka Streams Binder 屬性

以下屬性在 binder 層級可用,且必須以 spring.cloud.stream.kafka.streams.binder. 為前綴。任何在 Kafka Streams binder 中重複使用的 Kafka binder 提供屬性,都必須以 spring.cloud.stream.kafka.streams.binder 而非 spring.cloud.stream.kafka.binder 為前綴。此規則的唯一例外是定義 Kafka bootstrap server 屬性,在這種情況下,任一前綴皆可使用。

configuration

包含與 Apache Kafka Streams API 相關屬性的鍵/值對應 Map。此屬性必須以 spring.cloud.stream.kafka.streams.binder. 為前綴。以下是一些使用此屬性的範例。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

如需關於可能進入 streams 組態的所有屬性的更多資訊,請參閱 Apache Kafka Streams 文件中的 StreamsConfig JavaDocs。所有您可以從 StreamsConfig 設定的組態,都可以透過此屬性設定。當使用此屬性時,它適用於整個應用程式,因為這是 binder 層級的屬性。如果應用程式中有一個以上的處理器,則所有處理器都將獲取這些屬性。對於諸如 application.id 之類的屬性,這將變得有問題,因此您必須仔細檢查如何使用此 binder 層級 configuration 屬性映射 StreamsConfig 中的屬性。

functions.<function-bean-name>.applicationId

僅適用於函式樣式處理器。這可用於設定應用程式中每個函式的應用程式 ID。在多個函式的情況下,這是設定應用程式 ID 的便捷方式。

functions.<function-bean-name>.configuration

僅適用於函式樣式處理器。包含與 Apache Kafka Streams API 相關屬性的鍵/值對應 Map。這與上面描述的 binder 層級 configuration 屬性類似,但此層級的 configuration 屬性僅限於指定的函式。當您有多個處理器,並且想要根據特定函式限制對組態的存取時,您可能需要使用此屬性。所有 StreamsConfig 屬性都可以在此處使用。

brokers

Broker URL

預設值:localhost

zkNodes

Zookeeper URL

預設值:localhost

deserializationExceptionHandler

反序列化錯誤處理器類型。此處理器在 binder 層級應用,因此適用於應用程式中的所有輸入綁定。有一種方法可以在消費者綁定層級以更精細的方式控制它。可能的值為 - logAndContinuelogAndFailskipAndContinuesendToDlq

預設值:logAndFail

applicationId

在 binder 層級全域設定 Kafka Streams 應用程式的 application.id 的便捷方式。如果應用程式包含多個函式,則應用程式 ID 應設定為不同值。請參閱上面詳細討論設定應用程式 ID 的部分。

預設值:應用程式將產生靜態應用程式 ID。請參閱應用程式 ID 章節以取得更多詳細資訊。

stateStoreRetry.maxAttempts

嘗試連線到狀態儲存的最大嘗試次數。

預設值:1

stateStoreRetry.backoffPeriod

在重試時嘗試連線到狀態儲存的回退期間。

預設值:1000 毫秒

consumerProperties

binder 層級的任意消費者屬性。

producerProperties

binder 層級的任意生產者屬性。

includeStoppedProcessorsForHealthCheck

當處理器的綁定通過 actuator 停止時,預設情況下,此處理器將不參與健康檢查。將此屬性設定為 true 以啟用所有處理器的健康檢查,包括目前通過綁定 actuator 端點停止的處理器。

預設值:false

Kafka Streams 生產者屬性

以下屬性適用於 Kafka Streams 生產者,且必須以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 為前綴。為了方便起見,如果有多個輸出綁定且它們都要求一個通用值,則可以使用前綴 spring.cloud.stream.kafka.streams.default.producer. 來組態。

keySerde

要使用的 key serde

預設值:請參閱上面關於訊息反/序列化的討論

valueSerde

要使用的 value serde

預設值:請參閱上面關於訊息反/序列化的討論

useNativeEncoding

啟用/停用原生編碼的標誌

預設值:true

streamPartitionerBeanName

要在消費者端使用的自訂出站分割器 bean 名稱。應用程式可以提供自訂的 StreamPartitioner 作為 Spring bean,並且可以將此 bean 的名稱提供給生產者以取代預設分割器。

預設值:請參閱上面關於出站分割支援的討論。

producedAs

處理器生產到的 sink 元件的自訂名稱。

預設值:none (由 Kafka Streams 產生)

Kafka Streams 消費者屬性

以下屬性適用於 Kafka Streams 消費者,且必須以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 為前綴。為了方便起見,如果有多個輸入綁定且它們都要求一個通用值,則可以使用前綴 spring.cloud.stream.kafka.streams.default.consumer. 來組態。

applicationId

設定每個輸入綁定的 application.id。

預設值:請參閱上方。

keySerde

要使用的 key serde

預設值:請參閱上面關於訊息反/序列化的討論

valueSerde

要使用的 value serde

預設值:請參閱上面關於訊息反/序列化的討論

materializedAs

使用傳入的 KTable 類型時要實現的狀態儲存

預設值:none

useNativeDecoding

啟用/停用原生解碼的標誌

預設值:true

dlqName

DLQ 主題名稱。

預設值:請參閱上面關於錯誤處理和 DLQ 的討論。

startOffset

如果沒有已提交的偏移量可供消費,則從此偏移量開始。這主要用於消費者首次從主題消費時。Kafka Streams 使用 earliest 作為預設策略,而 binder 也使用相同的預設值。可以使用此屬性將其覆寫為 latest

預設值:earliest

注意:在消費者上使用 resetOffsets 對 Kafka Streams binder 沒有任何影響。與基於訊息通道的 binder 不同,Kafka Streams binder 不會根據需求搜尋到開頭或結尾。

deserializationExceptionHandler

反序列化錯誤處理器類型。此處理器是針對每個消費者綁定應用的,而不是之前描述的 binder 層級屬性。可能的值為 - logAndContinuelogAndFailskipAndContinuesendToDlq

預設值:logAndFail

timestampExtractorBeanName

要在消費者端使用的特定時間戳記提取器 bean 名稱。應用程式可以提供 TimestampExtractor 作為 Spring bean,並且可以將此 bean 的名稱提供給消費者以取代預設提取器。

預設值:請參閱上面關於時間戳記提取器的討論。

eventTypes

此綁定支援的事件類型逗號分隔清單。

預設值:none

eventTypeHeaderKey

通過此綁定傳入的每個記錄上的事件類型標頭鍵。

預設值:event_type

consumedAs

處理器從中消費的 source 元件的自訂名稱。

預設值:none (由 Kafka Streams 產生)

關於並行的特別注意事項

在 Kafka Streams 中,您可以使用 num.stream.threads 屬性控制處理器可以建立的執行緒數。您可以通過上面在 binder、函式、生產者或消費者層級下描述的各種 configuration 選項來執行此操作。您也可以使用核心 Spring Cloud Stream 為此目的提供的 concurrency 屬性。當使用此屬性時,您需要在消費者端使用它。當您有多個輸入綁定時,請在第一個輸入綁定上設定此屬性。例如,當設定 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 時,binder 會將其轉換為 num.stream.threads。如果您有多個處理器,且一個處理器定義了綁定層級的並行性,但其他處理器沒有,則那些沒有綁定層級並行性的處理器將預設回通過 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的 binder 範圍屬性。如果此 binder 組態不可用,則應用程式將使用 Kafka Streams 設定的預設值。