啟用 Listener 端點註解

若要啟用 @RabbitListener 註解的支援,您可以將 @EnableRabbit 新增至您的 @Configuration 類別之一。以下範例示範如何執行此操作

@Configuration
@EnableRabbit
public class AppConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setContainerCustomizer(container -> /* customize the container */);
        return factory;
    }
}

自 2.0 版起,也提供 DirectMessageListenerContainerFactory。它會建立 DirectMessageListenerContainer 實例。

如需協助您在 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory 之間做出選擇的資訊,請參閱選擇容器

從 2.2.2 版開始,您可以提供 ContainerCustomizer 實作 (如上所示)。這可用於在容器建立和設定後進一步設定容器;例如,您可以使用它來設定容器工廠未公開的屬性。

2.4.8 版為希望應用多個自訂程式的情況提供 CompositeContainerCustomizer

預設情況下,基礎架構會尋找名為 rabbitListenerContainerFactory 的 Bean,作為用於建立訊息 Listener 容器的工廠來源。在這種情況下,並忽略 RabbitMQ 基礎架構設定,可以使用三個核心輪詢執行緒和最多十個執行緒的執行緒池來調用 processOrder 方法。

您可以自訂用於每個註解的 Listener 容器工廠,或者您可以透過實作 RabbitListenerConfigurer 介面來設定明確的預設值。只有在至少註冊一個沒有特定容器工廠的端點時,才需要預設值。如需完整詳細資訊和範例,請參閱 Javadoc

容器工廠提供用於新增 MessagePostProcessor 實例的方法,這些實例在接收訊息之後 (在調用 Listener 之前) 以及在傳送回覆之前應用。

有關回覆的資訊,請參閱回覆管理

從 2.0.6 版開始,您可以將 RetryTemplateRecoveryCallback 新增至 Listener 容器工廠。它在傳送回覆時使用。當重試耗盡時,會調用 RecoveryCallback。您可以使用 SendRetryContextAccessor 從內容中取得資訊。以下範例示範如何執行此操作

factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
    Message failed = SendRetryContextAccessor.getMessage(ctx);
    Address replyTo = SendRetryContextAccessor.getAddress(ctx);
    Throwable t = ctx.getLastThrowable();
    ...
    return null;
});

如果您偏好 XML 設定,則可以使用 <rabbit:annotation-driven> 元素。任何使用 @RabbitListener 註解的 Bean 都會被偵測到。

對於 SimpleRabbitListenerContainer 實例,您可以使用類似以下的 XML

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="concurrentConsumers" value="3"/>
    <property name="maxConcurrentConsumers" value="10"/>
</bean>

對於 DirectMessageListenerContainer 實例,您可以使用類似以下的 XML

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="consumersPerQueue" value="3"/>
</bean>

從 2.0 版開始,@RabbitListener 註解具有 concurrency 屬性。它支援 SpEL 表達式 (#{…​}) 和屬性佔位符 (${…​})。其含義和允許的值取決於容器類型,如下所示

  • 對於 DirectMessageListenerContainer,值必須是單一整數值,該值會設定容器上的 consumersPerQueue 屬性。

  • 對於 SimpleRabbitListenerContainer,該值可以是單一整數值,該值會設定容器上的 concurrentConsumers 屬性,或者它可以具有 m-n 形式,其中 mconcurrentConsumers 屬性,而 nmaxConcurrentConsumers 屬性。

在任一情況下,此設定都會覆寫工廠上的設定。先前,如果您有需要不同並行性的 Listener,則必須定義不同的容器工廠。

該註解也允許透過 autoStartupexecutor (自 2.2 起) 註解屬性覆寫工廠 autoStartuptaskExecutor 屬性。為每個 Listener 使用不同的執行器可能有助於識別與記錄和執行緒傾印中每個 Listener 相關聯的執行緒。

2.2 版也新增了 ackMode 屬性,可讓您覆寫容器工廠的 acknowledgeMode 屬性。

@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

    ...
    channel.basicAck(tag, false);
}