@RabbitListener 與批次處理
當接收一批訊息時,通常由容器執行解批次處理,並一次使用一則訊息呼叫監聽器。從 2.2 版開始,您可以設定監聽器容器工廠和監聽器,以便在一次呼叫中接收整個批次,只需設定工廠的 batchListener
屬性,並將方法酬載參數設為 List
或 Collection
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
將 batchListener
屬性設為 true 會自動關閉工廠建立的容器中的 deBatchingEnabled
容器屬性 (除非 consumerBatchEnabled
為 true
- 請參閱下文)。實際上,解批次處理從容器移至監聽器配接器,而配接器會建立傳遞至監聽器的列表。
啟用批次處理的工廠不能與多方法監聽器一起使用。
同樣從 2.2 版開始,當一次接收一批訊息時,最後一則訊息包含一個布林值標頭,設定為 true
。可以透過將 @Header(AmqpHeaders.LAST_IN_BATCH)
布林值 last 參數新增至您的監聽器方法來取得此標頭。標頭是從 MessageProperties.isLastInBatch()
對應而來。此外,AmqpHeaders.BATCH_SIZE
會在每個訊息片段中填入批次的大小。
此外,已將新的屬性 consumerBatchEnabled
新增至 SimpleMessageListenerContainer
。當此屬性為 true 時,容器將建立一批訊息,最多為 batchSize
;如果 receiveTimeout
在沒有新訊息到達的情況下經過,則會傳遞部分批次。如果收到生產者建立的批次,則會將其解批次處理並新增至消費者端批次;因此,傳遞的訊息實際數量可能超過 batchSize
,batchSize
代表從 Broker 接收的訊息數量。當 consumerBatchEnabled
為 true 時,deBatchingEnabled
必須為 true;容器工廠將強制執行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
當將 consumerBatchEnabled
與 @RabbitListener
搭配使用時
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一個會使用收到的原始、未轉換的
org.springframework.amqp.core.Message
s 呼叫。 -
第二個會使用
org.springframework.messaging.Message<?>
s 呼叫,其中包含已轉換的酬載和已對應的標頭/屬性。 -
第三個會使用已轉換的酬載呼叫,無法存取標頭/屬性。
您也可以新增 Channel
參數,通常在使用 MANUAL
ack 模式時使用。這對於第三個範例來說不是非常有用,因為您無法存取 delivery_tag
屬性。
Spring Boot 為 consumerBatchEnabled
和 batchSize
提供組態屬性,但不為 batchListener
提供。從 3.0 版開始,在容器工廠上將 consumerBatchEnabled
設定為 true
也會將 batchListener
設定為 true
。當 consumerBatchEnabled
為 true
時,監聽器**必須**是批次監聽器。
從 3.0 版開始,監聽器方法可以取用 Collection<?>
或 List<?>
。