篩選訊息

在某些情況下,例如重新平衡,已處理過的訊息可能會重新傳遞。 框架無法得知此類訊息是否已處理。 這是應用程式層級的功能。 這被稱為 Idempotent Receiver 模式,而 Spring Integration 提供了它的 實作

Spring for Apache Kafka 專案也透過 FilteringMessageListenerAdapter 類別提供了一些協助,它可以包裝您的 MessageListener。 此類別採用 RecordFilterStrategy 的實作,您可以在其中實作 filter 方法來表示訊息是重複的,應捨棄。 它還有一個名為 ackDiscarded 的額外屬性,指示配接器是否應確認捨棄的記錄。 預設值為 false

當您使用 @KafkaListener 時,請在容器工廠上設定 RecordFilterStrategy(以及可選的 ackDiscarded),以便將監聽器包裝在適當的篩選配接器中。

此外,還提供了 FilteringBatchMessageListenerAdapter,用於當您使用批次 訊息監聽器 時。

如果您的 @KafkaListener 接收的是 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>>,則會忽略 FilteringBatchMessageListenerAdapter,因為 ConsumerRecords 是不可變的。

從 2.8.4 版開始,您可以透過使用監聽器註解上的 filter 屬性來覆寫監聽器容器工廠的預設 RecordFilterStrategy

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}