重試與死信處理
預設情況下,當您在消費者繫結中設定重試 (例如 maxAttemts
) 和 enableDlq
時,這些功能會在繫結器內執行,而監聽器容器或 Kafka 消費者不會參與。
在某些情況下,最好將此功能移至監聽器容器,例如:
-
重試和延遲的總和將超過消費者的
max.poll.interval.ms
屬性,可能導致分割區重新平衡。 -
您希望將死信發布到不同的 Kafka 叢集。
-
您希望將重試監聽器新增至錯誤處理程式。
-
…
若要設定將此功能從繫結器移至容器,請定義類型為 ListenerContainerWithDlqAndRetryCustomizer
的 @Bean
。此介面具有以下方法:
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
目的地解析器和 BackOff
是從繫結屬性 (如果已設定) 建立的。 KafkaTemplate
使用來自 spring.kafka…
屬性的組態。然後,您可以使用它們來建立自訂錯誤處理程式和死信發布者;例如:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
現在,只有單一重試延遲需要大於消費者的 max.poll.interval.ms
屬性。
當使用多個繫結器時,'ListenerContainerWithDlqAndRetryCustomizer' bean 會被 'DefaultBinderFactory' 覆寫。為了使 bean 生效,您需要使用 'BinderCustomizer' 來設定容器自訂器 (請參閱 [binder-customizer])
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}