Listener Container 屬性

表 1. ContainerProperties 屬性
屬性 預設值 描述

ackCount

1

ackModeCOUNTCOUNT_TIME 時,在提交待處理的偏移量之前的記錄數。

adviceChain

null

Advice 物件的鏈 (例如,環繞訊息監聽器的 MethodInterceptor advice),依序調用。

ackMode

BATCH

控制偏移量提交的頻率 - 請參閱 提交偏移量

ackTime

5000

ackModeTIMECOUNT_TIME 時,在提交待處理的偏移量之後的時間 (以毫秒為單位)。

assignmentCommitOption

LATEST_ONLY _NO_TX

是否在指派時提交初始位置;預設情況下,只有在 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest 時才會提交初始偏移量,即使存在交易管理器,也不會在交易中執行。有關可用選項的更多資訊,請參閱 ContainerProperties.AssignmentCommitOption 的 JavaDocs。

asyncAcks

false

啟用亂序提交 (請參閱 手動提交偏移量);Consumer 會暫停,並且提交會延遲到填補間隙為止。

authExceptionRetryInterval

null

當不為 null 時,在 Kafka Client 擲回 AuthenticationExceptionAuthorizationException 時,輪詢之間要睡眠的 Duration。當為 null 時,此類例外會被視為致命錯誤,容器將停止。

batchRecoverAfterRollback

false

設定為 true 以啟用批次復原,請參閱 Rollback 後處理器

clientId

(空字串)

client.id Consumer 屬性的前綴。覆寫 Consumer Factory client.id 屬性;在並行容器中,會為每個 Consumer 實例新增 -n 作為後綴。

checkDeserExWhenKeyNull

false

設定為 true,以便在收到 null key 時,始終檢查 DeserializationException 標頭。當 Consumer 程式碼無法判斷是否已設定 ErrorHandlingDeserializer 時 (例如使用委派反序列化器時),這很有用。

checkDeserExWhenValueNull

false

設定為 true,以便在收到 null value 時,始終檢查 DeserializationException 標頭。當 Consumer 程式碼無法判斷是否已設定 ErrorHandlingDeserializer 時 (例如使用委派反序列化器時),這很有用。

commitCallback

null

當存在且 syncCommitsfalse 時,提交完成後調用的回呼。

commitLogLevel

DEBUG

與提交偏移量相關的日誌記錄層級。

consumerRebalanceListener

null

重新平衡監聽器;請參閱 重新平衡監聽器

commitRetries

3

當使用設定為 true 的 syncCommits 時,設定 RetriableCommitFailedException 的重試次數。預設值為 3 (總共 4 次嘗試)。

consumerStartTimeout

30 秒

Consumer 在記錄錯誤之前等待啟動的時間;如果 (例如) 您使用的任務執行器執行緒不足,則可能會發生這種情況。

deliveryAttemptHeader

false

請參閱 傳遞嘗試標頭

eosMode

V2

精確一次語意模式;請參閱 精確一次語意

fixTxOffsets

false

當使用交易 Producer 生產的記錄,且 Consumer 位於分割區的末端時,由於用於指示交易提交/Rollback 的偽記錄,以及可能存在的 Rollback 記錄,延遲可能會錯誤地報告為大於零。這在功能上不會影響 Consumer,但某些使用者已表示擔心「延遲」不為零。將此屬性設定為 true,容器將更正此類錯誤報告的偏移量。檢查會在下一次輪詢之前執行,以避免為提交處理增加顯著的複雜性。在撰寫本文時,只有在 Consumer 設定了 isolation.level=read_committedmax.poll.records 大於 1 時,才會更正延遲。有關更多資訊,請參閱 KAFKA-10683

groupId

null

覆寫 Consumer group.id 屬性;由 @KafkaListener idgroupId 屬性自動設定。

idleBeforeDataMultiplier

5.0

idleEventInterval 的乘數,在收到任何記錄之前套用。收到記錄後,不再套用乘數。自 2.8 版起可用。

idleBetweenPolls

0

用於透過在輪詢之間睡眠執行緒來減慢傳遞速度。處理一批記錄的時間加上此值必須小於 max.poll.interval.ms Consumer 屬性。

idleEventInterval

null

設定後,啟用 ListenerContainerIdleEvent 的發佈,請參閱 應用程式事件偵測閒置和無回應的 Consumer。另請參閱 idleBeforeDataMultiplier

idlePartitionEventInterval

null

設定後,啟用 ListenerContainerIdlePartitionEvent 的發佈,請參閱 應用程式事件偵測閒置和無回應的 Consumer

kafkaConsumerProperties

用於覆寫在 Consumer Factory 上設定的任何任意 Consumer 屬性。

kafkaAwareTransactionManager

null

請參閱 交易

listenerTaskExecutor

SimpleAsyncTaskExecutor

用於執行 Consumer 執行緒的任務執行器。預設執行器建立名為 <name>-C-n 的執行緒;對於 KafkaMessageListenerContainer,名稱是 Bean 名稱;對於 ConcurrentMessageListenerContainer,名稱是 Bean 名稱,後綴為 -n,其中 n 會針對每個子容器遞增。

logContainerConfig

false

設定為 true 以在 INFO 層級記錄所有容器屬性。

messageListener

null

訊息監聽器。

micrometerEnabled

true

是否為 Consumer 執行緒維護 Micrometer 計時器。

micrometerTags

要新增至 Micrometer 指標的靜態標籤對應。

micrometerTagsProvider

null

根據 Consumer 記錄提供動態標籤的函數。

missingTopicsFatal

false

如果 Broker 上不存在已設定的主題,則為 true 時會阻止容器啟動。

monitorInterval

30 秒

檢查 Consumer 執行緒的狀態以取得 NonResponsiveConsumerEvent 的頻率。請參閱 noPollThresholdpollTimeout

noPollThreshold

3.0

乘以 pollTimeOut 以判斷是否發佈 NonResponsiveConsumerEvent。請參閱 monitorInterval

observationConvention

null

設定後,根據 Consumer 記錄中的資訊,將動態標籤新增至計時器和追蹤。

observationEnabled

false

設定為 true 以透過 Micrometer 啟用觀察。

offsetAndMetadataProvider

null

OffsetAndMetadata 的提供者;預設情況下,提供者會建立具有空元資料的偏移量和元資料。提供者提供了一種自訂元資料的方式。

onlyLogRecordMetadata

false

設定為 false 以記錄完整的 Consumer 記錄 (在錯誤、偵錯記錄等中),而不僅僅是 topic-partition@offset

pauseImmediate

false

當容器暫停時,在目前記錄之後停止處理,而不是在處理完前一次輪詢的所有記錄之後停止;剩餘的記錄會保留在記憶體中,並在容器恢復時傳遞給監聽器。

pollTimeout

5000

傳遞至 Consumer.poll() 的逾時時間 (以毫秒為單位)。

pollTimeoutWhilePaused

100

當容器處於暫停狀態時,傳遞至 Consumer.poll() 的逾時時間 (以毫秒為單位)。

restartAfterAuthExceptions

false

如果容器因授權/驗證例外而停止,則為 True 以重新啟動容器。

scheduler

ThreadPoolTaskScheduler

要在其上執行 Consumer 監視器工作的排程器。

shutdownTimeout

10000

stop() 方法的最大封鎖時間 (以毫秒為單位),直到所有 Consumer 停止且發佈容器停止事件之前。

stopContainerWhenFenced

false

如果擲回 ProducerFencedException,則停止監聽器容器。有關更多資訊,請參閱 Rollback 後處理器

stopImmediate

false

當容器停止時,在目前記錄之後停止處理,而不是在處理完前一次輪詢的所有記錄之後停止。

subBatchPerPartition

請參閱描述。

當使用批次監聽器時,如果此值為 true,則會使用輪詢結果分割成子批次 (每個分割區一個) 來調用監聽器。預設值為 false

syncCommitTimeout

null

syncCommitstrue 時要使用的逾時時間。如果未設定,容器將嘗試判斷 default.api.timeout.ms Consumer 屬性並使用該屬性;否則將使用 60 秒。

syncCommits

true

是否對偏移量使用同步或非同步提交;請參閱 commitCallback

topics topicPattern topicPartitions

不適用

已設定的主題、主題模式或明確指派的主題/分割區。互斥;必須至少提供一個;由 ContainerProperties 建構函式強制執行。

transactionManager

null

自 3.2 版起已棄用,請參閱 [kafkaAwareTransactionManager]其他交易管理器

表 2. AbstractListenerContainer 屬性
屬性 預設值 描述

afterRollbackProcessor

DefaultAfterRollbackProcessor

在交易 Rollback 後調用的 AfterRollbackProcessor

applicationEventPublisher

應用程式內容

事件發佈者。

batchErrorHandler

請參閱描述。

已棄用 - 請參閱 commonErrorHandler

batchInterceptor

null

設定要在調用批次監聽器之前調用的 BatchInterceptor;不適用於記錄監聽器。另請參閱 interceptBeforeTx

beanName

Bean 名稱

容器的 Bean 名稱;子容器的後綴為 -n

commonErrorHandler

請參閱描述。

DefaultErrorHandlernull (當在使用 DefaultAfterRollbackProcessor 時提供 transactionManager 時)。請參閱 容器錯誤處理器

containerProperties

ContainerProperties

容器屬性實例。

groupId

請參閱描述。

containerProperties.groupId (如果存在),否則為 Consumer Factory 中的 group.id 屬性。

interceptBeforeTx

true

判斷是在交易開始之前還是之後調用 recordInterceptor

listenerId

請參閱描述。

使用者設定容器的 Bean 名稱或 @KafkaListenerid 屬性。

listenerInfo

null

要在 KafkaHeaders.LISTENER_INFO 標頭中填入的值。對於 @KafkaListener,此值是從 info 屬性取得的。此標頭可用於各種位置,例如 RecordInterceptorRecordFilterStrategy 和監聽器程式碼本身。

pauseRequested

(唯讀)

如果已請求 Consumer 暫停,則為 True。

recordInterceptor

null

設定要在調用記錄監聽器之前調用的 RecordInterceptor;不適用於批次監聽器。另請參閱 interceptBeforeTx

topicCheckTimeout

30 秒

missingTopicsFatal 容器屬性為 true 時,等待 describeTopics 作業完成的時間 (以秒為單位)。

表 3. KafkaMessageListenerContainer 屬性
屬性 預設值 描述

assignedPartitions

(唯讀)

目前指派給此容器的分割區 (明確或非明確)。

assignedPartitionsByClientId

(唯讀)

目前指派給此容器的分割區 (明確或非明確)。

clientIdSuffix

null

由並行容器使用,為每個子容器的 Consumer 提供唯一的 client.id

containerPaused

不適用

如果已請求暫停且 Consumer 實際上已暫停,則為 True。

表 4. ConcurrentMessageListenerContainer 屬性
屬性 預設值 描述

alwaysClientIdSuffix

true

concurrency 僅為 1 時,設定為 false 以禁止將後綴新增至 client.id Consumer 屬性。

assignedPartitions

(唯讀)

目前指派給此容器的子 KafkaMessageListenerContainer 的分割區彙總 (明確或非明確)。

assignedPartitionsByClientId

(唯讀)

目前指派給此容器的子 KafkaMessageListenerContainer 的分割區 (明確或非明確),依子容器 Consumer 的 client.id 屬性鍵控。

concurrency

1

要管理的子 KafkaMessageListenerContainer 的數量。

containerPaused

不適用

如果已請求暫停且所有子容器的 Consumer 實際上都已暫停,則為 True。

containers

不適用

對所有子 KafkaMessageListenerContainer 的參考。