變更記錄
自 3.0 以來 3.1 的新功能
本節涵蓋從 3.0 版到 3.1 版所做的變更。如需先前版本的變更,請參閱變更記錄。
EmbeddedKafkaBroker
現在提供額外的實作來使用 Kraft
而非 Zookeeper。如需更多資訊,請參閱嵌入式 Kafka Broker。
JsonDeserializer
當發生還原序列化例外時,SerializationException
訊息不再包含格式為 Can’t deserialize data [[123, 34, 98, 97, 122, …
的資料;每個資料位元組的數值陣列沒有用處,而且對於大型資料可能會很冗長。當與 ErrorHandlingDeserializer
搭配使用時,傳送至錯誤處理常式的 DeserializationException
包含 data
屬性,其中包含無法還原序列化的原始資料。當不與 ErrorHandlingDeserializer
搭配使用時,KafkaConsumer
將持續針對相同的記錄發出例外,顯示主題/分割區/偏移量以及 Jackson 擲回的原因。
ContainerPostProcessor
透過在 @KafkaListener
註解上指定 ContainerPostProcessor
的 Bean 名稱,可以在接聽器容器上套用後處理。這會在建立容器之後,以及在容器 Factory 上設定的任何已組態 ContainerCustomizer
之後發生。如需更多資訊,請參閱容器 Factory。
ErrorHandlingDeserializer
您現在可以將 Validator
新增至此還原序列化程式;如果委派 Deserializer
成功還原序列化物件,但該物件未通過驗證,則會擲回類似於還原序列化例外發生的例外。這允許將原始原始資料傳遞至錯誤處理常式。如需更多資訊,請參閱使用 ErrorHandlingDeserializer
。
可重試主題
當 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
時,將字尾 -retry-5000
變更為 -retry
。如果您想要保留字尾 -retry-5000
,請使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")
。如需更多資訊,請參閱主題命名。
接聽器容器變更
當手動指派分割區時,使用 null
消費者 group.id
,AckMode
現在會自動強制轉換為 MANUAL
。如需更多資訊,請參閱手動指派所有分割區。
自 2.9 以來 3.0 的新功能
Observation
現在支援使用 Micrometer 啟用計時器和追蹤的觀察。如需更多資訊,請參閱Observation。
原生映像檔
提供建立原生映像檔的支援。如需更多資訊,請參閱原生映像檔。
全域單一嵌入式 Kafka
嵌入式 Kafka (EmbeddedKafkaBroker
) 現在可以作為整個測試計畫的單一全域執行個體啟動。如需更多資訊,請參閱針對多個測試類別使用相同的 Broker。
可重試主題變更
此功能不再被視為實驗性功能 (就其 API 而言),該功能本身自 2.7 版起即已支援,但 API 變更的可能性高於正常情況。
非阻塞重試基礎結構 Bean 的引導已在此版本中變更,以避免某些應用程式在應用程式初始化方面發生的計時問題。
您現在可以為重試容器設定不同的 concurrency
;依預設,concurrency 與主要容器相同。
@RetryableTopic
現在可以用作自訂註解上的 Meta 註解,包括支援 @AliasFor
屬性。
如需更多資訊,請參閱組態。
重試主題的預設複寫因數現在為 -1
(使用 Broker 預設值)。如果您的 Broker 早於 2.4 版,您現在將需要明確設定屬性。
您現在可以在相同的應用程式內容中,針對相同的主題設定多個 @RetryableTopic
接聽器。先前,這是無法做到的。如需更多資訊,請參閱多個接聽器,相同主題。
RetryTopicConfigurationSupport
中有重大 API 變更;具體而言,如果您覆寫 destinationTopicResolver
、kafkaConsumerBackoffManager
和/或 retryTopicConfigurer
的 Bean 定義方法;這些方法現在需要 ObjectProvider<RetryTopicComponentFactory>
參數。
接聽器容器變更
容器現在會發佈與消費者驗證和授權失敗相關的事件。如需更多資訊,請參閱應用程式事件。
您現在可以自訂消費者執行緒使用的執行緒名稱。如需更多資訊,請參閱容器執行緒命名。
已新增容器屬性 restartAfterAuthException
。如需更多資訊,請參閱接聽器容器屬性。
KafkaTemplate
變更
此類別傳回的 Future 現在是 CompletableFuture
,而不是 ListenableFuture
。請參閱使用 KafkaTemplate
。
ReplyingKafkaTemplate
變更
此類別傳回的 Future 現在是 CompletableFuture
,而不是 ListenableFuture
。請參閱使用 ReplyingKafkaTemplate
和 使用 Message<?>
交換訊息。
@KafkaListener
變更
您現在可以使用自訂關聯標頭,該標頭將在任何回覆訊息中回顯。如需更多資訊,請參閱使用 ReplyingKafkaTemplate
結尾的附註。
您現在可以在處理完整批次之前,手動提交批次的部分。如需更多資訊,請參閱提交偏移量。
KafkaHeaders
變更
KafkaHeaders
中在 2.9.x 版中已棄用的四個常數現在已移除。
-
請勿使用
MESSAGE_KEY
,改用KEY
。 -
請勿使用
PARTITION_ID
,改用PARTITION
同樣地,RECEIVED_MESSAGE_KEY
由 RECEIVED_KEY
取代,而 RECEIVED_PARTITION_ID
由 RECEIVED_PARTITION
取代。
測試變更
3.0.7 版導入了 MockConsumerFactory
和 MockProducerFactory
。如需更多資訊,請參閱模擬消費者和 Producer。
從 3.0.10 版開始,依預設,嵌入式 Kafka Broker 會將 Spring Boot 屬性 spring.kafka.bootstrap-servers
設定為嵌入式 Broker 的位址。
自 2.8 以來 2.9 的新功能
錯誤處理常式變更
現在可以將 DefaultErrorHandler
設定為暫停容器一次輪詢,並使用先前輪詢中的剩餘結果,而不是搜尋剩餘記錄的偏移量。如需更多資訊,請參閱DefaultErrorHandler。
DefaultErrorHandler
現在具有 BackOffHandler
屬性。如需更多資訊,請參閱Back Off 處理常式。
接聽器容器變更
interceptBeforeTx
現在適用於所有交易管理員 (先前僅在使用 KafkaAwareTransactionManager
時套用)。請參閱 [interceptBeforeTx]。
提供新的容器屬性 pauseImmediate
,允許容器在處理目前記錄後暫停消費者,而不是在處理完先前輪詢中的所有記錄後暫停。請參閱 [pauseImmediate]。
與消費者驗證和授權相關的事件
標頭對應器變更
您現在可以設定應對應哪些輸入標頭。在 2.8.8 版或更新版本中也提供。如需更多資訊,請參閱訊息標頭。
KafkaTemplate
變更
在 3.0 版中,此類別傳回的 Future 將會是 CompletableFuture
,而不是 ListenableFuture
。當使用此版本進行轉換時,請參閱使用 KafkaTemplate
以取得協助。
ReplyingKafkaTemplate
變更
範本現在提供一種方法來等待回覆容器上的指派,以避免在回覆容器初始化之前傳送要求時發生競爭狀況。在 2.8.8 版或更新版本中也提供。請參閱使用 ReplyingKafkaTemplate
。
在 3.0 版中,此類別傳回的 Future 將會是 CompletableFuture
,而不是 ListenableFuture
。當使用此版本進行轉換時,請參閱使用 ReplyingKafkaTemplate
和 使用 Message<?>
交換訊息 以取得協助。
自 2.7 以來 2.8 的新功能
本節涵蓋從 2.7 版到 2.8 版所做的變更。如需先前版本的變更,請參閱變更記錄。
套件變更
與類型對應相關的類別和介面已從 …support.converter
移至 …support.mapping
。
-
AbstractJavaTypeMapper
-
ClassMapper
-
DefaultJackson2JavaTypeMapper
-
Jackson2JavaTypeMapper
無序手動提交
現在可以將接聽器容器設定為接受無序 (通常為非同步) 的手動偏移量提交。容器將延遲提交,直到確認遺失的偏移量為止。如需更多資訊,請參閱手動提交偏移量。
@KafkaListener
變更
現在可以指定接聽器方法是否為方法本身的批次接聽器。這允許相同的容器 Factory 用於記錄和批次接聽器。
如需更多資訊,請參閱 [batch-listeners]。
批次接聽器現在可以處理轉換例外。
如需更多資訊,請參閱批次錯誤處理常式的轉換錯誤。
RecordFilterStrategy
在與批次接聽器搭配使用時,現在可以在一次呼叫中篩選整個批次。如需更多資訊,請參閱 [batch-listeners] 結尾的附註。
@KafkaListener
註解現在具有 filter
屬性,可僅針對此接聽器覆寫容器 Factory 的 RecordFilterStrategy
。
KafkaTemplate
變更
您現在可以接收單一記錄,前提是已指定主題、分割區和偏移量。如需更多資訊,請參閱使用 KafkaTemplate
接收。
新增 CommonErrorHandler
記錄和批次接聽器的舊版 GenericErrorHandler
及其子介面階層已由新的單一介面 CommonErrorHandler
取代,其實作對應於 GenericErrorHandler
的大多數舊版實作。如需更多資訊,請參閱容器錯誤處理常式和 將自訂舊版錯誤處理常式實作移轉至 CommonErrorHandler
。
接聽器容器變更
interceptBeforeTx
容器屬性現在預設為 true
。
authorizationExceptionRetryInterval
屬性已重新命名為 authExceptionRetryInterval
,現在除了先前的 AuthorizationException
之外,也適用於 AuthenticationException
。這兩個例外都被視為嚴重錯誤,除非設定此屬性,否則容器預設會停止。
如需更多資訊,請參閱使用 KafkaMessageListenerContainer
和 接聽器容器屬性。
序列化程式/還原序列化程式變更
現在提供 DelegatingByTopicSerializer
和 DelegatingByTopicDeserializer
。如需更多資訊,請參閱委派序列化程式和還原序列化程式。
DeadLetterPublishingRecover
變更
屬性 stripPreviousExceptionHeaders
現在預設為 true
。
現在有多種技術可以自訂哪些標頭會新增至輸出記錄。
如需更多資訊,請參閱管理 Dead Letter 記錄標頭。
可重試主題變更
現在您可以將相同的 Factory 用於可重試和不可重試主題。如需更多資訊,請參閱指定 ListenerContainerFactory。
現在有一個可管理的嚴重錯誤例外全域清單,可讓失敗的記錄直接進入 DLT。請參閱例外分類器以瞭解如何管理它。
您現在可以同時使用阻塞和非阻塞重試。如需更多資訊,請參閱結合阻塞和非阻塞重試。
使用可重試主題功能時擲回的 KafkaBackOffException 現在會在 DEBUG 層級記錄。如果您需要將記錄層級變更回 WARN 或將其設定為任何其他層級,請參閱變更 KafkaBackOffException 記錄層級。
2.6 和 2.7 之間的變更
Kafka Client 版本
此版本需要 2.7.0 kafka-clients
。它也與 2.8.0 Client 相容,因為版本 2.7.1;請參閱覆寫 Spring Boot 相依性。
使用主題的非阻塞延遲重試
此版本中新增了這項重要的全新功能。當嚴格排序不重要時,失敗的傳遞可以傳送至另一個主題,以便稍後取用。可以組態一系列此類重試主題,並增加延遲。如需更多資訊,請參閱非阻塞重試。
接聽器容器變更
onlyLogRecordMetadata
容器屬性現在預設為 true
。
新的容器屬性 stopImmediate
現在可用。
如需更多資訊,請參閱接聽器容器屬性。
在傳遞嘗試之間使用 BackOff
的錯誤處理常式 (例如 SeekToCurrentErrorHandler
和 DefaultAfterRollbackProcessor
) 現在會在容器停止後立即結束 Back Off 間隔,而不是延遲停止。
擴充 FailedRecordProcessor
的錯誤處理常式和回溯後處理器現在可以使用一或多個 RetryListener
來接收有關重試和復原進度的資訊。
RecordInterceptor
現在具有在接聽器傳回 (正常或透過擲回例外) 後呼叫的其他方法。它也具有子介面 ConsumerAwareRecordInterceptor
。此外,現在還有批次接聽器的 BatchInterceptor
。如需更多資訊,請參閱訊息接聽器容器。
@KafkaListener
變更
您現在可以驗證 @KafkaHandler
方法 (類別層級接聽器) 的 Payload 參數。如需更多資訊,請參閱@KafkaListener
@Payload
驗證。
您現在可以在 MessagingMessageConverter
和 BatchMessagingMessageConverter
上設定 rawRecordHeader
屬性,這會導致原始 ConsumerRecord
新增至轉換後的 Message<?>
。例如,如果您想要在接聽器錯誤處理常式中使用 DeadLetterPublishingRecoverer
,這會很有用。如需更多資訊,請參閱接聽器錯誤處理常式。
您現在可以在應用程式初始化期間修改 @KafkaListener
註解。如需更多資訊,請參閱@KafkaListener
屬性修改。
DeadLetterPublishingRecover
變更
現在,如果索引鍵和值都無法還原序列化,則原始值會發佈至 DLT。先前,值已填入,但索引鍵 DeserializationException
仍保留在標頭中。如果您將 Recoverer 子類別化並覆寫 createProducerRecord
方法,則會有重大 API 變更。
此外,Recoverer 會在發佈至目的地分割區之前,驗證目的地解析器選取的分割區是否確實存在。
如需更多資訊,請參閱發佈 Dead-letter 記錄。
ChainedKafkaTransactionManager
已棄用
如需更多資訊,請參閱交易。
ReplyingKafkaTemplate
變更
現在有一種機制可以檢查回覆,並且在存在某些條件時,會異常地使 Future 失敗。
已新增傳送和接收 spring-messaging
Message<?>
的支援。
如需更多資訊,請參閱使用 ReplyingKafkaTemplate
。
Kafka Streams 變更
依預設,StreamsBuilderFactoryBean
現在設定為不清除本機狀態。如需更多資訊,請參閱組態。
KafkaAdmin
變更
已新增方法 createOrModifyTopics
和 describeTopics
。已新增 KafkaAdmin.NewTopics
以協助在單一 Bean 中組態多個主題。如需更多資訊,請參閱 [configuring-topics]。
MessageConverter
變更
現在可以將 spring-messaging
SmartMessageConverter
新增至 MessagingMessageConverter
,允許根據 contentType
標頭進行內容協商。如需更多資訊,請參閱Spring Messaging 訊息轉換。
排序 @KafkaListener
如需更多資訊,請參閱依序啟動 @KafkaListener
。
ExponentialBackOffWithMaxRetries
提供新的 BackOff
實作,使其更方便組態最大重試次數。如需更多資訊,請參閱ExponentialBackOffWithMaxRetries
實作。
條件式委派錯誤處理常式
可以將這些新的錯誤處理常式設定為委派給不同的錯誤處理常式,具體取決於例外類型。如需更多資訊,請參閱委派錯誤處理常式。
2.5 和 2.6 之間的變更
接聽器容器變更
預設 EOSMode
現在為 BETA
。如需更多資訊,請參閱精確一次語意。
各種錯誤處理常式 (擴充 FailedRecordProcessor
) 和 DefaultAfterRollbackProcessor
現在會在復原失敗時重設 BackOff
。此外,您現在可以根據失敗的記錄和/或例外選取要使用的 BackOff
。
您現在可以在容器屬性中組態 adviceChain
。如需更多資訊,請參閱接聽器容器屬性。
當容器設定為發佈 ListenerContainerIdleEvent
時,它現在會在收到記錄時發佈 ListenerContainerNoLongerIdleEvent
,然後發佈閒置事件。如需更多資訊,請參閱應用程式事件和 偵測閒置和無回應的消費者。
@KafkaListener 變更
使用手動分割區指派時,您現在可以指定萬用字元,以判斷應將哪些分割區重設為初始偏移量。此外,如果接聽器實作 ConsumerSeekAware
,則會在手動指派後呼叫 onPartitionsAssigned()
。(也在 2.5.5 版中新增)。如需更多資訊,請參閱明確分割區指派。
已將便利方法新增至 AbstractConsumerSeekAware
,以簡化搜尋。如需更多資訊,請參閱 [seek]。
ErrorHandler 變更
FailedRecordProcessor
的子類別 (例如 SeekToCurrentErrorHandler
、DefaultAfterRollbackProcessor
、RecoveringBatchErrorHandler
) 現在可以設定為在例外與先前使用此記錄發生的例外類型不同時,重設重試狀態。
Producer Factory 變更
您現在可以設定 Producer 的最大存留期,超過此存留期後,它們將會關閉並重新建立。如需更多資訊,請參閱交易。
您現在可以在建立 DefaultKafkaProducerFactory
後更新組態對應。例如,如果您必須在認證變更後更新 SSL 金鑰/信任儲存區位置,這可能會很有用。如需更多資訊,請參閱使用 DefaultKafkaProducerFactory
。
2.4 和 2.5 之間的變更
本節涵蓋從 2.4 版到 2.5 版所做的變更。如需先前版本的變更,請參閱變更記錄。
消費者/Producer Factory 變更
預設消費者和 Producer Factory 現在可以在每次建立或關閉消費者或 Producer 時叫用回呼。提供原生 Micrometer 指標的實作。如需更多資訊,請參閱Factory 接聽器。
您現在可以在執行階段變更 Bootstrap Server 屬性,以啟用容錯移轉至另一個 Kafka 叢集。如需更多資訊,請參閱連線至 Kafka。
StreamsBuilderFactoryBean
變更
Factory Bean 現在可以在每次建立或終結 KafkaStreams
時叫用回呼。提供原生 Micrometer 指標的實作。如需更多資訊,請參閱KafkaStreams Micrometer 支援。
傳遞嘗試標頭
現在有一個選項可以新增標頭,以追蹤在使用某些錯誤處理常式和回溯後處理器時的傳遞嘗試。如需更多資訊,請參閱傳遞嘗試標頭。
@KafkaListener 變更
如果 @KafkaListener
回傳類型為 Message<?>
,則預設回覆標頭現在將在需要時自動填入。請參閱 回覆類型 Message<?> 以取得更多資訊。
當傳入的記錄具有 null
鍵時,KafkaHeaders.RECEIVED_MESSAGE_KEY
不再填入 null
值;標頭將完全省略。
@KafkaListener
方法現在可以指定 ConsumerRecordMetadata
參數,而不是使用個別標頭來表示主題、分割區等中繼資料。請參閱 Consumer Record Metadata 以取得更多資訊。
Listener Container 變更
assignmentCommitOption
容器屬性現在預設為 LATEST_ONLY_NO_TX
。請參閱 Listener Container Properties 以取得更多資訊。
當使用交易時,subBatchPerPartition
容器屬性現在預設為 true
。請參閱 Transactions 以取得更多資訊。
現在提供新的 RecoveringBatchErrorHandler
。
現在支援靜態群組成員資格。請參閱 Message Listener Containers 以取得更多資訊。
當配置增量/合作重新平衡時,如果偏移量提交失敗並出現非致命的 RebalanceInProgressException
,容器將嘗試重新提交在重新平衡完成後仍分配給此實例的分割區的偏移量。
現在,預設錯誤處理常式對於記錄接聽器是 SeekToCurrentErrorHandler
,對於批次接聽器是 RecoveringBatchErrorHandler
。請參閱 Container Error Handlers 以取得更多資訊。
您現在可以控制標準錯誤處理常式有意拋出的例外記錄層級。請參閱 Container Error Handlers 以取得更多資訊。
已新增 getAssignmentsByClientId()
方法,使其更容易判斷並行容器中的哪些消費者被分配到哪些分割區。請參閱 Listener Container Properties 以取得更多資訊。
您現在可以禁止記錄錯誤、偵錯日誌等中的整個 ConsumerRecord
。請參閱 Listener Container Properties 中的 onlyLogRecordMetadata
。
KafkaTemplate 變更
KafkaTemplate
現在可以維護 Micrometer 計時器。請參閱 Monitoring 以取得更多資訊。
現在可以使用 ProducerConfig
屬性配置 KafkaTemplate
,以覆寫生產者工廠中的屬性。請參閱 使用 KafkaTemplate
以取得更多資訊。
現在已提供 RoutingKafkaTemplate
。請參閱 使用 RoutingKafkaTemplate
以取得更多資訊。
您現在可以使用 KafkaSendCallback
而不是 ListenerFutureCallback
來取得更窄的例外,使其更容易提取失敗的 ProducerRecord
。請參閱 使用 KafkaTemplate
以取得更多資訊。
Kafka String Serializer/Deserializer
現在提供新的 ToStringSerializer
/StringDeserializer
以及相關的 SerDe
。請參閱 String serialization 以取得更多資訊。
JsonDeserializer
JsonDeserializer
現在具有更大的彈性來決定反序列化類型。請參閱 使用方法來決定類型 以取得更多資訊。
Delegating Serializer/Deserializer
當傳出記錄沒有標頭時,DelegatingSerializer
現在可以處理「標準」類型。請參閱 Delegating Serializer and Deserializer 以取得更多資訊。
測試變更
KafkaTestUtils.consumerProps()
輔助記錄現在預設將 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
設定為 earliest
。請參閱 JUnit 以取得更多資訊。
2.3 與 2.4 之間的變更
ConsumerAwareRebalanceListener
與 ConsumerRebalanceListener
類似,此介面現在有一個額外的方法 onPartitionsLost
。請參閱 Apache Kafka 文件以取得更多資訊。
與 ConsumerRebalanceListener
不同,預設實作不會呼叫 onPartitionsRevoked
。相反地,接聽器容器將在呼叫 onPartitionsLost
之後呼叫該方法;因此,當實作 ConsumerAwareRebalanceListener
時,您不應執行相同的操作。
請參閱 Rebalancing Listeners 結尾的重要注意事項以取得更多資訊。
KafkaTemplate
KafkaTemplate
現在支援與交易式發布並行的非交易式發布。請參閱 KafkaTemplate
交易式和非交易式發布 以取得更多資訊。
AggregatingReplyingKafkaTemplate
releaseStrategy
現在是 BiConsumer
。它現在在逾時後(以及記錄到達時)被呼叫;在逾時後呼叫的情況下,第二個參數為 true
。
請參閱 彙總多個回覆 以取得更多資訊。
Listener Container
ContainerProperties
提供 authorizationExceptionRetryInterval
選項,讓接聽器容器在 KafkaConsumer
拋出任何 AuthorizationException
後重試。請參閱其 JavaDocs 和 使用 KafkaMessageListenerContainer
以取得更多資訊。
@KafkaListener
@KafkaListener
註釋有一個新的屬性 splitIterables
;預設為 true。當回覆接聽器傳回 Iterable
時,此屬性控制傳回結果是作為單一記錄傳送還是為每個元素傳送記錄。請參閱 使用 @SendTo
轉發接聽器結果 以取得更多資訊。
批次接聽器現在可以使用 BatchToRecordAdapter
進行配置;例如,這允許在交易中處理批次,而接聽器一次取得一個記錄。使用預設實作,可以使用 ConsumerRecordRecoverer
來處理批次內的錯誤,而不會停止整個批次的處理 - 這在使用交易時可能很有用。請參閱 具有批次接聽器的交易 以取得更多資訊。
Kafka Streams
StreamsBuilderFactoryBean
接受新的屬性 KafkaStreamsInfrastructureCustomizer
。這允許在建立串流之前配置建構器和/或拓撲。請參閱 Spring 管理 以取得更多資訊。
2.2 與 2.3 之間的變更
本節涵蓋從 2.2 版到 2.3 版所做的變更。
提示、技巧和範例
已新增新的章節 提示、技巧和範例。請提交 GitHub issue 和/或 pull request 以取得該章節中的其他條目。
組態變更
從 2.3.4 版開始,missingTopicsFatal
容器屬性預設為 false。當此屬性為 true 時,如果 broker 關閉,應用程式將無法啟動;許多使用者受到此變更的影響;鑑於 Kafka 是一個高可用性平台,我們預期在沒有活動 broker 的情況下啟動應用程式不會是常見的用例。
生產者和消費者工廠變更
現在可以將 DefaultKafkaProducerFactory
配置為為每個執行緒建立一個生產者。您也可以在建構函式中提供 Supplier<Serializer>
實例,作為配置類別(需要無引數建構函式)或使用 Serializer
實例建構的替代方案,這些實例隨後在所有生產者之間共用。請參閱 使用 DefaultKafkaProducerFactory
以取得更多資訊。
DefaultKafkaConsumerFactory
中的 Supplier<Deserializer>
實例也提供相同的選項。請參閱 使用 KafkaMessageListenerContainer
以取得更多資訊。
Listener Container 變更
先前,當使用接聽器配接器(例如 @KafkaListener
)調用接聽器時,錯誤處理常式會收到 ListenerExecutionFailedException
(實際接聽器例外作為 cause
)。原生 GenericMessageListener
拋出的例外會未經修改地傳遞給錯誤處理常式。現在 ListenerExecutionFailedException
始終是引數(實際接聽器例外作為 cause
),它提供對容器 group.id
屬性的存取權。
由於接聽器容器有自己的提交偏移機制,因此它偏好 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
為 false
。除非在消費者工廠或容器的消費者屬性覆寫中明確設定,否則它現在會自動將其設定為 false。
ackOnError
屬性現在預設為 false
。
現在可以在接聽器方法中取得消費者的 group.id
屬性。請參閱 取得消費者 group.id
以取得更多資訊。
容器有一個新的屬性 recordInterceptor
,允許在調用接聽器之前檢查或修改記錄。如果您需要調用多個攔截器,也提供了 CompositeRecordInterceptor
。請參閱 Message Listener Containers 以取得更多資訊。
ConsumerSeekAware
具有新的方法,允許您執行相對於開始、結束或目前位置的搜尋,以及搜尋大於或等於時間戳記的第一個偏移量。請參閱 [seek] 以取得更多資訊。
現在提供了一個方便的類別 AbstractConsumerSeekAware
來簡化搜尋。請參閱 [seek] 以取得更多資訊。
ContainerProperties
提供 idleBetweenPolls
選項,讓接聽器容器中的主迴圈在 KafkaConsumer.poll()
呼叫之間睡眠。請參閱其 JavaDocs 和 使用 KafkaMessageListenerContainer
以取得更多資訊。
當使用 AckMode.MANUAL
(或 MANUAL_IMMEDIATE
)時,您現在可以透過在 Acknowledgment
上呼叫 nack
來導致重新傳遞。請參閱 提交偏移量 以取得更多資訊。
現在可以使用 Micrometer Timer
來監控接聽器效能。請參閱 Monitoring 以取得更多資訊。
容器現在發布與啟動相關的其他消費者生命週期事件。請參閱 應用程式事件 以取得更多資訊。
交易式批次接聽器現在可以支援僵屍防護。請參閱 Transactions 以取得更多資訊。
現在可以使用 ContainerCustomizer
配置接聽器容器工廠,以便在建立和配置每個容器後進一步配置。請參閱 容器工廠 以取得更多資訊。
ErrorHandler 變更
SeekToCurrentErrorHandler
現在將某些例外視為致命,並停用這些例外的重試,在第一次失敗時調用 recoverer。
現在可以將 SeekToCurrentErrorHandler
和 SeekToCurrentBatchErrorHandler
配置為在傳遞嘗試之間應用 BackOff
(執行緒睡眠)。
從 2.3.2 版開始,當錯誤處理常式在復原失敗的記錄後傳回時,將提交復原記錄的偏移量。
DeadLetterPublishingRecoverer
在與 ErrorHandlingDeserializer
結合使用時,現在會將傳送到死信主題的訊息的 payload 設定為原始值,該值無法反序列化。先前,它是 null
,使用者程式碼需要從訊息標頭中提取 DeserializationException
。請參閱 發布死信記錄 以取得更多資訊。
TopicBuilder
提供了一個新的類別 TopicBuilder
,以便更方便地建立 NewTopic
@Bean
以進行自動主題佈建。請參閱 [configuring-topics] 以取得更多資訊。
Kafka Streams 變更
您現在可以對 @EnableKafkaStreams
建立的 StreamsBuilderFactoryBean
執行其他配置。請參閱 Streams 組態 以取得更多資訊。
現在提供 RecoveringDeserializationExceptionHandler
,它允許復原具有反序列化錯誤的記錄。它可以與 DeadLetterPublishingRecoverer
結合使用,以將這些記錄傳送到死信主題。請參閱 從反序列化例外中復原 以取得更多資訊。
已提供 HeaderEnricher
轉換器,使用 SpEL 來產生標頭值。請參閱 Header Enricher 以取得更多資訊。
已提供 MessagingTransformer
。這允許 Kafka streams 拓撲與 spring-messaging 元件(例如 Spring Integration flow)互動。請參閱 MessagingProcessor
和 [從 KStream
呼叫 Spring Integration Flow] 以取得更多資訊。
JSON 元件變更
現在所有 JSON 感知元件都預設使用 JacksonUtils.enhancedObjectMapper()
產生的 Jackson ObjectMapper
進行配置。JsonDeserializer
現在提供基於 TypeReference
的建構函式,以便更好地處理目標泛型容器類型。此外,還引入了 JacksonMimeTypeModule
用於將 org.springframework.util.MimeType
序列化為純字串。請參閱其 JavaDocs 和 序列化、反序列化和訊息轉換 以取得更多資訊。
已提供 ByteArrayJsonMessageConverter
以及所有 Json 轉換器的新超類別 JsonMessageConverter
。此外,現在可以使用 StringOrBytesSerializer
;它可以序列化 ProducerRecord
中的 byte[]
、Bytes
和 String
值。請參閱 Spring Messaging 訊息轉換 以取得更多資訊。
JsonSerializer
、JsonDeserializer
和 JsonSerde
現在具有流暢的 API,使程式化配置更簡單。請參閱 javadocs、序列化、反序列化和訊息轉換 以及 Streams JSON 序列化和反序列化 以取得更多資訊。
ReplyingKafkaTemplate
當回覆逾時時,future 將以 KafkaReplyTimeoutException
而不是 KafkaException
異常完成。
此外,現在提供了一個多載的 sendAndReceive
方法,允許在每個訊息的基礎上指定回覆逾時。
AggregatingReplyingKafkaTemplate
透過彙總來自多個接收器的回覆來擴展 ReplyingKafkaTemplate
。請參閱 彙總多個回覆 以取得更多資訊。
交易變更
您現在可以覆寫 KafkaTemplate
和 KafkaTransactionManager
上的生產者工廠的 transactionIdPrefix
。請參閱 transactionIdPrefix
以取得更多資訊。
新的 Delegating Serializer/Deserializer
框架現在提供委派 Serializer
和 Deserializer
,利用標頭來啟用使用多個金鑰/值類型產生和消費記錄。請參閱 Delegating Serializer and Deserializer 以取得更多資訊。
新的 Retrying Deserializer
框架現在提供委派 RetryingDeserializer
,以便在可能發生網路問題等暫時性錯誤時重試序列化。請參閱 Retrying Deserializer 以取得更多資訊。
2.1 與 2.2 之間的變更
類別和套件變更
ContainerProperties
類別已從 org.springframework.kafka.listener.config
移動到 org.springframework.kafka.listener
。
AckMode
列舉已從 AbstractMessageListenerContainer
移動到 ContainerProperties
。
setBatchErrorHandler()
和 setErrorHandler()
方法已從 ContainerProperties
移動到 AbstractMessageListenerContainer
和 AbstractKafkaListenerContainerFactory
。
Rollback 後處理
提供新的 AfterRollbackProcessor
策略。請參閱 After-rollback Processor 以取得更多資訊。
ConcurrentKafkaListenerContainerFactory
變更
您現在可以使用 ConcurrentKafkaListenerContainerFactory
來建立和配置任何 ConcurrentMessageListenerContainer
,而不僅僅是針對 @KafkaListener
註釋的容器。請參閱 容器工廠 以取得更多資訊。
Listener Container 變更
已新增新的容器屬性 (missingTopicsFatal
)。請參閱 使用 KafkaMessageListenerContainer
以取得更多資訊。
當消費者停止時,現在會發出 ConsumerStoppedEvent
。請參閱 執行緒安全 以取得更多資訊。
批次接聽器可以選擇性地接收完整的 ConsumerRecords<?, ?>
物件,而不是 List<ConsumerRecord<?, ?>
。請參閱 [batch-listeners] 以取得更多資訊。
DefaultAfterRollbackProcessor
和 SeekToCurrentErrorHandler
現在可以復原(跳過)持續失敗的記錄,並且預設情況下會在 10 次失敗後執行此操作。它們可以配置為將失敗的記錄發布到死信主題。
從 2.2.4 版開始,在選擇死信主題名稱時可以使用消費者的群組 ID。
已新增 ConsumerStoppingEvent
。請參閱 應用程式事件 以取得更多資訊。
SeekToCurrentErrorHandler
現在可以配置為在容器配置為 AckMode.MANUAL_IMMEDIATE
時提交復原記錄的偏移量(自 2.2.4 版起)。
@KafkaListener 變更
您現在可以透過在註釋上設定屬性來覆寫接聽器容器工廠的 concurrency
和 autoStartup
屬性。您現在可以新增配置以決定將哪些標頭(如果有的話)複製到回覆訊息。請參閱 @KafkaListener
註釋 以取得更多資訊。
您現在可以在您自己的註釋上使用 @KafkaListener
作為 meta-annotation。請參閱 @KafkaListener
作為 Meta Annotation 以取得更多資訊。
現在可以更輕鬆地為 @Payload
驗證配置 Validator
。請參閱 @KafkaListener
@Payload
驗證 以取得更多資訊。
您現在可以直接在註釋上指定 kafka 消費者屬性;這些屬性將覆寫在消費者工廠中定義的任何同名屬性(自 2.2.4 版起)。請參閱 註釋屬性 以取得更多資訊。
標頭對應變更
MimeType
和 MediaType
類型的標頭現在在 RecordHeader
值中對應為簡單字串。先前,它們被對應為 JSON 並且僅解碼 MimeType
。MediaType
無法解碼。現在它們是簡單字串以實現互通性。
此外,DefaultKafkaHeaderMapper
有一個新的 addToStringClasses
方法,允許指定應使用 toString()
而不是 JSON 進行對應的類型。請參閱 訊息標頭 以取得更多資訊。
Embedded Kafka 變更
KafkaEmbedded
類別及其 KafkaRule
介面已被棄用,改用 EmbeddedKafkaBroker
及其 JUnit 4 EmbeddedKafkaRule
包裝器。@EmbeddedKafka
註釋現在填入 EmbeddedKafkaBroker
bean 而不是已棄用的 KafkaEmbedded
。此變更允許在 JUnit 5 測試中使用 @EmbeddedKafka
。@EmbeddedKafka
註釋現在具有屬性 ports
,用於指定填入 EmbeddedKafkaBroker
的埠。請參閱 測試應用程式 以取得更多資訊。
JsonSerializer/Deserializer 增強功能
您現在可以使用生產者和消費者屬性來提供類型對應資訊。
反序列化器上提供新的建構函式,允許使用提供的目標類型覆寫類型標頭資訊。
JsonDeserializer
現在預設移除任何類型資訊標頭。
您現在可以配置 JsonDeserializer
以使用 Kafka 屬性忽略類型資訊標頭(自 2.2.3 版起)。
請參閱 序列化、反序列化和訊息轉換 以取得更多資訊。
Kafka Streams 變更
串流配置 bean 現在必須是 KafkaStreamsConfiguration
物件,而不是 StreamsConfig
物件。
StreamsBuilderFactoryBean
已從套件 …core
移動到 …config
。
已引入 KafkaStreamBrancher
,以便在條件分支建立在 KStream
實例之上時提供更好的最終使用者體驗。
請參閱 Apache Kafka Streams 支援 和 組態 以取得更多資訊。
交易 ID
當接聽器容器啟動交易時,transactional.id
現在是附加了 <group.id>.<topic>.<partition>
的 transactionIdPrefix
。此變更允許適當地防護僵屍,如此處所述。
2.0 與 2.1 之間的變更
JSON 改善
StringJsonMessageConverter
和 JsonSerializer
現在在 Headers
中新增類型資訊,讓轉換器和 JsonDeserializer
基於訊息本身而不是固定的配置類型在接收時建立特定類型。請參閱 序列化、反序列化和訊息轉換 以取得更多資訊。
容器停止錯誤處理常式
現在為記錄和批次接聽器提供容器錯誤處理常式,這些處理常式將接聽器拋出的任何例外視為致命。它們會停止容器。請參閱 處理例外 以取得更多資訊。
暫停和恢復容器
接聽器容器現在具有 pause()
和 resume()
方法(自 2.1.3 版起)。請參閱 暫停和恢復接聽器容器 以取得更多資訊。
具狀態重試
從 2.1.3 版開始,您可以配置具狀態重試。請參閱 具狀態重試 以取得更多資訊。
Client ID
從 2.1.1 版開始,您現在可以在 @KafkaListener
上設定 client.id
前綴。先前,若要自訂用戶端 ID,您需要每個接聽器一個單獨的消費者工廠(和容器工廠)。前綴會附加 -n
以在使用並行時提供唯一的用戶端 ID。
記錄偏移量提交
預設情況下,主題偏移量提交的記錄是以 DEBUG
記錄層級執行的。從 2.1.2 版開始,ContainerProperties
中的新屬性 commitLogLevel
可讓您指定這些訊息的記錄層級。請參閱 使用 KafkaMessageListenerContainer
以取得更多資訊。
預設 @KafkaHandler
從 2.1.3 版開始,您可以將類別層級 @KafkaListener
上的其中一個 @KafkaHandler
註釋指定為預設值。請參閱 類別上的 @KafkaListener
以取得更多資訊。
ReplyingKafkaTemplate
從 2.1.3 版開始,提供 KafkaTemplate
的子類別以支援請求/回覆語意。請參閱 使用 ReplyingKafkaTemplate
以取得更多資訊。
從 2.0 遷移指南
請參閱 2.0 到 2.1 遷移 指南。
1.3 與 2.0 之間的變更
@KafkaListener
變更
您現在可以使用 @SendTo
註釋 @KafkaListener
方法(以及類別和 @KafkaHandler
方法)。如果方法傳回結果,則會將其轉發到指定的主題。請參閱 使用 @SendTo
轉發接聽器結果 以取得更多資訊。
訊息接聽器
訊息接聽器現在可以感知 Consumer
物件。請參閱 [message-listeners] 以取得更多資訊。
使用 ConsumerAwareRebalanceListener
重新平衡接聽器現在可以在重新平衡通知期間存取 Consumer
物件。請參閱 重新平衡接聽器 以取得更多資訊。
1.2 與 1.3 之間的變更
交易支援
0.11.0.0 用戶端程式庫新增了交易支援。已新增 KafkaTransactionManager
和其他交易支援。請參閱 Transactions 以取得更多資訊。
標頭支援
0.11.0.0 用戶端程式庫新增了訊息標頭支援。現在可以將這些標頭對應到 spring-messaging
MessageHeaders
和從 spring-messaging
MessageHeaders
對應。請參閱 訊息標頭 以取得更多資訊。
Kafka 時間戳記支援
KafkaTemplate
現在支援新增具有時間戳記的記錄的 API。已引入關於 timestamp
支援的新 KafkaHeaders
。此外,還新增了新的 KafkaConditions.timestamp()
和 KafkaMatchers.hasTimestamp()
測試公用程式。請參閱 使用 KafkaTemplate
、@KafkaListener
註釋 和 測試應用程式 以取得更多詳細資訊。
@KafkaListener
變更
您現在可以配置 KafkaListenerErrorHandler
來處理例外。請參閱 處理例外 以取得更多資訊。
預設情況下,@KafkaListener
id
屬性現在用作 group.id
屬性,覆寫在消費者工廠中配置的屬性(如果存在)。此外,您可以在註釋上明確配置 groupId
。先前,您需要單獨的容器工廠(和消費者工廠)才能為接聽器使用不同的 group.id
值。若要還原使用工廠配置的 group.id
的先前行為,請將註釋上的 idIsGroup
屬性設定為 false
。
@EmbeddedKafka
註釋
為了方便起見,提供了一個測試類別層級 @EmbeddedKafka
註釋,以將 KafkaEmbedded
註冊為 bean。請參閱 測試應用程式 以取得更多資訊。
Kerberos 配置
現在提供配置 Kerberos 的支援。請參閱 JAAS 和 Kerberos 以取得更多資訊。
1.0 與 1.1 之間的變更
Seek
您現在可以查詢每個主題或分割區的位置。您可以使用此功能在群組管理啟用且 Kafka 指派分割區時,設定初始位置。您也可以在偵測到閒置容器時或在應用程式執行的任何時間點進行查詢。請參閱 [seek] 以取得更多資訊。