空值酬載與 '墓碑' 紀錄的日誌壓縮

當您使用 日誌壓縮 時,您可以傳送和接收帶有空值酬載的訊息,以識別金鑰的刪除。

您也可能因為其他原因收到空值,例如當 Deserializer 無法反序列化值時,可能會傳回空值。

若要使用 KafkaTemplate 傳送空值酬載,您可以將 null 傳遞至 send() 方法的值引數。其中一個例外是 send(Message<?> message) 變體。由於 spring-messaging Message<?> 不能有空值酬載,您可以使用名為 KafkaNull 的特殊酬載類型,框架會傳送 null。為了方便起見,提供了靜態 KafkaNull.INSTANCE

當您使用訊息監聽器容器時,接收到的 ConsumerRecord 會具有 null value()

若要設定 @KafkaListener 處理空值酬載,您必須使用 @Payload 註解,並設定 required = false。如果是壓縮日誌的墓碑訊息,您通常也需要金鑰,以便您的應用程式可以判斷哪個金鑰被「刪除」。以下範例顯示了此類組態

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

當您使用類別層級的 @KafkaListener 與多個 @KafkaHandler 方法時,需要一些額外的組態。具體來說,您需要一個具有 KafkaNull 酬載的 @KafkaHandler 方法。以下範例顯示如何設定一個

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

請注意,引數是 null,而不是 KafkaNull

此功能需要使用 KafkaNullAwarePayloadArgumentResolver,框架在使用預設 MessageHandlerMethodFactory 時會設定它。當使用自訂 MessageHandlerMethodFactory 時,請參閱 將自訂 HandlerMethodArgumentResolver 新增至 @KafkaListener