非同步消費者
Spring AMQP 也透過使用 @RabbitListener 註解來支援註解的監聽器端點,並提供開放的基礎架構以程式化方式註冊端點。這是目前為止設定非同步消費者最方便的方式。請參閱 註解驅動的監聽器端點 以取得更多詳細資訊。 |
以前用於預取的預設值為 1,這可能會導致有效率的消費者未被充分利用。從 2.0 版開始,預設預取值現在為 250,這應使消費者在大多數常見情況下保持忙碌,從而提高輸送量。 然而,在某些情況下,預取值應該較低
此外,對於低流量訊息傳遞和多個消費者 (包括單一監聽器容器實例中的並行性),您可能希望降低預取值,以在消費者之間獲得更均勻的訊息分配。 請參閱 訊息監聽器容器設定。 如需更多關於預取的背景資訊,請參閱這篇關於 RabbitMQ 中消費者利用率 的文章,以及這篇關於 佇列理論 的文章。 |
訊息監聽器
對於非同步 Message
接收,會涉及一個專用元件 (而非 AmqpTemplate
)。該元件是 Message
消費回呼的容器。我們稍後在本節中會考慮容器及其屬性。不過,首先,我們應該看看回呼,因為這是您的應用程式碼與訊息傳遞系統整合的地方。回呼有一些選項,首先是 MessageListener
介面的實作,以下清單顯示了該介面
public interface MessageListener {
void onMessage(Message message);
}
如果您的回呼邏輯基於任何原因而依賴 AMQP Channel 實例,您可以改用 ChannelAwareMessageListener
。它看起來很相似,但有一個額外的參數。以下清單顯示了 ChannelAwareMessageListener
介面定義
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
在 2.1 版中,此介面從套件 o.s.amqp.rabbit.core 移至 o.s.amqp.rabbit.listener.api 。 |
MessageListenerAdapter
如果您希望在應用程式邏輯和訊息傳遞 API 之間保持更嚴格的分隔,您可以依賴框架提供的配接器實作。這通常稱為「訊息驅動 POJO」支援。
1.5 版引入了更彈性的 POJO 訊息傳遞機制,即 @RabbitListener 註解。請參閱 註解驅動的監聽器端點 以取得更多資訊。 |
當使用配接器時,您只需要提供配接器本身應調用的實例的參考。以下範例顯示如何執行此操作
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
您可以子類別化配接器並提供 getListenerMethodName()
的實作,以根據訊息動態選擇不同的方法。此方法有兩個參數:originalMessage
和 extractedMessage
,後者是任何轉換的結果。預設情況下,會設定 SimpleMessageConverter
。請參閱 SimpleMessageConverter
以取得更多資訊以及關於其他可用轉換器的資訊。
從 1.4.2 版開始,原始訊息具有 consumerQueue
和 consumerTag
屬性,可用於判斷接收訊息的佇列。
從 1.5 版開始,您可以設定消費者佇列或標籤到方法名稱的對應,以動態選擇要呼叫的方法。如果對應中沒有項目,我們會退回到預設監聽器方法。預設監聽器方法 (如果未設定) 是 handleMessage
。
從 2.0 版開始,提供了一個方便的 FunctionalInterface
。以下清單顯示了 FunctionalInterface
的定義
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
此介面透過使用 Java 8 lambda 來簡化配接器的設定,如下列範例所示
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
從 2.2 版開始,buildListenerArguments(Object)
已被棄用,並引入了新的 buildListenerArguments(Object, Channel, Message)
。新方法可協助監聽器取得 Channel
和 Message
引數以執行更多操作,例如在手動確認模式下呼叫 channel.basicReject(long, boolean)
。以下清單顯示了最基本的範例
public class ExtendedListenerAdapter extends MessageListenerAdapter {
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}
如果您需要接收「channel」和「message」,現在您可以將 ExtendedListenerAdapter
設定為與 MessageListenerAdapter
相同。監聽器的參數應設定為 buildListenerArguments(Object, Channel, Message)
傳回的值,如下列監聽器範例所示
public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
容器
現在您已經了解了 Message
監聽回呼的各種選項,我們可以將注意力轉向容器。基本上,容器處理「主動」責任,以便監聽器回呼可以保持被動。容器是「生命週期」元件的範例。它提供用於啟動和停止的方法。設定容器時,您基本上是在 AMQP 佇列和 MessageListener
實例之間架起橋樑。您必須提供 ConnectionFactory
的參考以及監聽器應從中消費訊息的佇列名稱或 Queue 實例。
在 2.0 版之前,只有一個監聽器容器,即 SimpleMessageListenerContainer
。現在有第二個容器,即 DirectMessageListenerContainer
。容器之間的差異以及您在選擇要使用哪個容器時可能應用的標準,在 選擇容器 中有所描述。
以下清單顯示了最基本的範例,它透過使用 SimpleMessageListenerContainer
來運作
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作為「主動」元件,最常見的是使用 bean 定義建立監聽器容器,以便它可以在背景中執行。以下範例顯示了使用 XML 執行此操作的一種方式
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以下清單顯示了使用 XML 執行此操作的另一種方式
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
前面的兩個範例都建立了 DirectMessageListenerContainer
(請注意 type
屬性 — 它預設為 simple
)。
或者,您可能更喜歡使用 Java 設定,這看起來與前面的程式碼片段類似
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
消費者優先順序
從 RabbitMQ 3.2 版開始,Broker 現在支援消費者優先順序 (請參閱 搭配 RabbitMQ 使用消費者優先順序)。這是透過在消費者上設定 x-priority
引數來啟用的。SimpleMessageListenerContainer
現在支援設定消費者引數,如下列範例所示
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
為了方便起見,命名空間在 listener
元素上提供了 priority
屬性,如下列範例所示
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
從 1.3 版開始,您可以修改容器在執行階段監聽的佇列。請參閱 監聽器容器佇列。
auto-delete
佇列
當容器設定為監聽 auto-delete
佇列時,佇列具有 x-expires
選項,或者 Time-To-Live 策略在 Broker 上設定,當容器停止時 (也就是說,當最後一個消費者被取消時),佇列會被 Broker 移除。在 1.3 版之前,容器無法重新啟動,因為佇列遺失了。RabbitAdmin
只會在連線關閉或開啟時自動重新宣告佇列等等,這在容器停止和啟動時不會發生。
從 1.3 版開始,容器使用 RabbitAdmin
在啟動期間重新宣告任何遺失的佇列。
您也可以搭配 auto-startup="false"
管理員使用條件式宣告 (請參閱 條件式宣告),以將佇列宣告延遲到容器啟動時。以下範例顯示如何執行此操作
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
在這種情況下,佇列和交換器由 containerAdmin
宣告,後者具有 auto-startup="false"
,因此元素不會在內容初始化期間宣告。此外,容器也不會因為相同原因而啟動。當容器稍後啟動時,它會使用其對 containerAdmin
的參考來宣告元素。