@KafkaListener
註解
@KafkaListener
註解用於將 Bean 方法指定為監聽器容器的監聽器。Bean 包裝在 MessagingMessageListenerAdapter
中,並配置各種功能,例如轉換器,以便在必要時轉換資料以符合方法參數。
您可以使用 SpEL (透過 #{…}
) 或屬性佔位符 (${…}
) 在註解上配置大多數屬性。請參閱 Javadoc 以取得更多資訊。
記錄監聽器
@KafkaListener
註解為簡單的 POJO 監聽器提供機制。以下範例顯示如何使用它
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
此機制需要在您的 @Configuration
類別之一上使用 @EnableKafka
註解,以及監聽器容器 Factory,用於配置底層的 ConcurrentMessageListenerContainer
。預設情況下,預期會有一個名為 kafkaListenerContainerFactory
的 Bean。以下範例顯示如何使用 ConcurrentMessageListenerContainer
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
return props;
}
}
請注意,若要設定容器屬性,您必須使用 Factory 上的 getContainerProperties()
方法。它用作注入到容器中的實際屬性的範本。
從 2.1.1 版開始,您現在可以為註解建立的消費者設定 client.id
屬性。clientIdPrefix
會加上 -n
後綴,其中 n
是使用並行時表示容器編號的整數。
從 2.2 版開始,您現在可以使用註解本身的屬性來覆寫容器 Factory 的 concurrency
和 autoStartup
屬性。屬性可以是簡單值、屬性佔位符或 SpEL 表達式。以下範例顯示如何執行此操作
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
明確分割區指派
您也可以使用明確的主題和分割區 (以及選擇性的初始偏移量) 配置 POJO 監聽器。以下範例顯示如何執行此操作
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
您可以在 partitions
或 partitionOffsets
屬性中指定每個分割區,但不能同時指定兩者。
與大多數註解屬性一樣,您可以使用 SpEL 表達式;如需如何產生大型分割區清單的範例,請參閱 手動指派所有分割區。
從 2.5.5 版開始,您可以將初始偏移量套用至所有指派的分割區
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
*
萬用字元代表 partitions
屬性中的所有分割區。每個 @TopicPartition
中只能有一個具有萬用字元的 @PartitionOffset
。
此外,當監聽器實作 ConsumerSeekAware
時,即使在使用手動指派時,現在也會呼叫 onPartitionsAssigned
。例如,這允許在該時間進行任何任意搜尋操作。
從 2.6.4 版開始,您可以指定逗號分隔的分割區清單或分割區範圍
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
範圍是包含性的;上面的範例將指派分割區 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15
。
在指定初始偏移量時,可以使用相同的技術
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移量將套用至所有 6 個分割區。
自 3.2 版起,@PartitionOffset
支援 SeekPosition.END
、SeekPosition.BEGINNING
、SeekPosition.TIMESTAMP
,seekPosition
符合 SeekPosition
列舉名稱
@KafkaListener(id = "seekPositionTime", topicPartitions = {
@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
})
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
如果 seekPosition 設定為 END
或 BEGINNING
,將忽略 initialOffset
和 relativeToCurrent
。如果 seekPosition 設定為 TIMESTAMP
,則 initialOffset
表示時間戳記。
手動確認
使用手動 AckMode
時,您也可以向監聽器提供 Acknowledgment
。以下範例也顯示如何使用不同的容器 Factory。
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
消費者記錄中繼資料
最後,記錄的相關中繼資料可從訊息標頭取得。您可以使用以下標頭名稱來擷取訊息的標頭
-
KafkaHeaders.OFFSET
-
KafkaHeaders.RECEIVED_KEY
-
KafkaHeaders.RECEIVED_TOPIC
-
KafkaHeaders.RECEIVED_PARTITION
-
KafkaHeaders.RECEIVED_TIMESTAMP
-
KafkaHeaders.TIMESTAMP_TYPE
從 2.5 版開始,如果傳入的記錄具有 null
鍵,則 RECEIVED_KEY
不會出現;先前,標頭會以 null
值填入。此變更是為了使框架與 spring-messaging
慣例保持一致,其中 null
值標頭不會出現。
以下範例顯示如何使用標頭
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
參數註解 (@Payload 、@Header ) 必須在監聽器方法的具體實作上指定;如果它們在介面上定義,則不會偵測到它們。 |
從 2.5 版開始,您可以接收 ConsumerRecordMetadata
參數中的記錄中繼資料,而無需使用離散標頭。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
這包含來自 ConsumerRecord
的所有資料,但金鑰和值除外。
批次監聽器
從 1.1 版開始,您可以設定 @KafkaListener
方法以接收從消費者輪詢接收的整個消費者記錄批次。
非阻塞重試 不支援批次監聽器。 |
若要設定監聽器容器 Factory 以建立批次監聽器,您可以設定 batchListener
屬性。以下範例顯示如何執行此操作
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
從 2.8 版開始,您可以使用 @KafkaListener 註解上的 batch 屬性來覆寫 Factory 的 batchListener 屬性。這與 容器錯誤處理常式 的變更一起,允許相同的 Factory 用於記錄和批次監聽器。 |
從 2.9.6 版開始,容器 Factory 具有用於 recordMessageConverter 和 batchMessageConverter 屬性的個別 Setter。先前,只有一個屬性 messageConverter 同時適用於記錄和批次監聽器。 |
以下範例顯示如何接收酬載清單
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
主題、分割區、偏移量等等在與酬載平行的標頭中可用。以下範例顯示如何使用標頭
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,您可以接收 Message<?>
物件的 List
,其中每個訊息中都有每個偏移量和其他詳細資訊,但它必須是方法上定義的唯一參數 (除了使用手動提交時的選擇性 Acknowledgment
和/或 Consumer<?, ?>
參數)。以下範例顯示如何執行此操作
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在這種情況下,不會對酬載執行轉換。
如果 BatchMessagingMessageConverter
配置了 RecordMessageConverter
,您也可以將泛型類型新增至 Message
參數,並且會轉換酬載。請參閱 使用批次監聽器進行酬載轉換 以取得更多資訊。
您也可以接收 ConsumerRecord<?, ?>
物件的清單,但它必須是方法上定義的唯一參數 (除了使用手動提交和 Consumer<?, ?>
參數時的選擇性 Acknowledgment
)。以下範例顯示如何執行此操作
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
從 2.2 版開始,監聽器可以接收 poll()
方法傳回的完整 ConsumerRecords<?, ?>
物件,讓監聽器可以存取其他方法,例如 partitions()
(傳回清單中的 TopicPartition
實例) 和 records(TopicPartition)
(取得選擇性記錄)。同樣地,這必須是方法上的唯一參數 (除了使用手動提交或 Consumer<?, ?>
參數時的選擇性 Acknowledgment
)。以下範例顯示如何執行此操作
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果容器 Factory 配置了 RecordFilterStrategy ,則會忽略 ConsumerRecords<?, ?> 監聽器,並發出 WARN 日誌訊息。只有在使用 <List<?>> 形式的監聽器時,才能使用批次監聽器篩選記錄。預設情況下,記錄會一次篩選一筆;從 2.8 版開始,您可以覆寫 filterBatch 以在一次呼叫中篩選整個批次。 |
註解屬性
從 2.0 版開始,id
屬性 (如果存在) 用作 Kafka 消費者 group.id
屬性,覆寫消費者 Factory 中配置的屬性 (如果存在)。您也可以明確設定 groupId
或將 idIsGroup
設定為 false,以還原先前使用消費者 Factory group.id
的行為。
您可以在大多數註解屬性中使用屬性佔位符或 SpEL 表達式,如下列範例所示
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
從 2.1.2 版開始,SpEL 表達式支援特殊 Token:__listener
。它是一個偽 Bean 名稱,表示此註解存在的目前 Bean 實例。
請考慮以下範例
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
給定先前範例中的 Bean,我們接著可以使用以下內容
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
如果您在不太可能發生的情況下有一個名為 __listener
的實際 Bean,您可以透過使用 beanRef
屬性來變更表達式 Token。以下範例顯示如何執行此操作
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
從 2.2.4 版開始,您可以直接在註解上指定 Kafka 消費者屬性,這些屬性將覆寫消費者 Factory 中配置的任何同名屬性。您 無法 以這種方式指定 group.id
和 client.id
屬性;它們將被忽略;請針對這些屬性使用 groupId
和 clientIdPrefix
註解屬性。
這些屬性指定為具有一般 Java Properties
檔案格式的個別字串:foo:bar
、foo=bar
或 foo bar
,如下列範例所示
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下是 使用 RoutingKafkaTemplate
中範例的對應監聽器範例。
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}