組態選項

本節包含 Apache Kafka Binder 使用的組態選項。

關於 Binder 的常見組態選項與屬性,請參閱核心文件中的繫結屬性

Kafka Binder 屬性

spring.cloud.stream.kafka.binder.brokers

Kafka binder 連接的 brokers 伺服器清單。

預設值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允許指定包含或不包含 port 資訊的主機 (例如,host1,host2:port2)。當 broker 清單中未組態 port 時,此設定會設定預設 port。

預設值:9092

spring.cloud.stream.kafka.binder.configuration

用戶端屬性 (生產者和消費者) 的 Key/Value 對應,傳遞至 binder 建立的所有用戶端。由於這些屬性同時被生產者和消費者使用,因此使用上應限制於通用屬性,例如安全性設定。透過此組態提供的未知 Kafka 生產者或消費者屬性會被過濾掉,且不允許傳播。此處的屬性會取代 boot 中設定的任何屬性。

預設值:空對應。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 用戶端消費者屬性的 Key/Value 對應。除了支援已知的 Kafka 消費者屬性外,此處也允許未知的消費者屬性。此處的屬性會取代 boot 和上述 configuration 屬性中設定的任何屬性。

預設值:空對應。

spring.cloud.stream.kafka.binder.headers

binder 傳輸的自訂標頭清單。僅在與較舊的應用程式 (⇐ 1.3.x) 通訊且 kafka-clients 版本 < 0.11.0.0 時才需要。較新版本原生支援標頭。

預設值:空。

spring.cloud.stream.kafka.binder.healthTimeout

等待取得分割區資訊的時間 (秒)。如果此計時器過期,健康檢查會報告為失敗。

預設值:10。

spring.cloud.stream.kafka.binder.requiredAcks

broker 上所需的應答數。請參閱 Kafka 文件中關於生產者 acks 屬性的說明。

預設值:1

spring.cloud.stream.kafka.binder.minPartitionCount

僅在設定 autoCreateTopicsautoAddPartitions 時有效。binder 在其生產或消費資料的主題上組態的全域最小分割區數。它可以被生產者的 partitionCount 設定或生產者的 instanceCount * concurrency 設定值 (以較大者為準) 取代。

預設值:1

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 用戶端生產者屬性的 Key/Value 對應。除了支援已知的 Kafka 生產者屬性外,此處也允許未知的生產者屬性。此處的屬性會取代 boot 和上述 configuration 屬性中設定的任何屬性。

預設值:空對應。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 處於啟用狀態,則自動建立主題的複寫因數。可以在每個繫結上覆寫。

如果您使用的是 2.4 之前的 Kafka broker 版本,則此值應設定為至少 1。從 3.0.8 版本開始,binder 使用 -1 作為預設值,表示 broker 'default.replication.factor' 屬性將用於決定複本數。請諮詢您的 Kafka broker 管理員,以了解是否有要求最小複寫因數的政策,如果有的話,通常 default.replication.factor 會符合該值,且應使用 -1,除非您需要大於最小值的複寫因數。

預設值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

若設為 true,binder 會自動建立新主題。若設為 false,binder 會依賴已組態的主題。在後一種情況下,如果主題不存在,binder 將無法啟動。

此設定與 broker 的 auto.create.topics.enable 設定無關,且不會影響它。如果伺服器設定為自動建立主題,則可能會在 metadata 擷取請求中建立主題,並使用預設 broker 設定。

預設值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

若設為 true,binder 會在需要時建立新的分割區。若設為 false,binder 會依賴已組態的主題分割區大小。如果目標主題的分割區計數小於預期值,binder 將無法啟動。

預設值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在 binder 中啟用交易。請參閱 Kafka 文件中的 transaction.id 以及 spring-kafka 文件中的交易。啟用交易後,個別 producer 屬性會被忽略,且所有生產者都會使用 spring.cloud.stream.kafka.binder.transaction.producer.* 屬性。

預設值 null (無交易)

spring.cloud.stream.kafka.binder.transaction.producer.*

事務性 binder 中生產者的全域生產者屬性。請參閱 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生產者屬性以及所有 binder 支援的通用生產者屬性。

預設值:請參閱個別生產者屬性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用於將 spring-messaging 標頭對應至 Kafka 標頭和從 Kafka 標頭對應的 KafkaHeaderMapper 的 bean 名稱。例如,如果您想要在為標頭使用 JSON 反序列化的 BinderHeaderMapper bean 中自訂受信任套件,請使用此屬性。如果未使用此屬性將此自訂 BinderHeaderMapper bean 提供給 binder,則 binder 將尋找名稱為 kafkaBinderHeaderMapper 且類型為 BinderHeaderMapper 的標頭對應器 bean,然後再回退到 binder 建立的預設 BinderHeaderMapper

預設值:無。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

當主題上的任何分割區 (無論哪個消費者正在從中接收資料) 發現沒有 leader 時,將 binder 健康狀態設定為 down 的旗標。

預設值:true

spring.cloud.stream.kafka.binder.certificateStoreDirectory

當 truststore 或 keystore 憑證位置作為非本機檔案系統資源 (org.springframework.core.io.Resource 支援的資源,例如 CLASSPATH、HTTP 等) 給定時,binder 會將資源從路徑 (可轉換為 org.springframework.core.io.Resource) 複製到檔案系統上的位置。broker 層級憑證 (ssl.truststore.locationssl.keystore.location) 以及用於 schema registry 的憑證 (schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location) 皆是如此。請記住,truststore 和 keystore 位置路徑必須在 spring.cloud.stream.kafka.binder.configuration…​ 下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.locationspring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location 等。檔案將複製到指定為此屬性值的 location,該位置必須是檔案系統上應用程式執行程序可寫入的現有目錄。如果未設定此值,且憑證檔案是非本機檔案系統資源,則會將其複製到 System 的 temp 目錄,如 System.getProperty("java.io.tmpdir") 所傳回。如果此值存在,但檔案系統上找不到該目錄或該目錄不可寫入,情況亦是如此。

預設值:無。

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

若設為 true,則每次存取指標時,都會計算每個消費者主題的偏移量延遲指標。若設為 false,則僅使用定期計算的偏移量延遲。

預設值:true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

計算每個消費者主題偏移量延遲的間隔。當 metrics.defaultOffsetLagMetricsEnabled 停用或其計算時間過長時,會使用此值。

預設值:60 秒

spring.cloud.stream.kafka.binder.enableObservation

在此 binder 中的所有繫結上啟用 Micrometer observation registry。

預設值:false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator metadata 消費者 group.id。此消費者由 HealthIndicator 用於查詢使用中主題的 metadata。

預設值:無。

Kafka 消費者屬性

以下屬性僅適用於 Kafka 消費者,且必須以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 為前綴。

為了避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.consumer.<property>=<value>
admin.configuration

自 2.1.1 版起,此屬性已被取代,改用 topic.properties,且未來版本將移除對其的支援。

admin.replicas-assignment

自 2.1.1 版起,此屬性已被取代,改用 topic.replicas-assignment,且未來版本將移除對其的支援。

admin.replication-factor

自 2.1.1 版起,此屬性已被取代,改用 topic.replication-factor,且未來版本將移除對其的支援。

autoRebalanceEnabled

true 時,主題分割區會在消費者群組的成員之間自動重新平衡。當 false 時,每個消費者都會根據 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 指派一組固定的分割區。這需要每個啟動的實例都適當地設定 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 屬性。在這種情況下,spring.cloud.stream.instanceCount 屬性的值通常必須大於 1。

預設值:true

ackEachRecord

autoCommitOffsettrue 時,此設定決定是否在處理完每筆記錄後提交偏移量。預設情況下,偏移量會在處理完 consumer.poll() 傳回的批次記錄中的所有記錄後提交。可以使用 Kafka 屬性 max.poll.records 控制輪詢傳回的記錄數,該屬性是透過消費者 configuration 屬性設定的。將此設定為 true 可能會導致效能下降,但這樣做可以降低發生故障時重新傳遞記錄的可能性。另請參閱 binder requiredAcks 屬性,它也會影響提交偏移量的效能。自 3.1 版起,此屬性已被取代,改用 ackMode。如果未設定 ackMode 且未啟用批次模式,則將使用 RECORD ackMode。

預設值:false

autoCommitOffset

從 3.1 版開始,此屬性已被取代。請參閱 ackMode 以取得關於替代方案的更多詳細資訊。是否在處理訊息後自動提交偏移量。若設為 false,則 inbound 訊息中會存在一個標頭,其 key 為 kafka_acknowledgment,類型為 org.springframework.kafka.support.Acknowledgment 標頭。應用程式可以使用此標頭來確認訊息。詳細資訊請參閱範例章節。當此屬性設為 false 時,Kafka binder 會將 ack 模式設為 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,且應用程式負責確認記錄。另請參閱 ackEachRecord

預設值:true

ackMode

指定容器 ack 模式。這基於 Spring Kafka 中定義的 AckMode 列舉。如果 ackEachRecord 屬性設為 true 且消費者不在批次模式下,則將使用 RECORD 的 ack 模式,否則,請使用透過此屬性提供的 ack 模式。

autoCommitOnError

在可輪詢的消費者中,若設為 true,則始終在發生錯誤時自動提交。若未設定 (預設值) 或 false,則在可輪詢的消費者中不會自動提交。請注意,此屬性僅適用於可輪詢的消費者。

預設值:未設定。

resetOffsets

是否將消費者上的偏移量重設為 startOffset 提供的值。如果提供了 KafkaBindingRebalanceListener,則必須為 false;請參閱 重新平衡監聽器。有關此屬性的更多資訊,請參閱 重設偏移量

預設值:false

startOffset

新群組的啟始偏移量。允許的值:earliestlatest。如果為消費者 'binding' 明確設定了消費者群組 (透過 spring.cloud.stream.bindings.<channelName>.group),則 'startOffset' 會設為 earliest。否則,對於 anonymous 消費者群組,它會設為 latest。有關此屬性的更多資訊,請參閱 重設偏移量

預設值:null (相當於 earliest)。

enableDlq

若設為 true,則為消費者啟用 DLQ 行為。預設情況下,導致錯誤的訊息會轉發到名為 error.<destination>.<group> 的主題。DLQ 主題名稱可以透過設定 dlqName 屬性或定義 DlqDestinationResolver 類型的 @Bean 來組態。這為更常見的 Kafka 重播場景提供了一個替代選項,適用於錯誤數量相對較少且重播整個原始主題可能過於繁瑣的情況。有關更多資訊,請參閱 kafka dlq 處理。從 2.0 版開始,傳送至 DLQ 主題的訊息會使用以下標頭增強:x-original-topicx-exception-messagex-exception-stacktrace 作為 byte[]。預設情況下,失敗的記錄會傳送至 DLQ 主題中與原始記錄相同的分割區編號。有關如何變更該行為,請參閱 dlq 分割區選擇destinationIsPatterntrue 時不允許使用。

預設值:false

dlqPartitions

enableDlq 為 true 且未設定此屬性時,將建立一個死信主題,其分割區數與主要主題相同。通常,死信記錄會傳送至死信主題中與原始記錄相同的分割區。此行為可以變更;請參閱 dlq 分割區選擇。如果此屬性設為 1 且沒有 DqlPartitionFunction bean,則所有死信記錄都將寫入分割區 0。如果此屬性大於 1,則必須提供 DlqPartitionFunction bean。請注意,實際分割區計數會受到 binder 的 minPartitionCount 屬性的影響。

預設值:none

configuration

包含通用 Kafka 消費者屬性的 key/value 對應的 Map。除了擁有 Kafka 消費者屬性外,其他組態屬性也可以在此處傳遞。例如,應用程式需要的一些屬性,例如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=barbootstrap.servers 屬性不能在此處設定;如果您需要連線到多個叢集,請使用多 binder 支援。

預設值:空對應。

dlqName

接收錯誤訊息的 DLQ 主題名稱。

預設值:null (若未指定,導致錯誤的訊息會轉發到名為 error.<destination>.<group> 的主題)。

dlqProducerProperties

使用此屬性,可以設定 DLQ 特定的生產者屬性。所有可透過 kafka 生產者屬性取得的屬性都可以透過此屬性設定。當在消費者上啟用原生解碼 (即 useNativeDecoding: true) 時,應用程式必須為 DLQ 提供對應的 key/value 序列化器。這必須以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

預設值:預設 Kafka 生產者屬性。

standardHeaders

指示 inbound 通道配接器填入哪些標準標頭。允許的值:noneidtimestampboth。如果使用原生反序列化,且第一個接收訊息的元件需要 id (例如組態為使用 JDBC 訊息儲存區的 aggregator),則此屬性很有用。

預設值:none

converterBeanName

實作 RecordMessageConverter 的 bean 名稱。用於 inbound 通道配接器中以取代預設的 MessagingMessageConverter

預設值:null

idleEventInterval

指示最近未收到訊息的事件之間的時間間隔 (毫秒)。使用 ApplicationListener<ListenerContainerIdleEvent> 來接收這些事件。有關使用範例,請參閱 暫停-恢復

預設值:30000

destinationIsPattern

若為 true,則目的地會被視為用於比對 broker 的主題名稱的正規表示式 Pattern。若為 true,則不會佈建主題,且不允許使用 enableDlq,因為 binder 在佈建階段不知道主題名稱。請注意,偵測符合模式的新主題所需的時間由消費者屬性 metadata.max.age.ms 控制,該屬性 (在撰寫本文時) 預設為 300,000 毫秒 (5 分鐘)。可以使用上述 configuration 屬性組態此屬性。

預設值:false

topic.properties

佈建新主題時使用的 Kafka 主題屬性 Map,例如 spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

預設值:無。

topic.replicas-assignment

複本指派的 Map<Integer, List<Integer>>,其中 key 為分割區,value 為指派。佈建新主題時使用。請參閱 kafka-clients jar 中的 NewTopic Javadoc。

預設值:無。

topic.replication-factor

佈建主題時要使用的複寫因數。覆寫 binder 全域設定。如果存在 replicas-assignments,則忽略此屬性。

預設值:無 (使用 binder 全域預設值 -1)。

pollTimeout

用於可輪詢消費者中輪詢的逾時。

預設值:5 秒。

transactionManager

用於覆寫此繫結的 binder 交易管理器的 KafkaAwareTransactionManager 的 Bean 名稱。如果您想要使用 ChainedKafkaTransactionManaager 將另一個交易與 Kafka 交易同步,通常需要此屬性。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須組態為相同的交易管理器。

預設值:無。

txCommitRecovered

使用事務性 binder 時,預設情況下,恢復記錄 (例如,當重試耗盡且記錄傳送至死信主題時) 的偏移量將透過新交易提交。將此屬性設為 false 會抑制提交恢復記錄的偏移量。

預設值:true。

commonErrorHandlerBeanName

每個消費者繫結要使用的 CommonErrorHandler bean 名稱。如果存在,此使用者提供的 CommonErrorHandler 會優先於 binder 定義的任何其他錯誤處理器。如果應用程式不想使用 ListenerContainerCustomizer,然後檢查目的地/群組組合以設定錯誤處理器,這是一種表達錯誤處理器的便捷方式。

預設值:無。

Kafka 生產者屬性

以下屬性僅適用於 Kafka 生產者,且必須以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 為前綴。

為了避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.producer.<property>=<value>
admin.configuration

自 2.1.1 版起,此屬性已被取代,改用 topic.properties,且未來版本將移除對其的支援。

admin.replicas-assignment

自 2.1.1 版起,此屬性已被取代,改用 topic.replicas-assignment,且未來版本將移除對其的支援。

admin.replication-factor

自 2.1.1 版起,此屬性已被取代,改用 topic.replication-factor,且未來版本將移除對其的支援。

bufferSize

Kafka 生產者在傳送前嘗試批次處理多少資料 (以位元組為單位) 的上限。

預設值:16384

sync

生產者是否為同步。

預設值:false

sendTimeoutExpression

針對傳出訊息評估的 SpEL 表達式,用於評估啟用同步發布時等待應答的時間,例如 headers['mySendTimeout']。逾時值以毫秒為單位。在 3.0 之前的版本中,除非使用原生編碼,否則無法使用 payload,因為在評估此表達式時,payload 已為 byte[] 形式。現在,表達式會在轉換 payload 之前評估。

預設值:none

batchTimeout

生產者等待多長時間以允許更多訊息累積在同一批次中,然後再傳送訊息。(通常,生產者根本不會等待,而只是傳送先前傳送正在進行時累積的所有訊息。) 非零值可能會提高輸送量,但會犧牲延遲。

預設值:0

messageKeyExpression

針對傳出訊息評估的 SpEL 表達式,用於填入生產的 Kafka 訊息的 key,例如 headers['myKey']。在 3.0 之前的版本中,除非使用原生編碼,否則無法使用 payload,因為在評估此表達式時,payload 已為 byte[] 形式。現在,表達式會在轉換 payload 之前評估。在常規處理器 (Function<String, String>Function<Message<?>, Message<?>) 的情況下,如果生產的 key 需要與來自主題的傳入 key 相同,則可以將此屬性設定如下。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] 對於反應式函式,有一個重要的注意事項需要記住。在這種情況下,由應用程式手動將標頭從傳入訊息複製到傳出訊息。您可以設定標頭,例如 myKey 並使用如上建議的 headers['myKey'],或者,為了方便起見,只需設定 KafkaHeaders.MESSAGE_KEY 標頭,您就不需要設定此屬性。

預設值:none

headerPatterns

以逗號分隔的簡單模式清單,用於比對要對應到 ProducerRecord 中的 Kafka Headers 的 Spring messaging 標頭。模式可以以萬用字元 (星號) 開頭或結尾。模式可以使用 ! 作為前綴來否定。比對在第一次比對 (正面或負面) 後停止。例如 !ask,as* 將通過 ash 但不會通過 askidtimestamp 永遠不會被對應。

預設值:* (所有標頭 - 除了 idtimestamp)

configuration

包含通用 Kafka 生產者屬性的 key/value 對應的 Map。bootstrap.servers 屬性不能在此處設定;如果您需要連線到多個叢集,請使用多 binder 支援。

預設值:空對應。

topic.properties

佈建新主題時使用的 Kafka 主題屬性 Map,例如 spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

複本指派的 Map<Integer, List<Integer>>,其中 key 為分割區,value 為指派。佈建新主題時使用。請參閱 kafka-clients jar 中的 NewTopic Javadoc。

預設值:無。

topic.replication-factor

佈建主題時要使用的複寫因數。覆寫 binder 全域設定。如果存在 replicas-assignments,則忽略此屬性。

預設值:無 (使用 binder 全域預設值 -1)。

useTopicHeader

設為 true 以使用傳出訊息中 KafkaHeaders.TOPIC 訊息標頭的值覆寫預設繫結目的地 (主題名稱)。如果標頭不存在,則使用預設繫結目的地。

預設值:false

recordMetadataChannel

應將成功傳送結果傳送至的 MessageChannel 的 bean 名稱;bean 必須存在於應用程式上下文中。傳送至通道的訊息是已傳送的訊息 (如果有的話,在轉換之後),並帶有一個額外的標頭 KafkaHeaders.RECORD_METADATA。標頭包含 Kafka 用戶端提供的 RecordMetadata 物件;它包括記錄寫入主題中的分割區和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

傳送失敗會轉到生產者錯誤通道 (如果已組態);請參閱 Kafka 錯誤通道

預設值:null。

Kafka binder 使用生產者的 partitionCount 設定作為提示,以使用給定的分割區計數建立主題 (與 minPartitionCount 結合使用,兩者中的最大值將作為使用值)。在為 binder 組態 minPartitionCount 和為應用程式組態 partitionCount 時要謹慎,因為會使用較大的值。如果主題已存在且分割區計數較小,且 autoAddPartitions 已停用 (預設值),則 binder 將無法啟動。如果主題已存在且分割區計數較小,且 autoAddPartitions 已啟用,則會新增分割區。如果主題已存在且分割區數大於 (minPartitionCountpartitionCount) 的最大值,則會使用現有的分割區計數。
compression

設定 compression.type 生產者屬性。支援的值為 nonegzipsnappylz4zstd。如果您將 kafka-clients jar 覆寫為 2.1.0 (或更新版本),如 Spring for Apache Kafka 文件中所述,並希望使用 zstd 壓縮,請使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

預設值:none

transactionManager

用於覆寫此繫結的 binder 交易管理器的 KafkaAwareTransactionManager 的 Bean 名稱。如果您想要使用 ChainedKafkaTransactionManaager 將另一個交易與 Kafka 交易同步,通常需要此屬性。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須組態為相同的交易管理器。

預設值:無。

closeTimeout

關閉生產者時等待的逾時秒數。

預設值:30

allowNonTransactional

通常,與事務性 binder 關聯的所有輸出繫結都會在新交易中發布 (如果尚未進行交易)。此屬性允許您覆寫該行為。若設為 true,則發布到此輸出繫結的記錄將不會在交易中執行,除非交易已在進行中。

預設值:false