最新消息?

3.2 相較於 3.1 的新功能

本節涵蓋了從 3.1 版到 3.2 版所做的變更。如需較早版本的變更,請參閱變更記錄

Kafka Client 版本

此版本需要 3.7.0 kafka-clients。Kafka client 的 3.7.0 版本引入了新的消費者群組協定。如需更多詳細資訊及其限制,請參閱KIP-848。新的消費者群組協定是搶先體驗版本,不適用於生產環境。建議僅在此版本中用於測試目的。因此,Spring for Apache Kafka 僅在 kafka-client 本身提供的此類測試層級支援範圍內支援此新的消費者群組協定。預設情況下,Spring for Apache Kafka 使用傳統的消費者群組協定,而在測試新的消費者群組協定時,需要透過 consumer 上的 group.protocol 屬性選擇加入。

測試支援變更

EmbeddedKafka 中,kraft 模式預設為停用,想要使用 kraft 模式的使用者必須啟用它。這是因為在使用 kraft 模式的 EmbeddedKafka 時觀察到某些不穩定性,尤其是在測試新的消費者群組協定時。新的消費者群組協定僅在 kraft 模式下受支援,因此,當測試新的協定時,需要針對真實的 Kafka 叢集進行,而不是基於 KafkaClusterTestKit 的叢集,而 EmbeddedKafka 正是基於 KafkaClusterTestKit。此外,在使用 kraft 模式的 EmbeddedKafka 執行多個 KafkaListener 方法時,也觀察到一些其他競爭條件。在這些問題解決之前,EmbeddedKafka 上的 kraft 預設值將保持為 false

Kafka Streams 互動式查詢支援

新的 API KafkaStreamsInteractiveQuerySupport 用於存取 Kafka Streams 互動式查詢中使用的可查詢儲存區。請參閱Kafka Streams 互動式支援以取得更多詳細資訊。

TransactionIdSuffixStrategy

引入了新的 TransactionIdSuffixStrategy 介面來管理 transactional.id 後綴。預設實作是 DefaultTransactionIdSuffixStrategy,當設定 maxCache 大於零時,可以在特定範圍內重複使用 transactional.id,否則將透過遞增計數器即時產生後綴。請參閱固定 TransactionIdSuffix以取得更多資訊。

Async @KafkaListener 傳回

@KafkaListener(和 @KafkaHandler)方法現在可以傳回非同步傳回類型,包括 CompletableFuture<?>Mono<?> 和 Kotlin suspend 函數。請參閱Async Returns以取得更多資訊。

根據擲出的例外狀況將訊息路由到自訂 DLT

現在可以根據訊息處理期間擲出的例外狀況類型,將訊息重新導向到自訂 DLT。重新導向的規則可以透過 RetryableTopic.exceptionBasedDltRoutingRetryTopicConfigurationBuilder.dltRoutingRules 設定。自訂 DLT 以及其他重試和死信主題也會自動建立。請參閱根據擲出的例外狀況將訊息路由到自訂 DLT以取得更多資訊。

棄用 ContainerProperties transactionManager 屬性

棄用 ContainerProperties 中的 transactionManager 屬性,改用 KafkaAwareTransactionManager,這是一種比一般 PlatformTransactionManager 更窄的類型。請參閱ContainerProperties交易同步

Rollback 後處理

提供新的 AfterRollbackProcessor API processBatch。請參閱Rollback 後處理器以取得更多資訊。

變更 @RetryableTopic SameIntervalTopicReuseStrategy 預設值

@RetryableTopic 屬性 SameIntervalTopicReuseStrategy 預設值變更為 SINGLE_TOPIC。請參閱maxInterval 指數延遲的單一主題

非阻塞重試支援類別層級 @KafkaListener

非阻塞重試支援Class 上的 @KafkaListener。請參閱非阻塞重試

在 RetryTopicConfigurationProvider 中支援處理類別上的 @RetryableTopic。

提供新的 public API 以尋找 RetryTopicConfiguration。請參閱尋找 RetryTopicConfiguration

RetryTopicConfigurer 支援處理 MultiMethodKafkaListenerEndpoint。

RetryTopicConfigurer 支援處理和註冊 MultiMethodKafkaListenerEndpointMultiMethodKafkaListenerEndpoint 為屬性 defaultMethodmethods 提供 getter/setter。修改嚴格用於 MethodKafkaListenerEndpoint 類型的 EndpointCustomizerEndpointHandlerMethod 新增建構子,為提供的 bean 建構一個實例。提供新的類別 EndpointHandlerMultiMethod 以處理重試端點的多個方法。

新的 API 方法,可根據使用者提供的函數搜尋偏移量

ConsumerCallback 提供新的 API,可根據使用者定義的函數搜尋偏移量,該函數將消費者中的目前偏移量作為引數。請參閱Seek API 文件以取得更多詳細資訊。

@PartitionOffset 支援 SeekPosition

seekPosition 屬性新增至 @PartitionOffset,以支援 TopicPartitionOffset.SeekPosition。請參閱手動指派以取得更多詳細資訊。

TopicPartitionOffset 中的新建構子,可接受函數以計算要搜尋的偏移量

TopicPartitionOffset 有一個新的建構子,它接受使用者提供的函數來計算要搜尋的偏移量。當使用此建構子時,框架會使用目前的消費者偏移量位置的輸入引數呼叫函數。請參閱Seek API 文件以取得更多詳細資訊。

Spring Boot 應用程式名稱作為預設 client ID 前綴

對於定義應用程式名稱的 Spring Boot 應用程式,此名稱現在用作某些用戶端類型自動產生的用戶端 ID 的預設前綴。請參閱預設用戶端 ID 前綴以取得更多詳細資訊。

增強型 MessageListenerContainers 檢索

ListenerContainerRegistry 提供兩個新的 API,可動態尋找和篩選 MessageListenerContainer 實例。getListenerContainersMatching(Predicate<String> idMatcher) 可依 ID 篩選,另一個是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) 可依 ID 和容器屬性篩選。

請參閱@KafkaListener 生命周期管理的 API 文件以取得更多資訊。

透過提供更多追蹤標籤來增強觀察能力

KafkaTemplateObservation 提供更多追蹤標籤(低基數)。KafkaListenerObservation 提供新的 API 以尋找高基數金鑰名稱和更多追蹤標籤(高或低基數)。請參閱Micrometer Observation