功能

大多數功能適用於 @RetryableTopic 註解和 RetryTopicConfiguration bean。

BackOff 組態

BackOff 組態依賴於 Spring Retry 專案的 BackOffPolicy 介面。

它包含

  • 固定 Back Off

  • 指數 Back Off

  • 隨機指數 Back Off

  • 均勻隨機 Back Off

  • 無 Back Off

  • 自訂 Back Off

@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(4)
            .create(template);
}

您也可以提供 Spring Retry 的 SleepingBackOffPolicy 介面的自訂實作

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackoff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
預設的 back off 策略是 FixedBackOffPolicy,最多嘗試 3 次,間隔為 1000 毫秒。
ExponentialBackOffPolicy 的預設最大延遲時間為 30 秒。如果您的 back off 策略需要更大的延遲值,請相應地調整 maxDelay 屬性。
第一次嘗試計入 maxAttempts,因此如果您提供 maxAttempts 值為 4,則會有原始嘗試加上 3 次重試。

全域逾時

您可以設定重試流程的全域逾時時間。如果達到該時間,則下次消費者拋出例外時,訊息將直接傳送到 DLT,或者如果沒有 DLT 可用,則直接結束處理。

@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
預設為未設定逾時,也可以透過提供 -1 作為逾時值來達成。

例外分類器

您可以指定要重試哪些例外,以及不要重試哪些例外。您也可以設定為遍歷原因以查找巢狀例外。

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
預設行為是對所有例外進行重試,且不遍歷原因。

自 2.8.3 版起,有一個全域致命例外清單,這些例外會導致記錄直接傳送到 DLT,而不會進行任何重試。請參閱 DefaultErrorHandler 以取得預設的致命例外清單。您可以透過覆寫擴展 RetryTopicConfigurationSupport@Configuration 類別中的 configureNonBlockingRetries 方法,在此清單中新增或移除例外。請參閱 Configuring Global Settings and Features 以取得更多資訊。

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
若要停用致命例外的分類,只需清除提供的清單即可。

包含和排除主題

您可以透過 .includeTopic(String topic)、.includeTopics(Collection<String> topics)、.excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法,決定哪些主題將由 RetryTopicConfiguration bean 處理,哪些主題不會被處理。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
預設行為是包含所有主題。

主題自動建立

除非另有說明,否則框架將使用 KafkaAdmin bean 所使用的 NewTopic bean 自動建立所需的主題。您可以指定將建立主題的分割區數量和複寫因數,並且您可以關閉此功能。從 3.0 版開始,預設複寫因數為 `-1`,表示使用 broker 預設值。如果您的 broker 版本早於 2.4,則需要設定明確的值。

請注意,如果您未使用 Spring Boot,則必須提供 KafkaAdmin bean 才能使用此功能。
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
預設情況下,主題會自動建立一個分割區,複寫因數為 -1 (表示使用 broker 預設值)。如果您的 broker 版本早於 2.4,則需要設定明確的值。

失敗標頭管理

在考慮如何管理失敗標頭 (原始標頭和例外標頭) 時,框架會委派給 DeadLetterPublishingRecoverer 來決定是否附加或取代標頭。

預設情況下,它會明確地將 appendOriginalHeaders 設定為 false,並將 stripPreviousExceptionHeaders 保留為 DeadLetterPublishingRecover 使用的預設值。

這表示在預設組態下,僅保留第一個「原始」標頭和最後一個例外標頭。這是為了避免在涉及許多重試步驟時,建立過大的訊息 (例如,由於堆疊追蹤標頭)。

請參閱 Managing Dead Letter Record Headers 以取得更多資訊。

若要重新設定框架以針對這些屬性使用不同的設定,請透過覆寫擴展 RetryTopicConfigurationSupport@Configuration 類別中的 configureCustomizers 方法,來設定 DeadLetterPublishingRecovererer 自訂器。請參閱 Configuring Global Settings and Features 以取得更多詳細資訊。

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

從 2.8.4 版開始,如果您希望新增自訂標頭 (除了工廠新增的重試資訊標頭之外),您可以將 headersFunction 新增至工廠 - factory.setHeadersFunction((rec, ex) -> { ... })

預設情況下,新增的任何標頭都將是累積的 - Kafka 標頭可以包含多個值。從 2.9.5 版開始,如果函式傳回的 Headers 包含 DeadLetterPublishingRecoverer.SingleRecordHeader 類型的標頭,則將移除該標頭的任何現有值,並且僅保留新的單一值。

自訂 DeadLetterPublishingRecoverer

如失敗標頭管理中所見,可以自訂框架建立的預設 DeadLetterPublishingRecoverer 實例。但是,在某些使用案例中,有必要對 DeadLetterPublishingRecoverer 進行子類化,例如覆寫 createProducerRecord() 以修改傳送到重試 (或死信) 主題的內容。從 3.0.9 版開始,您可以覆寫 RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory() 方法以提供 DeadLetterPublisherCreator 實例,例如

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}

建議您在建構自訂實例時使用提供的解析器。

根據拋出的例外將訊息路由到自訂 DLT

從 3.2.0 版開始,可以根據訊息處理期間拋出的例外類型,將訊息路由到自訂 DLT。為了做到這一點,需要指定路由。路由自訂包含額外目的地的規格。目的地又包含兩個設定:suffixexceptions。當拋出 exceptions 中指定的例外類型時,將在考慮通用 DLT 之前,將包含 suffix 的 DLT 視為訊息的目標主題。使用註解或 RetryTopicConfiguration bean 的組態範例

@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(kafkaOperations)
            .create(template);
}

suffix 在自訂 DLT 名稱中位於一般 dltTopicSuffix 之前。考量提供的範例,導致 DeserializationException 的訊息將路由到 my-annotated-topic-deserialization-dlt 而不是 my-annotated-topic-dlt。自訂 DLT 將遵循主題自動建立中說明的相同規則建立。