DLT 策略

此框架提供一些策略來處理 DLT。您可以提供 DLT 處理的方法、使用預設記錄方法,或完全不使用 DLT。您也可以選擇 DLT 處理失敗時會發生的情況。

DLT 處理方法

您可以指定用於處理主題 DLT 的方法,以及該處理失敗時的行為。

為此,您可以在具有 @RetryableTopic 註解的類別方法中使用 @DltHandler 註解。請注意,相同的方法將用於該類別中所有帶有 @RetryableTopic 註解的方法。

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

DLT 處理常式方法也可以透過 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供,並將應處理 DLT 訊息的 bean 名稱和方法名稱作為引數傳遞。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}
如果未提供 DLT 處理常式,則會使用預設的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod

從 2.8 版開始,如果您完全不想在此應用程式中使用 DLT(包括預設處理常式),或您希望延遲使用,您可以控制是否啟動 DLT 容器,而與容器 factory 的 autoStartup 屬性無關。

當使用 @RetryableTopic 註解時,將 autoStartDltHandler 屬性設定為 false;當使用組態建構器時,請使用 autoStartDltHandler(false)

您稍後可以透過 KafkaListenerEndpointRegistry 啟動 DLT 處理常式。

DLT 失敗行為

如果 DLT 處理失敗,則有兩種可能的行為可用:ALWAYS_RETRY_ON_ERRORFAIL_ON_ERROR

在前一種情況下,記錄會轉發回 DLT 主題,使其不會阻止其他 DLT 記錄的處理。在後一種情況下,consumer 會結束執行,而不會轉發訊息。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
預設行為是 ALWAYS_RETRY_ON_ERROR
從 2.8.3 版開始,如果記錄導致拋出致命例外,例如 DeserializationExceptionALWAYS_RETRY_ON_ERROR 將 *不會* 將記錄路由回 DLT,因為通常此類例外將始終被拋出。

被視為致命的例外包括

  • DeserializationException (反序列化例外)

  • MessageConversionException (訊息轉換例外)

  • ConversionException (轉換例外)

  • MethodArgumentResolutionException (方法引數解析例外)

  • NoSuchMethodException (無此方法例外)

  • ClassCastException (類別轉換例外)

您可以使用 DestinationTopicResolver bean 上的方法,將例外新增至此清單和從此清單中移除例外。

有關更多資訊,請參閱例外分類器

設定不使用 DLT

此框架也提供不為主題設定 DLT 的可能性。在這種情況下,在重試耗盡後,處理就會直接結束。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}