啟用 Listener 端點註解
若要啟用 @RabbitListener
註解的支援,您可以將 @EnableRabbit
新增至您的 @Configuration
類別之一。以下範例示範如何執行此操作
@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setContainerCustomizer(container -> /* customize the container */);
return factory;
}
}
自 2.0 版起,也提供 DirectMessageListenerContainerFactory
。它會建立 DirectMessageListenerContainer
實例。
如需協助您在 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory 之間做出選擇的資訊,請參閱選擇容器。 |
從 2.2.2 版開始,您可以提供 ContainerCustomizer
實作 (如上所示)。這可用於在容器建立和設定後進一步設定容器;例如,您可以使用它來設定容器工廠未公開的屬性。
2.4.8 版為希望應用多個自訂程式的情況提供 CompositeContainerCustomizer
。
預設情況下,基礎架構會尋找名為 rabbitListenerContainerFactory
的 Bean,作為用於建立訊息 Listener 容器的工廠來源。在這種情況下,並忽略 RabbitMQ 基礎架構設定,可以使用三個核心輪詢執行緒和最多十個執行緒的執行緒池來調用 processOrder
方法。
您可以自訂用於每個註解的 Listener 容器工廠,或者您可以透過實作 RabbitListenerConfigurer
介面來設定明確的預設值。只有在至少註冊一個沒有特定容器工廠的端點時,才需要預設值。如需完整詳細資訊和範例,請參閱 Javadoc。
容器工廠提供用於新增 MessagePostProcessor
實例的方法,這些實例在接收訊息之後 (在調用 Listener 之前) 以及在傳送回覆之前應用。
有關回覆的資訊,請參閱回覆管理。
從 2.0.6 版開始,您可以將 RetryTemplate
和 RecoveryCallback
新增至 Listener 容器工廠。它在傳送回覆時使用。當重試耗盡時,會調用 RecoveryCallback
。您可以使用 SendRetryContextAccessor
從內容中取得資訊。以下範例示範如何執行此操作
factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
...
return null;
});
如果您偏好 XML 設定,則可以使用 <rabbit:annotation-driven>
元素。任何使用 @RabbitListener
註解的 Bean 都會被偵測到。
對於 SimpleRabbitListenerContainer
實例,您可以使用類似以下的 XML
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
</bean>
對於 DirectMessageListenerContainer
實例,您可以使用類似以下的 XML
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="consumersPerQueue" value="3"/>
</bean>
從 2.0 版開始,@RabbitListener
註解具有 concurrency
屬性。它支援 SpEL 表達式 (#{…}
) 和屬性佔位符 (${…}
)。其含義和允許的值取決於容器類型,如下所示
-
對於
DirectMessageListenerContainer
,值必須是單一整數值,該值會設定容器上的consumersPerQueue
屬性。 -
對於
SimpleRabbitListenerContainer
,該值可以是單一整數值,該值會設定容器上的concurrentConsumers
屬性,或者它可以具有m-n
形式,其中m
是concurrentConsumers
屬性,而n
是maxConcurrentConsumers
屬性。
在任一情況下,此設定都會覆寫工廠上的設定。先前,如果您有需要不同並行性的 Listener,則必須定義不同的容器工廠。
該註解也允許透過 autoStartup
和 executor
(自 2.2 起) 註解屬性覆寫工廠 autoStartup
和 taskExecutor
屬性。為每個 Listener 使用不同的執行器可能有助於識別與記錄和執行緒傾印中每個 Listener 相關聯的執行緒。
2.2 版也新增了 ackMode
屬性,可讓您覆寫容器工廠的 acknowledgeMode
屬性。
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
...
channel.basicAck(tag, false);
}