@KafkaListener 在類別上

當您在類別層級使用 @KafkaListener 時,您必須在方法層級指定 @KafkaHandler。當訊息傳遞時,轉換後的訊息酬載類型會被用來決定要呼叫哪個方法。以下範例展示如何做到這一點

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

從 2.1.3 版本開始,您可以將 @KafkaHandler 方法指定為預設方法,如果其他方法沒有匹配項,則會調用此方法。最多只能指定一個方法。當使用 @KafkaHandler 方法時,酬載必須已經轉換為網域物件(以便可以執行匹配)。使用自訂的反序列化器、JsonDeserializerJsonMessageConverter,並將其 TypePrecedence 設定為 TYPE_ID。有關更多資訊,請參閱 序列化、反序列化和訊息轉換

由於 Spring 解析方法引數的方式存在一些限制,預設的 @KafkaHandler 無法接收離散的標頭;它必須使用 消費者記錄元數據 中討論的 ConsumerRecordMetadata

例如

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

如果物件是 String,這將無法運作;topic 參數也會取得對 object 的參考。

如果您需要在預設方法中使用關於記錄的元數據,請使用這個

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}