訊息監聽器容器
提供了兩種 MessageListenerContainer
實作
-
KafkaMessageListenerContainer
-
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
從單一執行緒上的所有主題或分割區接收所有訊息。ConcurrentMessageListenerContainer
委派給一個或多個 KafkaMessageListenerContainer
實例,以提供多執行緒消費。
從 2.2.7 版開始,您可以將 RecordInterceptor
新增至監聽器容器;它將在呼叫監聽器之前調用,允許檢查或修改記錄。如果攔截器返回 null,則不會呼叫監聽器。從 2.7 版開始,它具有在監聽器退出後(正常或透過拋出例外)調用的其他方法。此外,從 2.7 版開始,現在有一個 BatchInterceptor
,為批次監聽器提供類似的功能。此外,ConsumerAwareRecordInterceptor
(和 BatchInterceptor
)提供對 Consumer<?, ?>
的存取權。例如,這可用於存取攔截器中的 Consumer 指標。
您不應在這些攔截器中執行任何會影響 Consumer 位置和/或已提交偏移量的方法;容器需要管理此類資訊。 |
如果攔截器變更了記錄(透過建立新記錄),則 topic 、partition 和 offset 必須保持不變,以避免意外的副作用,例如記錄遺失。 |
CompositeRecordInterceptor
和 CompositeBatchInterceptor
可用於調用多個攔截器。
預設情況下,從 2.8 版開始,當使用交易時,攔截器會在交易開始之前調用。您可以將監聽器容器的 interceptBeforeTx
屬性設定為 false
,以便在交易開始後調用攔截器。從 2.9 版開始,這將適用於任何交易管理器,而不僅僅是 KafkaAwareTransactionManager
。例如,這允許攔截器參與由容器啟動的 JDBC 交易。
從 2.3.8、2.4.6 版開始,當並行性大於 1 時,ConcurrentMessageListenerContainer
現在支援靜態成員資格。group.instance.id
的字尾為 -n
,其中 n
從 1
開始。這與增加的 session.timeout.ms
一起使用,可用於減少重新平衡事件,例如,當應用程式實例重新啟動時。
使用 KafkaMessageListenerContainer
以下建構子可用
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它在 ContainerProperties
物件中接收 ConsumerFactory
以及關於主題和分割區的資訊,以及其他設定。ContainerProperties
具有以下建構子
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一個建構子採用 TopicPartitionOffset
參數陣列,明確指示容器要使用的分割區(使用 Consumer assign()
方法)以及可選的初始偏移量。預設情況下,正值是絕對偏移量。預設情況下,負值相對於分割區內的目前最後偏移量。提供了採用額外 boolean
參數的 TopicPartitionOffset
建構子。如果這是 true
,則初始偏移量(正數或負數)相對於此 Consumer 的目前位置。偏移量在容器啟動時套用。第二個建構子採用主題陣列,Kafka 根據 group.id
屬性分配分割區 — 跨群組分配分割區。第三個建構子使用 regex Pattern
來選擇主題。
若要將 MessageListener
指派給容器,您可以在建立容器時使用 ContainerProps.setMessageListener
方法。以下範例示範如何執行此操作
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
請注意,在建立 DefaultKafkaConsumerFactory
時,使用僅採用屬性的建構子(如上所述)表示從設定中選取索引鍵和值 Deserializer
類別。或者,Deserializer
實例可以傳遞至索引鍵和/或值的 DefaultKafkaConsumerFactory
建構子,在這種情況下,所有 Consumer 共用相同的實例。另一個選項是提供 Supplier<Deserializer>
s(從 2.3 版開始),它將用於為每個 Consumer
取得個別的 Deserializer
實例
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
請參閱 ContainerProperties
的 Javadoc,以取得關於您可以設定的各種屬性的更多資訊。
自 2.1.1 版以來,提供了一個名為 logContainerConfig
的新屬性。當 true
且啟用 INFO
日誌記錄時,每個監聽器容器都會寫入日誌訊息,總結其設定屬性。
預設情況下,主題偏移量提交的日誌記錄在 DEBUG
日誌記錄層級執行。從 2.1.2 版開始,ContainerProperties
中名為 commitLogLevel
的屬性可讓您指定這些訊息的日誌層級。例如,若要將日誌層級變更為 INFO
,您可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
。
從 2.2 版開始,新增了一個名為 missingTopicsFatal
的新容器屬性(預設值:自 2.3.4 版起為 false
)。如果任何已設定的主題在 Broker 上不存在,則這會阻止容器啟動。如果容器設定為監聽主題模式 (regex),則不適用。先前,容器執行緒在 consumer.poll()
方法中循環,等待主題出現,同時記錄許多訊息。除了日誌之外,沒有任何跡象表明存在問題。
從 2.8 版開始,引入了一個新的容器屬性 authExceptionRetryInterval
。這會導致容器在從 KafkaConsumer
取得任何 AuthenticationException
或 AuthorizationException
後重試提取訊息。例如,當設定的使用者被拒絕存取讀取特定主題或認證不正確時,可能會發生這種情況。定義 authExceptionRetryInterval
允許容器在授予適當的權限時恢復。
預設情況下,未設定任何間隔 - 驗證和授權錯誤被視為致命錯誤,這會導致容器停止。 |
從 2.8 版開始,當建立 Consumer Factory 時,如果您以物件形式提供 Deserializer(在建構子中或透過 setter),Factory 將調用 configure()
方法,以使用設定屬性來設定它們。
使用 ConcurrentMessageListenerContainer
單一建構子與 KafkaListenerContainer
建構子類似。以下清單顯示建構子的簽章
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它也具有 concurrency
屬性。例如,container.setConcurrency(3)
會建立三個 KafkaMessageListenerContainer
實例。
對於第一個建構子,Kafka 使用其群組管理功能在 Consumer 之間分配分割區。
當監聽多個主題時,預設分割區分配可能不是您預期的。例如,如果您有三個主題,每個主題有五個分割區,並且您想要使用 當使用 Spring Boot 時,您可以如下所示指派策略
|
當使用 TopicPartitionOffset
設定容器屬性時,ConcurrentMessageListenerContainer
會在委派 KafkaMessageListenerContainer
實例之間分配 TopicPartitionOffset
實例。
例如,如果提供了六個 TopicPartitionOffset
實例,且 concurrency
為 3
;則每個容器取得兩個分割區。對於五個 TopicPartitionOffset
實例,兩個容器取得兩個分割區,而第三個容器取得一個分割區。如果 concurrency
大於 TopicPartitions
的數量,則會向下調整 concurrency
,以便每個容器取得一個分割區。
client.id 屬性(如果已設定)會附加 -n ,其中 n 是對應於並行性的 Consumer 實例。當啟用 JMX 時,這對於為 MBean 提供唯一名稱是必要的。 |
從 1.3 版開始,MessageListenerContainer
提供對底層 KafkaConsumer
指標的存取權。在 ConcurrentMessageListenerContainer
的情況下,metrics()
方法會傳回所有目標 KafkaMessageListenerContainer
實例的指標。指標依 client-id
分組到 Map<MetricName, ? extends Metric>
中,該 client-id
是為底層 KafkaConsumer
提供的。
從 2.3 版開始,ContainerProperties
提供 idleBetweenPolls
選項,讓監聽器容器中的主迴圈在 KafkaConsumer.poll()
呼叫之間休眠。實際的休眠間隔會選取為所提供選項中的最小值,以及 max.poll.interval.ms
Consumer 設定與目前記錄批次處理時間之間的差異。
提交偏移量
提供了多個選項來提交偏移量。如果 enable.auto.commit
Consumer 屬性為 true
,則 Kafka 會根據其設定自動提交偏移量。如果為 false
,則容器支援多個 AckMode
設定(在下一個清單中說明)。預設 AckMode
為 BATCH
。從 2.3 版開始,除非在設定中明確設定,否則架構會將 enable.auto.commit
設定為 false
。先前,如果未設定屬性,則會使用 Kafka 預設值 (true
)。
Consumer poll()
方法傳回一個或多個 ConsumerRecords
。針對每個記錄呼叫 MessageListener
。以下清單說明容器針對每個 AckMode
採取的動作(當未使用交易時)
-
RECORD
:在監聽器在處理記錄後返回時提交偏移量。 -
BATCH
:當已處理完poll()
返回的所有記錄時,提交偏移量。 -
TIME
:當已處理完poll()
返回的所有記錄時提交偏移量,只要自上次提交以來的ackTime
已超過。 -
COUNT
:當已處理完poll()
返回的所有記錄時提交偏移量,只要自上次提交以來已收到ackCount
記錄。 -
COUNT_TIME
:類似於TIME
和COUNT
,但如果任一條件為true
,則執行提交。 -
MANUAL
:訊息監聽器負責acknowledge()
Acknowledgment
。之後,套用與BATCH
相同的語義。 -
MANUAL_IMMEDIATE
:當監聽器呼叫Acknowledgment.acknowledge()
方法時立即提交偏移量。
當使用交易時,偏移量會傳送至交易,且語義等同於 RECORD
或 BATCH
,具體取決於監聽器類型(記錄或批次)。
MANUAL 和 MANUAL_IMMEDIATE 需要監聽器為 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener 。請參閱訊息監聽器。 |
根據 syncCommits
容器屬性,會使用 Consumer 上的 commitSync()
或 commitAsync()
方法。syncCommits
預設為 true
;另請參閱 setSyncCommitTimeout
。請參閱 setCommitCallback
以取得非同步提交的結果;預設回呼是 LoggingCommitCallback
,它會記錄錯誤(以及偵錯層級的成功)。
由於監聽器容器具有其自己的提交偏移量機制,因此它偏好 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
為 false
。從 2.3 版開始,除非在 Consumer Factory 或容器的 Consumer 屬性覆寫中明確設定,否則它會無條件地將其設定為 false。
Acknowledgment
具有以下方法
public interface Acknowledgment {
void acknowledge();
}
此方法讓監聽器控制何時提交偏移量。
從 2.3 版開始,Acknowledgment
介面有兩個額外的方法 nack(long sleep)
和 nack(int index, long sleep)
。第一個用於記錄監聽器,第二個用於批次監聽器。針對您的監聽器類型呼叫錯誤的方法會擲回 IllegalStateException
。
如果您想要提交部分批次,請使用 nack() 。當使用交易時,請將 AckMode 設定為 MANUAL ;調用 nack() 會將成功處理的記錄的偏移量傳送至交易。 |
nack() 只能在調用您的監聽器的 Consumer 執行緒上呼叫。 |
當使用亂序提交時,不允許使用 nack() 。 |
使用記錄監聽器時,當呼叫 nack()
時,會提交任何擱置中的偏移量,捨棄上次輪詢中的剩餘記錄,並在其分割區上執行搜尋,以便在下一次 poll()
上重新傳遞失敗的記錄和未處理的記錄。可以在重新傳遞之前暫停 Consumer,方法是設定 sleep
引數。這與在容器設定了 DefaultErrorHandler
時擲回例外的功能類似。
nack() 會暫停整個監聽器指定的休眠持續時間,包括所有指派的分割區。 |
當使用批次監聽器時,您可以指定批次中發生失敗的索引。當呼叫 nack()
時,將提交索引之前的記錄的偏移量,並在其分割區上執行失敗和捨棄記錄的搜尋,以便在下一次 poll()
上重新傳遞它們。
請參閱容器錯誤處理常式以取得更多資訊。
在休眠期間,Consumer 會暫停,以便我們繼續輪詢 Broker 以保持 Consumer 存活。實際的休眠時間及其解析度取決於容器的 pollTimeout ,預設值為 5 秒。最小休眠時間等於 pollTimeout ,所有休眠時間都將是它的倍數。對於較小的休眠時間,或為了提高其準確性,請考慮減少容器的 pollTimeout 。 |
從 3.0.10 版開始,批次監聽器可以使用 Acknowledgment
引數上的 acknowledge(index)
來提交批次部分的偏移量。當呼叫此方法時,將提交索引處的記錄(以及所有先前的記錄)的偏移量。在執行部分批次提交後呼叫 acknowledge()
將提交批次剩餘部分的偏移量。以下限制適用
-
需要
AckMode.MANUAL_IMMEDIATE
-
必須在監聽器執行緒上呼叫該方法
-
監聽器必須使用
List
而不是原始ConsumerRecords
-
索引必須在清單元素範圍內
-
索引必須大於先前呼叫中使用的索引
強制執行這些限制,並且該方法將擲回 IllegalArgumentException
或 IllegalStateException
,具體取決於違規情況。