非同步消費者

Spring AMQP 也透過使用 @RabbitListener 註解來支援註解的監聽器端點,並提供開放的基礎架構以程式化方式註冊端點。這是目前為止設定非同步消費者最方便的方式。請參閱 註解驅動的監聽器端點 以取得更多詳細資訊。

以前用於預取的預設值為 1,這可能會導致有效率的消費者未被充分利用。從 2.0 版開始,預設預取值現在為 250,這應使消費者在大多數常見情況下保持忙碌,從而提高輸送量。

然而,在某些情況下,預取值應該較低

  • 對於大型訊息,尤其是當處理速度緩慢時 (訊息可能會在用戶端程序中佔用大量記憶體)

  • 當嚴格的訊息順序是必要的時 (在這種情況下,預取值應設定回 1)

  • 其他特殊情況

此外,對於低流量訊息傳遞和多個消費者 (包括單一監聽器容器實例中的並行性),您可能希望降低預取值,以在消費者之間獲得更均勻的訊息分配。

如需更多關於預取的背景資訊,請參閱這篇關於 RabbitMQ 中消費者利用率 的文章,以及這篇關於 佇列理論 的文章。

訊息監聽器

對於非同步 Message 接收,會涉及一個專用元件 (而非 AmqpTemplate)。該元件是 Message 消費回呼的容器。我們稍後在本節中會考慮容器及其屬性。不過,首先,我們應該看看回呼,因為這是您的應用程式碼與訊息傳遞系統整合的地方。回呼有一些選項,首先是 MessageListener 介面的實作,以下清單顯示了該介面

public interface MessageListener {
    void onMessage(Message message);
}

如果您的回呼邏輯基於任何原因而依賴 AMQP Channel 實例,您可以改用 ChannelAwareMessageListener。它看起來很相似,但有一個額外的參數。以下清單顯示了 ChannelAwareMessageListener 介面定義

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}
在 2.1 版中,此介面從套件 o.s.amqp.rabbit.core 移至 o.s.amqp.rabbit.listener.api

MessageListenerAdapter

如果您希望在應用程式邏輯和訊息傳遞 API 之間保持更嚴格的分隔,您可以依賴框架提供的配接器實作。這通常稱為「訊息驅動 POJO」支援。

1.5 版引入了更彈性的 POJO 訊息傳遞機制,即 @RabbitListener 註解。請參閱 註解驅動的監聽器端點 以取得更多資訊。

當使用配接器時,您只需要提供配接器本身應調用的實例的參考。以下範例顯示如何執行此操作

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

您可以子類別化配接器並提供 getListenerMethodName() 的實作,以根據訊息動態選擇不同的方法。此方法有兩個參數:originalMessageextractedMessage,後者是任何轉換的結果。預設情況下,會設定 SimpleMessageConverter。請參閱 SimpleMessageConverter 以取得更多資訊以及關於其他可用轉換器的資訊。

從 1.4.2 版開始,原始訊息具有 consumerQueueconsumerTag 屬性,可用於判斷接收訊息的佇列。

從 1.5 版開始,您可以設定消費者佇列或標籤到方法名稱的對應,以動態選擇要呼叫的方法。如果對應中沒有項目,我們會退回到預設監聽器方法。預設監聽器方法 (如果未設定) 是 handleMessage

從 2.0 版開始,提供了一個方便的 FunctionalInterface。以下清單顯示了 FunctionalInterface 的定義

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

    R handleMessage(T t);

}

此介面透過使用 Java 8 lambda 來簡化配接器的設定,如下列範例所示

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
    ...
    return result;
}));

從 2.2 版開始,buildListenerArguments(Object) 已被棄用,並引入了新的 buildListenerArguments(Object, Channel, Message)。新方法可協助監聽器取得 ChannelMessage 引數以執行更多操作,例如在手動確認模式下呼叫 channel.basicReject(long, boolean)。以下清單顯示了最基本的範例

public class ExtendedListenerAdapter extends MessageListenerAdapter {

    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }

}

如果您需要接收「channel」和「message」,現在您可以將 ExtendedListenerAdapter 設定為與 MessageListenerAdapter 相同。監聽器的參數應設定為 buildListenerArguments(Object, Channel, Message) 傳回的值,如下列監聽器範例所示

public void handleMessage(Object object, Channel channel, Message message) throws IOException {
    ...
}

容器

現在您已經了解了 Message 監聽回呼的各種選項,我們可以將注意力轉向容器。基本上,容器處理「主動」責任,以便監聽器回呼可以保持被動。容器是「生命週期」元件的範例。它提供用於啟動和停止的方法。設定容器時,您基本上是在 AMQP 佇列和 MessageListener 實例之間架起橋樑。您必須提供 ConnectionFactory 的參考以及監聽器應從中消費訊息的佇列名稱或 Queue 實例。

在 2.0 版之前,只有一個監聽器容器,即 SimpleMessageListenerContainer。現在有第二個容器,即 DirectMessageListenerContainer。容器之間的差異以及您在選擇要使用哪個容器時可能應用的標準,在 選擇容器 中有所描述。

以下清單顯示了最基本的範例,它透過使用 SimpleMessageListenerContainer 來運作

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));

作為「主動」元件,最常見的是使用 bean 定義建立監聽器容器,以便它可以在背景中執行。以下範例顯示了使用 XML 執行此操作的一種方式

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

以下清單顯示了使用 XML 執行此操作的另一種方式

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

前面的兩個範例都建立了 DirectMessageListenerContainer (請注意 type 屬性 — 它預設為 simple)。

或者,您可能更喜歡使用 Java 設定,這看起來與前面的程式碼片段類似

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}

消費者優先順序

從 RabbitMQ 3.2 版開始,Broker 現在支援消費者優先順序 (請參閱 搭配 RabbitMQ 使用消費者優先順序)。這是透過在消費者上設定 x-priority 引數來啟用的。SimpleMessageListenerContainer 現在支援設定消費者引數,如下列範例所示

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

為了方便起見,命名空間在 listener 元素上提供了 priority 屬性,如下列範例所示

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>

從 1.3 版開始,您可以修改容器在執行階段監聽的佇列。請參閱 監聽器容器佇列

auto-delete 佇列

當容器設定為監聽 auto-delete 佇列時,佇列具有 x-expires 選項,或者 Time-To-Live 策略在 Broker 上設定,當容器停止時 (也就是說,當最後一個消費者被取消時),佇列會被 Broker 移除。在 1.3 版之前,容器無法重新啟動,因為佇列遺失了。RabbitAdmin 只會在連線關閉或開啟時自動重新宣告佇列等等,這在容器停止和啟動時不會發生。

從 1.3 版開始,容器使用 RabbitAdmin 在啟動期間重新宣告任何遺失的佇列。

您也可以搭配 auto-startup="false" 管理員使用條件式宣告 (請參閱 條件式宣告),以將佇列宣告延遲到容器啟動時。以下範例顯示如何執行此操作

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
    auto-startup="false" />

在這種情況下,佇列和交換器由 containerAdmin 宣告,後者具有 auto-startup="false",因此元素不會在內容初始化期間宣告。此外,容器也不會因為相同原因而啟動。當容器稍後啟動時,它會使用其對 containerAdmin 的參考來宣告元素。