組態
從 2.9 版本開始,對於預設組態,@EnableKafkaRetryTopic
註解應在 @Configuration
註解的類別中使用。這使得該功能能夠正確啟動,並允許存取注入該功能的一些元件,以便在運行時查找。
如果您添加此註解,則無需同時添加 @EnableKafka ,因為 @EnableKafkaRetryTopic 使用 @EnableKafka 進行元註解。 |
此外,從該版本開始,為了更進階地組態功能元件和全域功能,RetryTopicConfigurationSupport
類別應在 @Configuration
類別中擴展,並覆寫適當的方法。有關更多詳細資訊,請參閱組態全域設定和功能。
預設情況下,重試主題的容器將與主要容器具有相同的並行性。從 3.0 版本開始,您可以為重試容器設定不同的 concurrency
(可以在註解中,也可以在 RetryConfigurationBuilder
中)。
以上技術只能使用一種,並且只有一個 @Configuration 類別可以擴展 RetryTopicConfigurationSupport 。 |
使用 @RetryableTopic
註解
若要為使用 @KafkaListener
註解的方法設定重試主題和 dlt,您只需將 @RetryableTopic
註解添加到其中,Spring for Apache Kafka 將使用預設組態引導所有必要的主題和消費者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
從 3.2 開始,類別上 @KafkaListener
的 @RetryableTopic
支援將會是
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
您可以在同一個類別中指定一個方法來處理 dlt 訊息,方法是使用 @DltHandler
註解對其進行註解。如果未提供 DltHandler 方法,則會建立一個預設消費者,該消費者僅記錄消耗。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果您未指定 kafkaTemplate 名稱,則將查找名為 defaultRetryTopicKafkaTemplate 的 bean。如果找不到 bean,則會拋出例外。 |
從 3.0 版本開始,@RetryableTopic
註解可以用作自訂註解的元註解;例如
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
使用 RetryTopicConfiguration
beans
您也可以在 @Configuration
註解的類別中建立 RetryTopicConfiguration
bean 來組態非阻塞重試支援。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
這將為所有使用 @KafkaListener
註解的方法中的主題建立重試主題和 dlt,以及相應的消費者,並使用預設組態。訊息轉發需要 KafkaTemplate
實例。
為了更精細地控制如何處理每個主題的非阻塞重試,可以提供多個 RetryTopicConfiguration
bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重試主題和 dlt 的消費者將被分配到一個消費者組,該組的組 ID 是您在 @KafkaListener 註解的 groupId 參數中提供的值與主題後綴的組合。如果您沒有提供任何值,它們都將屬於同一個組,並且重試主題上的重新平衡將導致主要主題上不必要的重新平衡。 |
如果消費者配置了 ErrorHandlingDeserializer 以處理反序列化例外,則務必使用可以處理常規物件以及原始 byte[] 值的序列化器來組態 KafkaTemplate 及其生產者,這些值是由反序列化例外產生的。範本的泛型值類型應為 Object 。一種技術是使用 DelegatingByTypeSerializer ;範例如下 |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
多個 @KafkaListener 註解可以用於同一個主題,無論是否使用手動分割區分配以及非阻塞重試,但對於給定的主題,只會使用一個組態。最好使用單個 RetryTopicConfiguration bean 來組態此類主題;如果多個 @RetryableTopic 註解用於同一個主題,則它們都應具有相同的值,否則其中一個將應用於該主題的所有監聽器,而其他註解的值將被忽略。 |
組態全域設定和功能
從 2.9 開始,先前用於組態元件的 bean 覆寫方法已移除(由於上述 API 的實驗性質,因此沒有棄用)。這不會變更 RetryTopicConfiguration
bean 方法 - 僅限基礎架構元件的組態。現在,RetryTopicConfigurationSupport
類別應在(單個)@Configuration
類別中擴展,並覆寫適當的方法。範例如下
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
使用此組態方法時,不應使用 @EnableKafkaRetryTopic 註解,以防止上下文因重複的 bean 而無法啟動。請改用簡單的 @EnableKafka 註解。 |
當 autoCreateTopics
為 true 時,主要主題和重試主題將使用指定的分割區數量和複製因子建立。從 3.0 版本開始,預設複製因子為 -1
,表示使用 broker 預設值。如果您的 broker 版本早於 2.4,則需要設定顯式值。若要覆寫特定主題(例如,主要主題或 DLT)的這些值,只需添加具有所需屬性的 NewTopic
@Bean
即可;這將覆寫自動建立屬性。
預設情況下,記錄會使用接收記錄的原始分割區發佈到重試主題。如果重試主題的分割區少於主要主題,則應適當地組態框架;範例如下。 |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
函數的參數是消費者記錄和下一個主題的名稱。您可以返回特定的分割區號碼,或返回 null
以指示 KafkaProducer
應確定分割區。
預設情況下,當記錄在重試主題之間轉換時,會保留重試標頭的所有值(嘗試次數、時間戳記)。從 2.9.6 版本開始,如果您只想保留這些標頭的最後一個值,請使用上面顯示的 configureDeadLetterPublishingContainerFactory()
方法將工廠的 retainAllRetryHeaderValues
屬性設定為 false
。
尋找 RetryTopicConfiguration
嘗試透過以下方式提供 RetryTopicConfiguration
的實例:從 @RetryableTopic
註解建立一個實例,或者在沒有註解的情況下從 bean 容器建立一個實例。
如果在容器中找到 bean,則會進行檢查以確定提供的主題是否應由任何此類實例處理。
如果提供了 @RetryableTopic
註解,則會查找使用 DltHandler
註解的方法。
自 3.2 起,提供新的 API 以在類別上使用 @RetryableTopic
註解時建立 RetryTopicConfiguration
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}