最新消息?
3.2 相較於 3.1 的新功能
本節涵蓋了從 3.1 版到 3.2 版所做的變更。如需較早版本的變更,請參閱變更記錄。
Kafka Client 版本
此版本需要 3.7.0 kafka-clients
。Kafka client 的 3.7.0 版本引入了新的消費者群組協定。如需更多詳細資訊及其限制,請參閱KIP-848。新的消費者群組協定是搶先體驗版本,不適用於生產環境。建議僅在此版本中用於測試目的。因此,Spring for Apache Kafka 僅在 kafka-client
本身提供的此類測試層級支援範圍內支援此新的消費者群組協定。預設情況下,Spring for Apache Kafka 使用傳統的消費者群組協定,而在測試新的消費者群組協定時,需要透過 consumer 上的 group.protocol
屬性選擇加入。
測試支援變更
在 EmbeddedKafka
中,kraft
模式預設為停用,想要使用 kraft
模式的使用者必須啟用它。這是因為在使用 kraft
模式的 EmbeddedKafka
時觀察到某些不穩定性,尤其是在測試新的消費者群組協定時。新的消費者群組協定僅在 kraft
模式下受支援,因此,當測試新的協定時,需要針對真實的 Kafka 叢集進行,而不是基於 KafkaClusterTestKit
的叢集,而 EmbeddedKafka
正是基於 KafkaClusterTestKit
。此外,在使用 kraft
模式的 EmbeddedKafka
執行多個 KafkaListener
方法時,也觀察到一些其他競爭條件。在這些問題解決之前,EmbeddedKafka
上的 kraft
預設值將保持為 false
。
Kafka Streams 互動式查詢支援
新的 API KafkaStreamsInteractiveQuerySupport
用於存取 Kafka Streams 互動式查詢中使用的可查詢儲存區。請參閱Kafka Streams 互動式支援以取得更多詳細資訊。
TransactionIdSuffixStrategy
引入了新的 TransactionIdSuffixStrategy
介面來管理 transactional.id
後綴。預設實作是 DefaultTransactionIdSuffixStrategy
,當設定 maxCache
大於零時,可以在特定範圍內重複使用 transactional.id
,否則將透過遞增計數器即時產生後綴。請參閱固定 TransactionIdSuffix以取得更多資訊。
Async @KafkaListener 傳回
@KafkaListener
(和 @KafkaHandler
)方法現在可以傳回非同步傳回類型,包括 CompletableFuture<?>
、Mono<?>
和 Kotlin suspend
函數。請參閱Async Returns以取得更多資訊。
根據擲出的例外狀況將訊息路由到自訂 DLT
現在可以根據訊息處理期間擲出的例外狀況類型,將訊息重新導向到自訂 DLT。重新導向的規則可以透過 RetryableTopic.exceptionBasedDltRouting
或 RetryTopicConfigurationBuilder.dltRoutingRules
設定。自訂 DLT 以及其他重試和死信主題也會自動建立。請參閱根據擲出的例外狀況將訊息路由到自訂 DLT以取得更多資訊。
棄用 ContainerProperties transactionManager 屬性
棄用 ContainerProperties
中的 transactionManager
屬性,改用 KafkaAwareTransactionManager
,這是一種比一般 PlatformTransactionManager
更窄的類型。請參閱ContainerProperties和交易同步。
Rollback 後處理
提供新的 AfterRollbackProcessor
API processBatch
。請參閱Rollback 後處理器以取得更多資訊。
變更 @RetryableTopic SameIntervalTopicReuseStrategy 預設值
將 @RetryableTopic
屬性 SameIntervalTopicReuseStrategy
預設值變更為 SINGLE_TOPIC
。請參閱maxInterval 指數延遲的單一主題。
非阻塞重試支援類別層級 @KafkaListener
非阻塞重試支援Class 上的 @KafkaListener。請參閱非阻塞重試。
在 RetryTopicConfigurationProvider 中支援處理類別上的 @RetryableTopic。
提供新的 public API 以尋找 RetryTopicConfiguration
。請參閱尋找 RetryTopicConfiguration
RetryTopicConfigurer 支援處理 MultiMethodKafkaListenerEndpoint。
RetryTopicConfigurer
支援處理和註冊 MultiMethodKafkaListenerEndpoint
。MultiMethodKafkaListenerEndpoint
為屬性 defaultMethod
和 methods
提供 getter/setter
。修改嚴格用於 MethodKafkaListenerEndpoint
類型的 EndpointCustomizer
。EndpointHandlerMethod
新增建構子,為提供的 bean 建構一個實例。提供新的類別 EndpointHandlerMultiMethod
以處理重試端點的多個方法。
新的 API 方法,可根據使用者提供的函數搜尋偏移量
ConsumerCallback
提供新的 API,可根據使用者定義的函數搜尋偏移量,該函數將消費者中的目前偏移量作為引數。請參閱Seek API 文件以取得更多詳細資訊。
@PartitionOffset 支援 SeekPosition
將 seekPosition
屬性新增至 @PartitionOffset
,以支援 TopicPartitionOffset.SeekPosition
。請參閱手動指派以取得更多詳細資訊。
TopicPartitionOffset 中的新建構子,可接受函數以計算要搜尋的偏移量
TopicPartitionOffset
有一個新的建構子,它接受使用者提供的函數來計算要搜尋的偏移量。當使用此建構子時,框架會使用目前的消費者偏移量位置的輸入引數呼叫函數。請參閱Seek API 文件以取得更多詳細資訊。
Spring Boot 應用程式名稱作為預設 client ID 前綴
對於定義應用程式名稱的 Spring Boot 應用程式,此名稱現在用作某些用戶端類型自動產生的用戶端 ID 的預設前綴。請參閱預設用戶端 ID 前綴以取得更多詳細資訊。
增強型 MessageListenerContainers 檢索
ListenerContainerRegistry
提供兩個新的 API,可動態尋找和篩選 MessageListenerContainer
實例。getListenerContainersMatching(Predicate<String> idMatcher)
可依 ID 篩選,另一個是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)
可依 ID 和容器屬性篩選。
請參閱@KafkaListener
生命周期管理的 API 文件以取得更多資訊。
透過提供更多追蹤標籤來增強觀察能力
KafkaTemplateObservation
提供更多追蹤標籤(低基數)。KafkaListenerObservation
提供新的 API 以尋找高基數金鑰名稱和更多追蹤標籤(高或低基數)。請參閱Micrometer Observation