取用批次
從 3.0 版本開始,當 spring.cloud.stream.bindings.<name>.consumer.batch-mode
設定為 true
時,輪詢 Kafka Consumer
收到的所有記錄將以 List<?>
的形式呈現給接聽器方法。否則,該方法將一次呼叫一筆記錄。批次大小由 Kafka 消費者屬性 max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
控制;請參閱 Kafka 文件以取得更多資訊。
接收批次時,允許以下類型簽章
List<Person>
Message<List<Person>>
在 List<Person>
的第一個選項中,接聽器將不會取得任何訊息標頭。如果使用第二個類型簽章 (Message<List<Person>>
),則可以存取標頭;但是,所有標頭仍然以 Collection
的形式存在。讓我們看以下範例。
假設 Message
包含一個包含十個 Person
物件的列表。Message
的 MessageHeaders
包含一個標頭對應,其中鍵為標頭名稱,值為列表。此列表包含該標頭的標頭值,其順序與酬載列表相同。因此,應用程式必須根據酬載列表的迭代,從 MessageHeaders
對應中正確存取標頭。
請注意,以 List<Message<Person>>
形式的類型簽章在批次模式取用時不允許。
從 4.0.2
版本開始,binder 在批次模式取用時支援 DLQ 功能。請記住,當在批次模式下的消費者繫結上使用 DLQ 時,從先前輪詢接收的所有記錄都將傳遞到 DLQ 主題。
當使用批次模式時,binder 內部的重試不受支援,因此 maxAttempts 將被覆寫為 1。您可以組態 DefaultErrorHandler (使用 ListenerContainerCustomizer ) 以實現與 binder 中重試類似的功能。您也可以使用手動 AckMode 並呼叫 Ackowledgment.nack(index, sleep) 來提交部分批次的偏移量,並重新傳遞剩餘的記錄。請參閱 Spring for Apache Kafka 文件,以取得有關這些技術的更多資訊。 |
當在批次模式下接收 KafkaNull 物件時,接收到的列表將包含對應 KafkaNull 物件的 null 元素。對於 List<Person> 和 Message<List<Person>> 樣式的類型簽章,情況皆是如此。 |
批次模式取用時的可觀察性
當以批次方式取用記錄時,不直接支援觀察追蹤傳播功能。這是因為 Kafka binder 使用的 Spring for Apache Kafka 程式庫不支援批次接聽器上的追蹤;它僅支援記錄接聽器。在批次接聽器中,接收到的記錄可能來自多個主題/分割區以及多個生產者,在這些生產者中,新增追蹤資訊是可選的。由於批次中的記錄之間可能沒有任何關聯性,因此架構無法對追蹤它們做出任何假設,例如將它們作為單一追蹤 ID 等提供。如果您使用 Message<List<String>>
的類型簽章,則可以取得一個名為 kafka_batchConvertedHeaders
的標頭,其中包含一個列表,其條目數與您的酬載相同。此列表具有一個 Map
,其中包含追蹤標頭。但是,應用程式必須正確地迭代此列表並開始觀察。