@KafkaListener
在類別上
當您在類別層級使用 @KafkaListener
時,您必須在方法層級指定 @KafkaHandler
。當訊息傳遞時,轉換後的訊息酬載類型會被用來決定要呼叫哪個方法。以下範例展示如何做到這一點
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
從 2.1.3 版本開始,您可以將 @KafkaHandler
方法指定為預設方法,如果其他方法沒有匹配項,則會調用此方法。最多只能指定一個方法。當使用 @KafkaHandler
方法時,酬載必須已經轉換為網域物件(以便可以執行匹配)。使用自訂的反序列化器、JsonDeserializer
或 JsonMessageConverter
,並將其 TypePrecedence
設定為 TYPE_ID
。有關更多資訊,請參閱 序列化、反序列化和訊息轉換。
由於 Spring 解析方法引數的方式存在一些限制,預設的 @KafkaHandler 無法接收離散的標頭;它必須使用 消費者記錄元數據 中討論的 ConsumerRecordMetadata 。 |
例如
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果物件是 String
,這將無法運作;topic
參數也會取得對 object
的參考。
如果您需要在預設方法中使用關於記錄的元數據,請使用這個
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}