死信主題處理

啟用 DLQ

若要啟用 DLQ,基於 Kafka 綁定器的應用程式必須透過屬性 spring.cloud.stream.bindings.<binding-name>.group 提供消費者群組。匿名消費者群組 (即應用程式未明確提供群組) 無法啟用 DLQ 功能。

當應用程式想要將錯誤記錄傳送到 DLQ 主題時,該應用程式必須啟用 DLQ 功能,因為預設情況下未啟用此功能。若要啟用 DLQ,屬性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq 必須設定為 true。

當啟用 DLQ 時,在處理發生錯誤且所有重試都根據 spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts 屬性耗盡後,該記錄將被傳送到 DLQ 主題。

預設情況下,max-attempts 屬性設定為 3。當 max-attempts 屬性大於 1 且已啟用 dlq 時,您將看到重試會遵守 max-attempts 屬性。當未啟用 dlq (預設情況) 時,max-attempts 屬性對於重試的處理方式沒有任何影響。在這種情況下,重試將回退到 Spring for Apache Kafka 中的容器預設值,即 10 次重試。如果應用程式想要在停用 DLQ 時完全停用重試,則將 max-attempts 屬性設定為 1 將不起作用。若要在這種情況下完全停用重試,您需要提供 ListenerContainerCustomizer,然後使用適當的 Backoff 設定。以下是一個範例。

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
	return (container, destinationName, group) -> {
		var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
		container.setCommonErrorHandler(commonErrorHandler);
	};
}

透過這樣設定,預設容器行為將被停用,並且不會嘗試重試。如上所述,當啟用 DLQ 時,綁定器設定將具有優先權。

處理死信主題中的記錄

由於框架無法預期使用者希望如何處置死信訊息,因此它未提供任何標準機制來處理它們。如果死信的原因是暫時性的,您可能希望將訊息路由回原始主題。但是,如果問題是永久性的問題,則可能會導致無限迴圈。本主題中的範例 Spring Boot 應用程式示範如何將這些訊息路由回原始主題,但它會在三次嘗試後將它們移動到「停車場」主題。該應用程式是另一個 spring-cloud-stream 應用程式,它從死信主題讀取。當 5 秒內未收到任何訊息時,它會退出。

範例假設原始目的地是 so8400out,而消費者群組是 so8400

有幾種策略需要考慮

  • 考慮僅在主要應用程式未執行時才執行重新路由。否則,暫時性錯誤的重試次數將很快用完。

  • 或者,使用兩階段方法:使用此應用程式路由到第三個主題,另一個應用程式從那裡路由回主要主題。

以下程式碼清單顯示了範例應用程式

application.properties
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
應用程式
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private StreamBridge streamBridge;

    @Bean
    public Function<Message<?>, Message<?>> reRoute() {
        return failed -> {
            processed.incrementAndGet();
            Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
            if (retries == null) {
                System.out.println("First retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else if (retries < 3) {
                System.out.println("Another retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else {
                System.out.println("Retries exhausted for " + failed);
                streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
            }
            return null;
        };
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }
}