訊息監聽器容器

提供了兩種 MessageListenerContainer 實作

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 從單一執行緒上的所有主題或分割區接收所有訊息。ConcurrentMessageListenerContainer 委派給一個或多個 KafkaMessageListenerContainer 實例,以提供多執行緒消費。

從 2.2.7 版開始,您可以將 RecordInterceptor 新增至監聽器容器;它將在呼叫監聽器之前調用,允許檢查或修改記錄。如果攔截器返回 null,則不會呼叫監聽器。從 2.7 版開始,它具有在監聽器退出後(正常或透過拋出例外)調用的其他方法。此外,從 2.7 版開始,現在有一個 BatchInterceptor,為批次監聽器提供類似的功能。此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供對 Consumer<?, ?> 的存取權。例如,這可用於存取攔截器中的 Consumer 指標。

您不應在這些攔截器中執行任何會影響 Consumer 位置和/或已提交偏移量的方法;容器需要管理此類資訊。
如果攔截器變更了記錄(透過建立新記錄),則 topicpartitionoffset 必須保持不變,以避免意外的副作用,例如記錄遺失。

CompositeRecordInterceptorCompositeBatchInterceptor 可用於調用多個攔截器。

預設情況下,從 2.8 版開始,當使用交易時,攔截器會在交易開始之前調用。您可以將監聽器容器的 interceptBeforeTx 屬性設定為 false,以便在交易開始後調用攔截器。從 2.9 版開始,這將適用於任何交易管理器,而不僅僅是 KafkaAwareTransactionManager。例如,這允許攔截器參與由容器啟動的 JDBC 交易。

從 2.3.8、2.4.6 版開始,當並行性大於 1 時,ConcurrentMessageListenerContainer 現在支援靜態成員資格group.instance.id 的字尾為 -n,其中 n1 開始。這與增加的 session.timeout.ms 一起使用,可用於減少重新平衡事件,例如,當應用程式實例重新啟動時。

使用 KafkaMessageListenerContainer

以下建構子可用

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它在 ContainerProperties 物件中接收 ConsumerFactory 以及關於主題和分割區的資訊,以及其他設定。ContainerProperties 具有以下建構子

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一個建構子採用 TopicPartitionOffset 參數陣列,明確指示容器要使用的分割區(使用 Consumer assign() 方法)以及可選的初始偏移量。預設情況下,正值是絕對偏移量。預設情況下,負值相對於分割區內的目前最後偏移量。提供了採用額外 boolean 參數的 TopicPartitionOffset 建構子。如果這是 true,則初始偏移量(正數或負數)相對於此 Consumer 的目前位置。偏移量在容器啟動時套用。第二個建構子採用主題陣列,Kafka 根據 group.id 屬性分配分割區 — 跨群組分配分割區。第三個建構子使用 regex Pattern 來選擇主題。

若要將 MessageListener 指派給容器,您可以在建立容器時使用 ContainerProps.setMessageListener 方法。以下範例示範如何執行此操作

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

請注意,在建立 DefaultKafkaConsumerFactory 時,使用僅採用屬性的建構子(如上所述)表示從設定中選取索引鍵和值 Deserializer 類別。或者,Deserializer 實例可以傳遞至索引鍵和/或值的 DefaultKafkaConsumerFactory 建構子,在這種情況下,所有 Consumer 共用相同的實例。另一個選項是提供 Supplier<Deserializer> s(從 2.3 版開始),它將用於為每個 Consumer 取得個別的 Deserializer 實例

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

請參閱 ContainerPropertiesJavadoc,以取得關於您可以設定的各種屬性的更多資訊。

自 2.1.1 版以來,提供了一個名為 logContainerConfig 的新屬性。當 true 且啟用 INFO 日誌記錄時,每個監聽器容器都會寫入日誌訊息,總結其設定屬性。

預設情況下,主題偏移量提交的日誌記錄在 DEBUG 日誌記錄層級執行。從 2.1.2 版開始,ContainerProperties 中名為 commitLogLevel 的屬性可讓您指定這些訊息的日誌層級。例如,若要將日誌層級變更為 INFO,您可以使用 containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

從 2.2 版開始,新增了一個名為 missingTopicsFatal 的新容器屬性(預設值:自 2.3.4 版起為 false)。如果任何已設定的主題在 Broker 上不存在,則這會阻止容器啟動。如果容器設定為監聽主題模式 (regex),則不適用。先前,容器執行緒在 consumer.poll() 方法中循環,等待主題出現,同時記錄許多訊息。除了日誌之外,沒有任何跡象表明存在問題。

從 2.8 版開始,引入了一個新的容器屬性 authExceptionRetryInterval。這會導致容器在從 KafkaConsumer 取得任何 AuthenticationExceptionAuthorizationException 後重試提取訊息。例如,當設定的使用者被拒絕存取讀取特定主題或認證不正確時,可能會發生這種情況。定義 authExceptionRetryInterval 允許容器在授予適當的權限時恢復。

預設情況下,未設定任何間隔 - 驗證和授權錯誤被視為致命錯誤,這會導致容器停止。

從 2.8 版開始,當建立 Consumer Factory 時,如果您以物件形式提供 Deserializer(在建構子中或透過 setter),Factory 將調用 configure() 方法,以使用設定屬性來設定它們。

使用 ConcurrentMessageListenerContainer

單一建構子與 KafkaListenerContainer 建構子類似。以下清單顯示建構子的簽章

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它也具有 concurrency 屬性。例如,container.setConcurrency(3) 會建立三個 KafkaMessageListenerContainer 實例。

對於第一個建構子,Kafka 使用其群組管理功能在 Consumer 之間分配分割區。

當監聽多個主題時,預設分割區分配可能不是您預期的。例如,如果您有三個主題,每個主題有五個分割區,並且您想要使用 concurrency=15,您只會看到五個作用中的 Consumer,每個 Consumer 從每個主題指派一個分割區,而其他 10 個 Consumer 處於閒置狀態。這是因為預設 Kafka PartitionAssignorRangeAssignor(請參閱其 Javadoc)。對於這種情況,您可能想要考慮改用 RoundRobinAssignor,它會在所有 Consumer 之間分配分割區。然後,每個 Consumer 都會指派一個主題或分割區。若要變更 PartitionAssignor,您可以在提供給 DefaultKafkaConsumerFactory 的屬性中設定 partition.assignment.strategy Consumer 屬性 (ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

當使用 Spring Boot 時,您可以如下所示指派策略

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

當使用 TopicPartitionOffset 設定容器屬性時,ConcurrentMessageListenerContainer 會在委派 KafkaMessageListenerContainer 實例之間分配 TopicPartitionOffset 實例。

例如,如果提供了六個 TopicPartitionOffset 實例,且 concurrency3;則每個容器取得兩個分割區。對於五個 TopicPartitionOffset 實例,兩個容器取得兩個分割區,而第三個容器取得一個分割區。如果 concurrency 大於 TopicPartitions 的數量,則會向下調整 concurrency,以便每個容器取得一個分割區。

client.id 屬性(如果已設定)會附加 -n,其中 n 是對應於並行性的 Consumer 實例。當啟用 JMX 時,這對於為 MBean 提供唯一名稱是必要的。

從 1.3 版開始,MessageListenerContainer 提供對底層 KafkaConsumer 指標的存取權。在 ConcurrentMessageListenerContainer 的情況下,metrics() 方法會傳回所有目標 KafkaMessageListenerContainer 實例的指標。指標依 client-id 分組到 Map<MetricName, ? extends Metric> 中,該 client-id 是為底層 KafkaConsumer 提供的。

從 2.3 版開始,ContainerProperties 提供 idleBetweenPolls 選項,讓監聽器容器中的主迴圈在 KafkaConsumer.poll() 呼叫之間休眠。實際的休眠間隔會選取為所提供選項中的最小值,以及 max.poll.interval.ms Consumer 設定與目前記錄批次處理時間之間的差異。

提交偏移量

提供了多個選項來提交偏移量。如果 enable.auto.commit Consumer 屬性為 true,則 Kafka 會根據其設定自動提交偏移量。如果為 false,則容器支援多個 AckMode 設定(在下一個清單中說明)。預設 AckModeBATCH。從 2.3 版開始,除非在設定中明確設定,否則架構會將 enable.auto.commit 設定為 false。先前,如果未設定屬性,則會使用 Kafka 預設值 (true)。

Consumer poll() 方法傳回一個或多個 ConsumerRecords。針對每個記錄呼叫 MessageListener。以下清單說明容器針對每個 AckMode 採取的動作(當未使用交易時)

  • RECORD:在監聽器在處理記錄後返回時提交偏移量。

  • BATCH:當已處理完 poll() 返回的所有記錄時,提交偏移量。

  • TIME:當已處理完 poll() 返回的所有記錄時提交偏移量,只要自上次提交以來的 ackTime 已超過。

  • COUNT:當已處理完 poll() 返回的所有記錄時提交偏移量,只要自上次提交以來已收到 ackCount 記錄。

  • COUNT_TIME:類似於 TIMECOUNT,但如果任一條件為 true,則執行提交。

  • MANUAL:訊息監聽器負責 acknowledge() Acknowledgment。之後,套用與 BATCH 相同的語義。

  • MANUAL_IMMEDIATE:當監聽器呼叫 Acknowledgment.acknowledge() 方法時立即提交偏移量。

當使用交易時,偏移量會傳送至交易,且語義等同於 RECORDBATCH,具體取決於監聽器類型(記錄或批次)。

MANUALMANUAL_IMMEDIATE 需要監聽器為 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。請參閱訊息監聽器

根據 syncCommits 容器屬性,會使用 Consumer 上的 commitSync()commitAsync() 方法。syncCommits 預設為 true;另請參閱 setSyncCommitTimeout。請參閱 setCommitCallback 以取得非同步提交的結果;預設回呼是 LoggingCommitCallback,它會記錄錯誤(以及偵錯層級的成功)。

由於監聽器容器具有其自己的提交偏移量機制,因此它偏好 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGfalse。從 2.3 版開始,除非在 Consumer Factory 或容器的 Consumer 屬性覆寫中明確設定,否則它會無條件地將其設定為 false。

Acknowledgment 具有以下方法

public interface Acknowledgment {

    void acknowledge();

}

此方法讓監聽器控制何時提交偏移量。

從 2.3 版開始,Acknowledgment 介面有兩個額外的方法 nack(long sleep)nack(int index, long sleep)。第一個用於記錄監聽器,第二個用於批次監聽器。針對您的監聽器類型呼叫錯誤的方法會擲回 IllegalStateException

如果您想要提交部分批次,請使用 nack()。當使用交易時,請將 AckMode 設定為 MANUAL;調用 nack() 會將成功處理的記錄的偏移量傳送至交易。
nack() 只能在調用您的監聽器的 Consumer 執行緒上呼叫。
當使用亂序提交時,不允許使用 nack()

使用記錄監聽器時,當呼叫 nack() 時,會提交任何擱置中的偏移量,捨棄上次輪詢中的剩餘記錄,並在其分割區上執行搜尋,以便在下一次 poll() 上重新傳遞失敗的記錄和未處理的記錄。可以在重新傳遞之前暫停 Consumer,方法是設定 sleep 引數。這與在容器設定了 DefaultErrorHandler 時擲回例外的功能類似。

nack() 會暫停整個監聽器指定的休眠持續時間,包括所有指派的分割區。

當使用批次監聽器時,您可以指定批次中發生失敗的索引。當呼叫 nack() 時,將提交索引之前的記錄的偏移量,並在其分割區上執行失敗和捨棄記錄的搜尋,以便在下一次 poll() 上重新傳遞它們。

請參閱容器錯誤處理常式以取得更多資訊。

在休眠期間,Consumer 會暫停,以便我們繼續輪詢 Broker 以保持 Consumer 存活。實際的休眠時間及其解析度取決於容器的 pollTimeout,預設值為 5 秒。最小休眠時間等於 pollTimeout,所有休眠時間都將是它的倍數。對於較小的休眠時間,或為了提高其準確性,請考慮減少容器的 pollTimeout

從 3.0.10 版開始,批次監聽器可以使用 Acknowledgment 引數上的 acknowledge(index) 來提交批次部分的偏移量。當呼叫此方法時,將提交索引處的記錄(以及所有先前的記錄)的偏移量。在執行部分批次提交後呼叫 acknowledge() 將提交批次剩餘部分的偏移量。以下限制適用

  • 需要 AckMode.MANUAL_IMMEDIATE

  • 必須在監聽器執行緒上呼叫該方法

  • 監聽器必須使用 List 而不是原始 ConsumerRecords

  • 索引必須在清單元素範圍內

  • 索引必須大於先前呼叫中使用的索引

強制執行這些限制,並且該方法將擲回 IllegalArgumentExceptionIllegalStateException,具體取決於違規情況。

監聽器容器自動啟動

監聽器容器實作 SmartLifecycle,且 autoStartup 預設為 true。容器在較晚階段啟動 (Integer.MAX-VALUE - 100)。實作 SmartLifecycle 的其他元件(用於處理來自監聽器的資料)應在較早階段啟動。- 100 為稍後階段留下空間,以啟用在容器之後自動啟動的元件。