功能
大多數功能適用於 @RetryableTopic
註解和 RetryTopicConfiguration
bean。
BackOff 組態
BackOff 組態依賴於 Spring Retry
專案的 BackOffPolicy
介面。
它包含
-
固定 Back Off
-
指數 Back Off
-
隨機指數 Back Off
-
均勻隨機 Back Off
-
無 Back Off
-
自訂 Back Off
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
您也可以提供 Spring Retry 的 SleepingBackOffPolicy
介面的自訂實作
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
預設的 back off 策略是 FixedBackOffPolicy ,最多嘗試 3 次,間隔為 1000 毫秒。 |
ExponentialBackOffPolicy 的預設最大延遲時間為 30 秒。如果您的 back off 策略需要更大的延遲值,請相應地調整 maxDelay 屬性。 |
第一次嘗試計入 maxAttempts ,因此如果您提供 maxAttempts 值為 4,則會有原始嘗試加上 3 次重試。 |
全域逾時
您可以設定重試流程的全域逾時時間。如果達到該時間,則下次消費者拋出例外時,訊息將直接傳送到 DLT,或者如果沒有 DLT 可用,則直接結束處理。
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
預設為未設定逾時,也可以透過提供 -1 作為逾時值來達成。 |
例外分類器
您可以指定要重試哪些例外,以及不要重試哪些例外。您也可以設定為遍歷原因以查找巢狀例外。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
預設行為是對所有例外進行重試,且不遍歷原因。 |
自 2.8.3 版起,有一個全域致命例外清單,這些例外會導致記錄直接傳送到 DLT,而不會進行任何重試。請參閱 DefaultErrorHandler 以取得預設的致命例外清單。您可以透過覆寫擴展 RetryTopicConfigurationSupport
的 @Configuration
類別中的 configureNonBlockingRetries
方法,在此清單中新增或移除例外。請參閱 Configuring Global Settings and Features 以取得更多資訊。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
若要停用致命例外的分類,只需清除提供的清單即可。 |
包含和排除主題
您可以透過 .includeTopic(String topic)、.includeTopics(Collection<String> topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法,決定哪些主題將由 RetryTopicConfiguration
bean 處理,哪些主題不會被處理。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
預設行為是包含所有主題。 |
主題自動建立
除非另有說明,否則框架將使用 KafkaAdmin
bean 所使用的 NewTopic
bean 自動建立所需的主題。您可以指定將建立主題的分割區數量和複寫因數,並且您可以關閉此功能。從 3.0 版開始,預設複寫因數為 `-1`,表示使用 broker 預設值。如果您的 broker 版本早於 2.4,則需要設定明確的值。
請注意,如果您未使用 Spring Boot,則必須提供 KafkaAdmin bean 才能使用此功能。 |
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
預設情況下,主題會自動建立一個分割區,複寫因數為 -1 (表示使用 broker 預設值)。如果您的 broker 版本早於 2.4,則需要設定明確的值。 |
失敗標頭管理
在考慮如何管理失敗標頭 (原始標頭和例外標頭) 時,框架會委派給 DeadLetterPublishingRecoverer
來決定是否附加或取代標頭。
預設情況下,它會明確地將 appendOriginalHeaders
設定為 false
,並將 stripPreviousExceptionHeaders
保留為 DeadLetterPublishingRecover
使用的預設值。
這表示在預設組態下,僅保留第一個「原始」標頭和最後一個例外標頭。這是為了避免在涉及許多重試步驟時,建立過大的訊息 (例如,由於堆疊追蹤標頭)。
請參閱 Managing Dead Letter Record Headers 以取得更多資訊。
若要重新設定框架以針對這些屬性使用不同的設定,請透過覆寫擴展 RetryTopicConfigurationSupport
的 @Configuration
類別中的 configureCustomizers
方法,來設定 DeadLetterPublishingRecovererer
自訂器。請參閱 Configuring Global Settings and Features 以取得更多詳細資訊。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
從 2.8.4 版開始,如果您希望新增自訂標頭 (除了工廠新增的重試資訊標頭之外),您可以將 headersFunction
新增至工廠 - factory.setHeadersFunction((rec, ex) -> { ... })
。
預設情況下,新增的任何標頭都將是累積的 - Kafka 標頭可以包含多個值。從 2.9.5 版開始,如果函式傳回的 Headers
包含 DeadLetterPublishingRecoverer.SingleRecordHeader
類型的標頭,則將移除該標頭的任何現有值,並且僅保留新的單一值。
自訂 DeadLetterPublishingRecoverer
如失敗標頭管理中所見,可以自訂框架建立的預設 DeadLetterPublishingRecoverer
實例。但是,在某些使用案例中,有必要對 DeadLetterPublishingRecoverer
進行子類化,例如覆寫 createProducerRecord()
以修改傳送到重試 (或死信) 主題的內容。從 3.0.9 版開始,您可以覆寫 RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory()
方法以提供 DeadLetterPublisherCreator
實例,例如
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建議您在建構自訂實例時使用提供的解析器。
根據拋出的例外將訊息路由到自訂 DLT
從 3.2.0 版開始,可以根據訊息處理期間拋出的例外類型,將訊息路由到自訂 DLT。為了做到這一點,需要指定路由。路由自訂包含額外目的地的規格。目的地又包含兩個設定:suffix
和 exceptions
。當拋出 exceptions
中指定的例外類型時,將在考慮通用 DLT 之前,將包含 suffix
的 DLT 視為訊息的目標主題。使用註解或 RetryTopicConfiguration
bean 的組態範例
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(kafkaOperations)
.create(template);
}
suffix
在自訂 DLT 名稱中位於一般 dltTopicSuffix
之前。考量提供的範例,導致 DeserializationException
的訊息將路由到 my-annotated-topic-deserialization-dlt
而不是 my-annotated-topic-dlt
。自訂 DLT 將遵循主題自動建立中說明的相同規則建立。