輪詢消費者

AmqpTemplate 本身可用於輪詢 Message 接收。預設情況下,如果沒有訊息可用,會立即傳回 null。沒有任何封鎖。從 1.5 版開始,您可以設定 receiveTimeout (以毫秒為單位),接收方法會封鎖最多那麼長的時間,等待訊息。小於零的值表示無限期封鎖 (或至少直到與 Broker 的連線遺失)。1.6 版引入了 receive 方法的變體,允許在每次呼叫時傳入逾時。

由於接收操作會為每個訊息建立新的 QueueingConsumer,因此此技術實際上不適用於高容量環境。對於這些用例,請考慮使用非同步消費者或 receiveTimeout 為零。

從 2.4.8 版開始,當使用非零逾時時,您可以指定傳遞到 basicConsume 方法中的引數,用於將消費者與通道關聯。例如:template.addConsumerArg("x-priority", 10)

有四種簡單的 receive 方法可用。與傳送端的 Exchange 一樣,有一種方法要求直接在範本本身上設定預設佇列屬性,還有一種方法在執行時接受佇列參數。1.6 版引入了變體來接受 timeoutMillis,以覆寫每個請求的 receiveTimeout。以下列表顯示了這四種方法的定義

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

與傳送訊息的情況一樣,AmqpTemplate 有一些方便的方法可以接收 POJO 而不是 Message 實例,並且實作提供了一種自訂 MessageConverter 的方法,用於建立傳回的 Object:以下列表顯示了這些方法

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

從 2.0 版開始,這些方法有一些變體,它們採用額外的 ParameterizedTypeReference 引數來轉換複雜類型。範本必須配置 SmartMessageConverter。有關更多資訊,請參閱使用 RabbitTemplateMessage 轉換

sendAndReceive 方法類似,從 1.3 版開始,AmqpTemplate 有幾個方便的 receiveAndReply 方法,用於同步接收、處理和回覆訊息。以下列表顯示了這些方法定義

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
       throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
     throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
     ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
            ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

AmqpTemplate 實作負責處理 receivereply 階段。在大多數情況下,您應該僅提供 ReceiveAndReplyCallback 的實作,以對接收到的訊息執行一些業務邏輯,並建構回覆物件或訊息 (如果需要)。請注意,ReceiveAndReplyCallback 可能會傳回 null。在這種情況下,不會傳送回覆,且 receiveAndReply 的運作方式與 receive 方法相同。這讓同一個佇列可以用於混合訊息,其中某些訊息可能不需要回覆。

只有在提供的回呼不是 ReceiveAndReplyMessageCallback 的實例時,才會套用自動訊息 (請求和回覆) 轉換,ReceiveAndReplyMessageCallback 提供原始訊息交換合約。

對於需要自訂邏輯以在執行時根據接收到的訊息和來自 ReceiveAndReplyCallback 的回覆來判斷 replyTo 位址的情況,ReplyToAddressCallback 非常有用。預設情況下,請求訊息中的 replyTo 資訊用於路由回覆。

以下列表顯示了基於 POJO 的接收和回覆範例

boolean received =
        this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {

                public Invoice handle(Order order) {
                        return processOrder(order);
                }
        });
if (received) {
        log.info("We received an order!");
}