組態選項
本節包含 Apache Kafka Binder 使用的組態選項。
關於 Binder 的常見組態選項與屬性,請參閱核心文件中的繫結屬性。
Kafka Binder 屬性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka binder 連接的 brokers 伺服器清單。
預設值:
localhost
。 - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers
允許指定包含或不包含 port 資訊的主機 (例如,host1,host2:port2
)。當 broker 清單中未組態 port 時,此設定會設定預設 port。預設值:
9092
。 - spring.cloud.stream.kafka.binder.configuration
-
用戶端屬性 (生產者和消費者) 的 Key/Value 對應,傳遞至 binder 建立的所有用戶端。由於這些屬性同時被生產者和消費者使用,因此使用上應限制於通用屬性,例如安全性設定。透過此組態提供的未知 Kafka 生產者或消費者屬性會被過濾掉,且不允許傳播。此處的屬性會取代 boot 中設定的任何屬性。
預設值:空對應。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 用戶端消費者屬性的 Key/Value 對應。除了支援已知的 Kafka 消費者屬性外,此處也允許未知的消費者屬性。此處的屬性會取代 boot 和上述
configuration
屬性中設定的任何屬性。預設值:空對應。
- spring.cloud.stream.kafka.binder.headers
-
binder 傳輸的自訂標頭清單。僅在與較舊的應用程式 (⇐ 1.3.x) 通訊且
kafka-clients
版本 < 0.11.0.0 時才需要。較新版本原生支援標頭。預設值:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
等待取得分割區資訊的時間 (秒)。如果此計時器過期,健康檢查會報告為失敗。
預設值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
broker 上所需的應答數。請參閱 Kafka 文件中關於生產者
acks
屬性的說明。預設值:
1
。 - spring.cloud.stream.kafka.binder.minPartitionCount
-
僅在設定
autoCreateTopics
或autoAddPartitions
時有效。binder 在其生產或消費資料的主題上組態的全域最小分割區數。它可以被生產者的partitionCount
設定或生產者的instanceCount * concurrency
設定值 (以較大者為準) 取代。預設值:
1
。 - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 用戶端生產者屬性的 Key/Value 對應。除了支援已知的 Kafka 生產者屬性外,此處也允許未知的生產者屬性。此處的屬性會取代 boot 和上述
configuration
屬性中設定的任何屬性。預設值:空對應。
- spring.cloud.stream.kafka.binder.replicationFactor
-
如果
autoCreateTopics
處於啟用狀態,則自動建立主題的複寫因數。可以在每個繫結上覆寫。如果您使用的是 2.4 之前的 Kafka broker 版本,則此值應設定為至少 1
。從 3.0.8 版本開始,binder 使用-1
作為預設值,表示 broker 'default.replication.factor' 屬性將用於決定複本數。請諮詢您的 Kafka broker 管理員,以了解是否有要求最小複寫因數的政策,如果有的話,通常default.replication.factor
會符合該值,且應使用-1
,除非您需要大於最小值的複寫因數。預設值:
-1
。 - spring.cloud.stream.kafka.binder.autoCreateTopics
-
若設為
true
,binder 會自動建立新主題。若設為false
,binder 會依賴已組態的主題。在後一種情況下,如果主題不存在,binder 將無法啟動。此設定與 broker 的 auto.create.topics.enable
設定無關,且不會影響它。如果伺服器設定為自動建立主題,則可能會在 metadata 擷取請求中建立主題,並使用預設 broker 設定。預設值:
true
。 - spring.cloud.stream.kafka.binder.autoAddPartitions
-
若設為
true
,binder 會在需要時建立新的分割區。若設為false
,binder 會依賴已組態的主題分割區大小。如果目標主題的分割區計數小於預期值,binder 將無法啟動。預設值:
false
。 - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在 binder 中啟用交易。請參閱 Kafka 文件中的
transaction.id
以及spring-kafka
文件中的交易。啟用交易後,個別producer
屬性會被忽略,且所有生產者都會使用spring.cloud.stream.kafka.binder.transaction.producer.*
屬性。預設值
null
(無交易) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
事務性 binder 中生產者的全域生產者屬性。請參閱
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
和 Kafka 生產者屬性以及所有 binder 支援的通用生產者屬性。預設值:請參閱個別生產者屬性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
用於將
spring-messaging
標頭對應至 Kafka 標頭和從 Kafka 標頭對應的KafkaHeaderMapper
的 bean 名稱。例如,如果您想要在為標頭使用 JSON 反序列化的BinderHeaderMapper
bean 中自訂受信任套件,請使用此屬性。如果未使用此屬性將此自訂BinderHeaderMapper
bean 提供給 binder,則 binder 將尋找名稱為kafkaBinderHeaderMapper
且類型為BinderHeaderMapper
的標頭對應器 bean,然後再回退到 binder 建立的預設BinderHeaderMapper
。預設值:無。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
當主題上的任何分割區 (無論哪個消費者正在從中接收資料) 發現沒有 leader 時,將 binder 健康狀態設定為
down
的旗標。預設值:
true
。 - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
當 truststore 或 keystore 憑證位置作為非本機檔案系統資源 (org.springframework.core.io.Resource 支援的資源,例如 CLASSPATH、HTTP 等) 給定時,binder 會將資源從路徑 (可轉換為 org.springframework.core.io.Resource) 複製到檔案系統上的位置。broker 層級憑證 (
ssl.truststore.location
和ssl.keystore.location
) 以及用於 schema registry 的憑證 (schema.registry.ssl.truststore.location
和schema.registry.ssl.keystore.location
) 皆是如此。請記住,truststore 和 keystore 位置路徑必須在spring.cloud.stream.kafka.binder.configuration…
下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
、spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location
等。檔案將複製到指定為此屬性值的 location,該位置必須是檔案系統上應用程式執行程序可寫入的現有目錄。如果未設定此值,且憑證檔案是非本機檔案系統資源,則會將其複製到 System 的 temp 目錄,如System.getProperty("java.io.tmpdir")
所傳回。如果此值存在,但檔案系統上找不到該目錄或該目錄不可寫入,情況亦是如此。預設值:無。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
若設為 true,則每次存取指標時,都會計算每個消費者主題的偏移量延遲指標。若設為 false,則僅使用定期計算的偏移量延遲。
預設值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
計算每個消費者主題偏移量延遲的間隔。當
metrics.defaultOffsetLagMetricsEnabled
停用或其計算時間過長時,會使用此值。預設值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此 binder 中的所有繫結上啟用 Micrometer observation registry。
預設值:false
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
-
KafkaHealthIndicator
metadata 消費者group.id
。此消費者由HealthIndicator
用於查詢使用中主題的 metadata。預設值:無。
Kafka 消費者屬性
以下屬性僅適用於 Kafka 消費者,且必須以 spring.cloud.stream.kafka.bindings.<channelName>.consumer.
為前綴。
為了避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.consumer.<property>=<value> 。 |
- admin.configuration
-
自 2.1.1 版起,此屬性已被取代,改用
topic.properties
,且未來版本將移除對其的支援。 - admin.replicas-assignment
-
自 2.1.1 版起,此屬性已被取代,改用
topic.replicas-assignment
,且未來版本將移除對其的支援。 - admin.replication-factor
-
自 2.1.1 版起,此屬性已被取代,改用
topic.replication-factor
,且未來版本將移除對其的支援。 - autoRebalanceEnabled
-
當
true
時,主題分割區會在消費者群組的成員之間自動重新平衡。當false
時,每個消費者都會根據spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
指派一組固定的分割區。這需要每個啟動的實例都適當地設定spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
屬性。在這種情況下,spring.cloud.stream.instanceCount
屬性的值通常必須大於 1。預設值:
true
。 - ackEachRecord
-
當
autoCommitOffset
為true
時,此設定決定是否在處理完每筆記錄後提交偏移量。預設情況下,偏移量會在處理完consumer.poll()
傳回的批次記錄中的所有記錄後提交。可以使用 Kafka 屬性max.poll.records
控制輪詢傳回的記錄數,該屬性是透過消費者configuration
屬性設定的。將此設定為true
可能會導致效能下降,但這樣做可以降低發生故障時重新傳遞記錄的可能性。另請參閱 binderrequiredAcks
屬性,它也會影響提交偏移量的效能。自 3.1 版起,此屬性已被取代,改用ackMode
。如果未設定ackMode
且未啟用批次模式,則將使用RECORD
ackMode。預設值:
false
。 - autoCommitOffset
-
從 3.1 版開始,此屬性已被取代。請參閱
ackMode
以取得關於替代方案的更多詳細資訊。是否在處理訊息後自動提交偏移量。若設為false
,則 inbound 訊息中會存在一個標頭,其 key 為kafka_acknowledgment
,類型為org.springframework.kafka.support.Acknowledgment
標頭。應用程式可以使用此標頭來確認訊息。詳細資訊請參閱範例章節。當此屬性設為false
時,Kafka binder 會將 ack 模式設為org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
,且應用程式負責確認記錄。另請參閱ackEachRecord
。預設值:
true
。 - ackMode
-
指定容器 ack 模式。這基於 Spring Kafka 中定義的 AckMode 列舉。如果
ackEachRecord
屬性設為true
且消費者不在批次模式下,則將使用RECORD
的 ack 模式,否則,請使用透過此屬性提供的 ack 模式。 - autoCommitOnError
-
在可輪詢的消費者中,若設為
true
,則始終在發生錯誤時自動提交。若未設定 (預設值) 或 false,則在可輪詢的消費者中不會自動提交。請注意,此屬性僅適用於可輪詢的消費者。預設值:未設定。
- resetOffsets
-
是否將消費者上的偏移量重設為 startOffset 提供的值。如果提供了
KafkaBindingRebalanceListener
,則必須為 false;請參閱 重新平衡監聽器。有關此屬性的更多資訊,請參閱 重設偏移量。預設值:
false
。 - startOffset
-
新群組的啟始偏移量。允許的值:
earliest
和latest
。如果為消費者 'binding' 明確設定了消費者群組 (透過spring.cloud.stream.bindings.<channelName>.group
),則 'startOffset' 會設為earliest
。否則,對於anonymous
消費者群組,它會設為latest
。有關此屬性的更多資訊,請參閱 重設偏移量。預設值:null (相當於
earliest
)。 - enableDlq
-
若設為 true,則為消費者啟用 DLQ 行為。預設情況下,導致錯誤的訊息會轉發到名為
error.<destination>.<group>
的主題。DLQ 主題名稱可以透過設定dlqName
屬性或定義DlqDestinationResolver
類型的@Bean
來組態。這為更常見的 Kafka 重播場景提供了一個替代選項,適用於錯誤數量相對較少且重播整個原始主題可能過於繁瑣的情況。有關更多資訊,請參閱 kafka dlq 處理。從 2.0 版開始,傳送至 DLQ 主題的訊息會使用以下標頭增強:x-original-topic
、x-exception-message
和x-exception-stacktrace
作為byte[]
。預設情況下,失敗的記錄會傳送至 DLQ 主題中與原始記錄相同的分割區編號。有關如何變更該行為,請參閱 dlq 分割區選擇。當destinationIsPattern
為true
時不允許使用。預設值:
false
。 - dlqPartitions
-
當
enableDlq
為 true 且未設定此屬性時,將建立一個死信主題,其分割區數與主要主題相同。通常,死信記錄會傳送至死信主題中與原始記錄相同的分割區。此行為可以變更;請參閱 dlq 分割區選擇。如果此屬性設為1
且沒有DqlPartitionFunction
bean,則所有死信記錄都將寫入分割區0
。如果此屬性大於1
,則必須提供DlqPartitionFunction
bean。請注意,實際分割區計數會受到 binder 的minPartitionCount
屬性的影響。預設值:
none
- configuration
-
包含通用 Kafka 消費者屬性的 key/value 對應的 Map。除了擁有 Kafka 消費者屬性外,其他組態屬性也可以在此處傳遞。例如,應用程式需要的一些屬性,例如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
。bootstrap.servers
屬性不能在此處設定;如果您需要連線到多個叢集,請使用多 binder 支援。預設值:空對應。
- dlqName
-
接收錯誤訊息的 DLQ 主題名稱。
預設值:null (若未指定,導致錯誤的訊息會轉發到名為
error.<destination>.<group>
的主題)。 - dlqProducerProperties
-
使用此屬性,可以設定 DLQ 特定的生產者屬性。所有可透過 kafka 生產者屬性取得的屬性都可以透過此屬性設定。當在消費者上啟用原生解碼 (即 useNativeDecoding: true) 時,應用程式必須為 DLQ 提供對應的 key/value 序列化器。這必須以
dlqProducerProperties.configuration.key.serializer
和dlqProducerProperties.configuration.value.serializer
的形式提供。預設值:預設 Kafka 生產者屬性。
- standardHeaders
-
指示 inbound 通道配接器填入哪些標準標頭。允許的值:
none
、id
、timestamp
或both
。如果使用原生反序列化,且第一個接收訊息的元件需要id
(例如組態為使用 JDBC 訊息儲存區的 aggregator),則此屬性很有用。預設值:
none
- converterBeanName
-
實作
RecordMessageConverter
的 bean 名稱。用於 inbound 通道配接器中以取代預設的MessagingMessageConverter
。預設值:
null
- idleEventInterval
-
指示最近未收到訊息的事件之間的時間間隔 (毫秒)。使用
ApplicationListener<ListenerContainerIdleEvent>
來接收這些事件。有關使用範例,請參閱 暫停-恢復。預設值:
30000
- destinationIsPattern
-
若為 true,則目的地會被視為用於比對 broker 的主題名稱的正規表示式
Pattern
。若為 true,則不會佈建主題,且不允許使用enableDlq
,因為 binder 在佈建階段不知道主題名稱。請注意,偵測符合模式的新主題所需的時間由消費者屬性metadata.max.age.ms
控制,該屬性 (在撰寫本文時) 預設為 300,000 毫秒 (5 分鐘)。可以使用上述configuration
屬性組態此屬性。預設值:
false
- topic.properties
-
佈建新主題時使用的 Kafka 主題屬性
Map
,例如spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
預設值:無。
- topic.replicas-assignment
-
複本指派的 Map<Integer, List<Integer>>,其中 key 為分割區,value 為指派。佈建新主題時使用。請參閱
kafka-clients
jar 中的NewTopic
Javadoc。預設值:無。
- topic.replication-factor
-
佈建主題時要使用的複寫因數。覆寫 binder 全域設定。如果存在
replicas-assignments
,則忽略此屬性。預設值:無 (使用 binder 全域預設值 -1)。
- pollTimeout
-
用於可輪詢消費者中輪詢的逾時。
預設值:5 秒。
- transactionManager
-
用於覆寫此繫結的 binder 交易管理器的
KafkaAwareTransactionManager
的 Bean 名稱。如果您想要使用ChainedKafkaTransactionManaager
將另一個交易與 Kafka 交易同步,通常需要此屬性。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須組態為相同的交易管理器。預設值:無。
- txCommitRecovered
-
使用事務性 binder 時,預設情況下,恢復記錄 (例如,當重試耗盡且記錄傳送至死信主題時) 的偏移量將透過新交易提交。將此屬性設為
false
會抑制提交恢復記錄的偏移量。預設值:true。
- commonErrorHandlerBeanName
-
每個消費者繫結要使用的
CommonErrorHandler
bean 名稱。如果存在,此使用者提供的CommonErrorHandler
會優先於 binder 定義的任何其他錯誤處理器。如果應用程式不想使用ListenerContainerCustomizer
,然後檢查目的地/群組組合以設定錯誤處理器,這是一種表達錯誤處理器的便捷方式。預設值:無。
Kafka 生產者屬性
以下屬性僅適用於 Kafka 生產者,且必須以 spring.cloud.stream.kafka.bindings.<channelName>.producer.
為前綴。
為了避免重複,Spring Cloud Stream 支援為所有通道設定值,格式為 spring.cloud.stream.kafka.default.producer.<property>=<value> 。 |
- admin.configuration
-
自 2.1.1 版起,此屬性已被取代,改用
topic.properties
,且未來版本將移除對其的支援。 - admin.replicas-assignment
-
自 2.1.1 版起,此屬性已被取代,改用
topic.replicas-assignment
,且未來版本將移除對其的支援。 - admin.replication-factor
-
自 2.1.1 版起,此屬性已被取代,改用
topic.replication-factor
,且未來版本將移除對其的支援。 - bufferSize
-
Kafka 生產者在傳送前嘗試批次處理多少資料 (以位元組為單位) 的上限。
預設值:
16384
。 - sync
-
生產者是否為同步。
預設值:
false
。 - sendTimeoutExpression
-
針對傳出訊息評估的 SpEL 表達式,用於評估啟用同步發布時等待應答的時間,例如
headers['mySendTimeout']
。逾時值以毫秒為單位。在 3.0 之前的版本中,除非使用原生編碼,否則無法使用 payload,因為在評估此表達式時,payload 已為byte[]
形式。現在,表達式會在轉換 payload 之前評估。預設值:
none
。 - batchTimeout
-
生產者等待多長時間以允許更多訊息累積在同一批次中,然後再傳送訊息。(通常,生產者根本不會等待,而只是傳送先前傳送正在進行時累積的所有訊息。) 非零值可能會提高輸送量,但會犧牲延遲。
預設值:
0
。 - messageKeyExpression
-
針對傳出訊息評估的 SpEL 表達式,用於填入生產的 Kafka 訊息的 key,例如
headers['myKey']
。在 3.0 之前的版本中,除非使用原生編碼,否則無法使用 payload,因為在評估此表達式時,payload 已為byte[]
形式。現在,表達式會在轉換 payload 之前評估。在常規處理器 (Function<String, String>
或Function<Message<?>, Message<?>
) 的情況下,如果生產的 key 需要與來自主題的傳入 key 相同,則可以將此屬性設定如下。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
對於反應式函式,有一個重要的注意事項需要記住。在這種情況下,由應用程式手動將標頭從傳入訊息複製到傳出訊息。您可以設定標頭,例如myKey
並使用如上建議的headers['myKey']
,或者,為了方便起見,只需設定KafkaHeaders.MESSAGE_KEY
標頭,您就不需要設定此屬性。預設值:
none
。 - headerPatterns
-
以逗號分隔的簡單模式清單,用於比對要對應到
ProducerRecord
中的 KafkaHeaders
的 Spring messaging 標頭。模式可以以萬用字元 (星號) 開頭或結尾。模式可以使用!
作為前綴來否定。比對在第一次比對 (正面或負面) 後停止。例如!ask,as*
將通過ash
但不會通過ask
。id
和timestamp
永遠不會被對應。預設值:
*
(所有標頭 - 除了id
和timestamp
) - configuration
-
包含通用 Kafka 生產者屬性的 key/value 對應的 Map。
bootstrap.servers
屬性不能在此處設定;如果您需要連線到多個叢集,請使用多 binder 支援。預設值:空對應。
- topic.properties
-
佈建新主題時使用的 Kafka 主題屬性
Map
,例如spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
- topic.replicas-assignment
-
複本指派的 Map<Integer, List<Integer>>,其中 key 為分割區,value 為指派。佈建新主題時使用。請參閱
kafka-clients
jar 中的NewTopic
Javadoc。預設值:無。
- topic.replication-factor
-
佈建主題時要使用的複寫因數。覆寫 binder 全域設定。如果存在
replicas-assignments
,則忽略此屬性。預設值:無 (使用 binder 全域預設值 -1)。
- useTopicHeader
-
設為
true
以使用傳出訊息中KafkaHeaders.TOPIC
訊息標頭的值覆寫預設繫結目的地 (主題名稱)。如果標頭不存在,則使用預設繫結目的地。預設值:
false
。 - recordMetadataChannel
-
應將成功傳送結果傳送至的
MessageChannel
的 bean 名稱;bean 必須存在於應用程式上下文中。傳送至通道的訊息是已傳送的訊息 (如果有的話,在轉換之後),並帶有一個額外的標頭KafkaHeaders.RECORD_METADATA
。標頭包含 Kafka 用戶端提供的RecordMetadata
物件;它包括記錄寫入主題中的分割區和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
傳送失敗會轉到生產者錯誤通道 (如果已組態);請參閱 Kafka 錯誤通道。
預設值:null。
Kafka binder 使用生產者的 partitionCount 設定作為提示,以使用給定的分割區計數建立主題 (與 minPartitionCount 結合使用,兩者中的最大值將作為使用值)。在為 binder 組態 minPartitionCount 和為應用程式組態 partitionCount 時要謹慎,因為會使用較大的值。如果主題已存在且分割區計數較小,且 autoAddPartitions 已停用 (預設值),則 binder 將無法啟動。如果主題已存在且分割區計數較小,且 autoAddPartitions 已啟用,則會新增分割區。如果主題已存在且分割區數大於 (minPartitionCount 或 partitionCount ) 的最大值,則會使用現有的分割區計數。 |
- compression
-
設定
compression.type
生產者屬性。支援的值為none
、gzip
、snappy
、lz4
和zstd
。如果您將kafka-clients
jar 覆寫為 2.1.0 (或更新版本),如 Spring for Apache Kafka 文件中所述,並希望使用zstd
壓縮,請使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
。預設值:
none
。 - transactionManager
-
用於覆寫此繫結的 binder 交易管理器的
KafkaAwareTransactionManager
的 Bean 名稱。如果您想要使用ChainedKafkaTransactionManaager
將另一個交易與 Kafka 交易同步,通常需要此屬性。為了實現記錄的精確一次消費和生產,消費者和生產者繫結都必須組態為相同的交易管理器。預設值:無。
- closeTimeout
-
關閉生產者時等待的逾時秒數。
預設值:
30
- allowNonTransactional
-
通常,與事務性 binder 關聯的所有輸出繫結都會在新交易中發布 (如果尚未進行交易)。此屬性允許您覆寫該行為。若設為 true,則發布到此輸出繫結的記錄將不會在交易中執行,除非交易已在進行中。
預設值:
false