篩選訊息
在某些情況下,例如重新平衡,已處理過的訊息可能會重新傳遞。 框架無法得知此類訊息是否已處理。 這是應用程式層級的功能。 這被稱為 Idempotent Receiver 模式,而 Spring Integration 提供了它的 實作。
Spring for Apache Kafka 專案也透過 FilteringMessageListenerAdapter
類別提供了一些協助,它可以包裝您的 MessageListener
。 此類別採用 RecordFilterStrategy
的實作,您可以在其中實作 filter
方法來表示訊息是重複的,應捨棄。 它還有一個名為 ackDiscarded
的額外屬性,指示配接器是否應確認捨棄的記錄。 預設值為 false
。
當您使用 @KafkaListener
時,請在容器工廠上設定 RecordFilterStrategy
(以及可選的 ackDiscarded
),以便將監聽器包裝在適當的篩選配接器中。
此外,還提供了 FilteringBatchMessageListenerAdapter
,用於當您使用批次 訊息監聽器 時。
如果您的 @KafkaListener 接收的是 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>> ,則會忽略 FilteringBatchMessageListenerAdapter ,因為 ConsumerRecords 是不可變的。 |
從 2.8.4 版開始,您可以透過使用監聽器註解上的 filter
屬性來覆寫監聽器容器工廠的預設 RecordFilterStrategy
。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}