空值酬載與 '墓碑' 紀錄的日誌壓縮
當您使用 日誌壓縮 時,您可以傳送和接收帶有空值酬載的訊息,以識別金鑰的刪除。
您也可能因為其他原因收到空值,例如當 Deserializer
無法反序列化值時,可能會傳回空值。
若要使用 KafkaTemplate
傳送空值酬載,您可以將 null 傳遞至 send()
方法的值引數。其中一個例外是 send(Message<?> message)
變體。由於 spring-messaging
Message<?>
不能有空值酬載,您可以使用名為 KafkaNull
的特殊酬載類型,框架會傳送 null。為了方便起見,提供了靜態 KafkaNull.INSTANCE
。
當您使用訊息監聽器容器時,接收到的 ConsumerRecord
會具有 null value()
。
若要設定 @KafkaListener
處理空值酬載,您必須使用 @Payload
註解,並設定 required = false
。如果是壓縮日誌的墓碑訊息,您通常也需要金鑰,以便您的應用程式可以判斷哪個金鑰被「刪除」。以下範例顯示了此類組態
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
當您使用類別層級的 @KafkaListener
與多個 @KafkaHandler
方法時,需要一些額外的組態。具體來說,您需要一個具有 KafkaNull
酬載的 @KafkaHandler
方法。以下範例顯示如何設定一個
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
請注意,引數是 null,而不是 KafkaNull
。
請參閱 手動指派所有分割區。 |
此功能需要使用 KafkaNullAwarePayloadArgumentResolver ,框架在使用預設 MessageHandlerMethodFactory 時會設定它。當使用自訂 MessageHandlerMethodFactory 時,請參閱 將自訂 HandlerMethodArgumentResolver 新增至 @KafkaListener。 |