取用批次

從 3.0 版本開始,當 spring.cloud.stream.bindings.<name>.consumer.batch-mode 設定為 true 時,輪詢 Kafka Consumer 收到的所有記錄將以 List<?> 的形式呈現給接聽器方法。否則,該方法將一次呼叫一筆記錄。批次大小由 Kafka 消費者屬性 max.poll.recordsfetch.min.bytesfetch.max.wait.ms 控制;請參閱 Kafka 文件以取得更多資訊。

接收批次時,允許以下類型簽章

List<Person>
Message<List<Person>>

List<Person> 的第一個選項中,接聽器將不會取得任何訊息標頭。如果使用第二個類型簽章 (Message<List<Person>>),則可以存取標頭;但是,所有標頭仍然以 Collection 的形式存在。讓我們看以下範例。

假設 Message 包含一個包含十個 Person 物件的列表。MessageMessageHeaders 包含一個標頭對應,其中鍵為標頭名稱,值為列表。此列表包含該標頭的標頭值,其順序與酬載列表相同。因此,應用程式必須根據酬載列表的迭代,從 MessageHeaders 對應中正確存取標頭。

請注意,以 List<Message<Person>> 形式的類型簽章在批次模式取用時不允許。

4.0.2 版本開始,binder 在批次模式取用時支援 DLQ 功能。請記住,當在批次模式下的消費者繫結上使用 DLQ 時,從先前輪詢接收的所有記錄都將傳遞到 DLQ 主題。

當使用批次模式時,binder 內部的重試不受支援,因此 maxAttempts 將被覆寫為 1。您可以組態 DefaultErrorHandler (使用 ListenerContainerCustomizer) 以實現與 binder 中重試類似的功能。您也可以使用手動 AckMode 並呼叫 Ackowledgment.nack(index, sleep) 來提交部分批次的偏移量,並重新傳遞剩餘的記錄。請參閱 Spring for Apache Kafka 文件,以取得有關這些技術的更多資訊。
當在批次模式下接收 KafkaNull 物件時,接收到的列表將包含對應 KafkaNull 物件的 null 元素。對於 List<Person>Message<List<Person>> 樣式的類型簽章,情況皆是如此。

批次模式取用時的可觀察性

當以批次方式取用記錄時,不直接支援觀察追蹤傳播功能。這是因為 Kafka binder 使用的 Spring for Apache Kafka 程式庫不支援批次接聽器上的追蹤;它僅支援記錄接聽器。在批次接聽器中,接收到的記錄可能來自多個主題/分割區以及多個生產者,在這些生產者中,新增追蹤資訊是可選的。由於批次中的記錄之間可能沒有任何關聯性,因此架構無法對追蹤它們做出任何假設,例如將它們作為單一追蹤 ID 等提供。如果您使用 Message<List<String>> 的類型簽章,則可以取得一個名為 kafka_batchConvertedHeaders 的標頭,其中包含一個列表,其條目數與您的酬載相同。此列表具有一個 Map,其中包含追蹤標頭。但是,應用程式必須正確地迭代此列表並開始觀察。