請求/回覆訊息傳遞

AmqpTemplate 也提供各種 sendAndReceive 方法,這些方法接受與先前針對單向傳送操作 (exchangeroutingKeyMessage) 所述相同的引數選項。這些方法對於請求-回覆情境非常有用,因為它們會在傳送前處理必要 reply-to 屬性的設定,並且可以在內部為此目的建立的獨佔佇列上監聽回覆訊息。

類似的請求-回覆方法也適用於 MessageConverter 應用於請求和回覆的情況。這些方法命名為 convertSendAndReceive。如需更多詳細資訊,請參閱 AmqpTemplate 的 Javadoc

從 1.5.0 版開始,每個 sendAndReceive 方法變體都有一個多載版本,它接受 CorrelationData。與正確設定的連線工厂一起使用時,這可以接收操作傳送端的發佈者確認。請參閱 關聯的發佈者確認和回傳 以及 RabbitOperations 的 Javadoc 以取得更多資訊。

從 2.0 版開始,這些方法有變體 (convertSendAndReceiveAsType) 接受額外的 ParameterizedTypeReference 引數來轉換複雜的傳回類型。範本必須設定為 SmartMessageConverter。如需更多資訊,請參閱 使用 RabbitTemplateMessage 轉換

從 2.1 版開始,您可以設定具有 noLocalReplyConsumer 選項的 RabbitTemplate,以控制回覆消費者的 noLocal 旗標。預設值為 false

回覆逾時

預設情況下,傳送和接收方法在五秒後逾時並傳回 null。您可以透過設定 replyTimeout 屬性來修改此行為。從 1.5 版開始,如果您將 mandatory 屬性設定為 true (或 mandatory-expression 對於特定訊息評估為 true),如果訊息無法傳遞到佇列,則會擲回 AmqpMessageReturnedException。此例外狀況具有 returnedMessagereplyCodereplyText 屬性,以及用於傳送的 exchangeroutingKey

此功能使用發佈者回傳。您可以透過在 CachingConnectionFactory 上將 publisherReturns 設定為 true 來啟用它 (請參閱 發佈者確認和回傳)。此外,您不得向 RabbitTemplate 註冊您自己的 ReturnCallback

從 2.1.2 版開始,已新增 replyTimedOut 方法,讓子類別瞭解逾時,以便它們可以清理任何保留的狀態。

從 2.0.11 和 2.1.3 版開始,當您使用預設 DirectReplyToMessageListenerContainer 時,您可以透過設定範本的 replyErrorHandler 屬性來新增錯誤處理常式。此錯誤處理常式會針對任何傳遞失敗 (例如延遲回覆和收到沒有相關標頭的訊息) 而叫用。傳入的例外狀況是 ListenerExecutionFailedException,它具有 failedMessage 屬性。

RabbitMQ 直接回覆至

從 3.4.0 版開始,RabbitMQ 伺服器支援 直接回覆至。這消除了固定回覆佇列的主要原因 (避免需要為每個請求建立暫時佇列)。從 Spring AMQP 1.4.1 版開始,預設會使用直接回覆至 (如果伺服器支援),而不是建立暫時回覆佇列。當未提供 replyQueue (或設定名稱為 amq.rabbitmq.reply-to) 時,RabbitTemplate 會自動偵測是否支援直接回覆至,並使用它或回退到使用暫時回覆佇列。使用直接回覆至時,不需要 reply-listener,也不應設定。

回覆監聽器仍然支援具名佇列 (amq.rabbitmq.reply-to 除外),允許控制回覆並行等等。

從 1.6 版開始,如果您希望為每個回覆使用暫時、獨佔、自動刪除的佇列,請將 useTemporaryReplyQueues 屬性設定為 true。如果您設定 replyAddress,則會忽略此屬性。

您可以透過子類別化 RabbitTemplate 並覆寫 useDirectReplyTo() 以檢查不同的準則,來變更指示是否使用直接回覆至的準則。該方法僅在傳送第一個請求時呼叫一次。

在 2.0 版之前,RabbitTemplate 為每個請求建立一個新的消費者,並在收到回覆 (或逾時) 時取消消費者。現在,範本改用 DirectReplyToMessageListenerContainer,讓消費者可以重複使用。範本仍然負責關聯回覆,因此沒有延遲回覆轉到不同傳送者的風險。如果您想要還原為先前的行為,請將 useDirectReplyToContainer (使用 XML 設定時為 direct-reply-to-container) 屬性設定為 false。

AsyncRabbitTemplate 沒有此選項。當使用直接回覆至時,它始終對回覆使用 DirectReplyToContainer

從 2.3.7 版開始,範本有一個新的屬性 useChannelForCorrelation。當此屬性為 true 時,伺服器不必將相關 ID 從請求訊息標頭複製到回覆訊息。相反地,用於傳送請求的通道用於將回覆與請求關聯。

使用回覆佇列的訊息關聯

當使用固定回覆佇列 (amq.rabbitmq.reply-to 除外) 時,您必須提供關聯資料,以便將回覆與請求關聯。請參閱 RabbitMQ 遠端程序呼叫 (RPC)。預設情況下,標準 correlationId 屬性用於保存關聯資料。但是,如果您希望使用自訂屬性來保存關聯資料,您可以在 <rabbit-template/> 上設定 correlation-key 屬性。明確地將屬性設定為 correlationId 與省略屬性相同。用戶端和伺服器必須使用相同的標頭來進行關聯資料。

Spring AMQP 1.1 版使用名為 spring_reply_correlation 的自訂屬性來處理此資料。如果您希望使用目前版本還原為此行為 (可能是為了維持與使用 1.1 的另一個應用程式的相容性),則必須將屬性設定為 spring_reply_correlation

預設情況下,範本會產生自己的關聯 ID (忽略任何使用者提供的值)。如果您希望使用您自己的關聯 ID,請將 RabbitTemplate 實例的 userCorrelationId 屬性設定為 true

關聯 ID 必須是唯一的,以避免為請求傳回錯誤回覆的可能性。

回覆監聽器容器

當使用 3.4.0 之前的 RabbitMQ 版本時,每個回覆都會使用一個新的暫時佇列。但是,可以在範本上設定單一回覆佇列,這可能更有效率,並且也允許您在該佇列上設定引數。在這種情況下,您還必須提供 <reply-listener/> 子元素。此元素為回覆佇列提供監聽器容器,而範本是監聽器。除了從範本的設定繼承的 connection-factorymessage-converter 之外,<listener-container/> 上允許的所有 訊息監聽器容器設定 屬性都允許在此元素上使用。

如果您執行應用程式的多個實例或使用多個 RabbitTemplate 實例,則 必須 為每個實例使用唯一的回覆佇列。RabbitMQ 無法從佇列中選取訊息,因此,如果它們都使用相同的佇列,則每個實例都會競爭回覆,並且不一定會收到自己的回覆。

以下範例定義具有連線工厂的 Rabbit 範本

<rabbit:template id="amqpTemplate"
        connection-factory="connectionFactory"
        reply-queue="replies"
        reply-address="replyEx/routeReply">
    <rabbit:reply-listener/>
</rabbit:template>

雖然容器和範本共用連線工厂,但它們不共用通道。因此,請求和回覆不會在相同的交易中執行 (如果使用交易)。

在 1.5.0 版之前,reply-address 屬性不可用。回覆始終透過使用預設交換器和 reply-queue 名稱作為路由金鑰來路由。這仍然是預設值,但您現在可以指定新的 reply-address 屬性。reply-address 可以包含格式為 <exchange>/<routingKey> 的位址,並且回覆會路由到指定的交換器,並路由到使用路由金鑰綁定的佇列。reply-address 優先於 reply-queue。當僅使用 reply-address 時,必須將 <reply-listener> 設定為單獨的 <listener-container> 元件。reply-addressreply-queue (或 <listener-container> 上的 queues 屬性) 必須在邏輯上參考相同的佇列。

透過此設定,SimpleListenerContainer 用於接收回覆,而 RabbitTemplateMessageListener。當使用 <rabbit:template/> 命名空間元素定義範本時 (如前述範例所示),剖析器會定義容器並將範本連接為監聽器。

當範本不使用固定 replyQueue (或使用直接回覆至 - 請參閱 RabbitMQ 直接回覆至) 時,不需要監聽器容器。當使用 RabbitMQ 3.4.0 或更高版本時,直接 reply-to 是慣用的機制。

如果您將 RabbitTemplate 定義為 <bean/> 或使用 @Configuration 類別將其定義為 @Bean,或者當您以程式設計方式建立範本時,您需要自行定義和連接回覆監聽器容器。如果您未執行此操作,則範本永遠不會收到回覆,並且最終會逾時並傳回 null 作為對 sendAndReceive 方法呼叫的回覆。

從 1.5 版開始,RabbitTemplate 會偵測是否已設定為 MessageListener 以接收回覆。如果沒有,則嘗試傳送和接收具有回覆位址的訊息將會失敗,並出現 IllegalStateException (因為永遠不會收到回覆)。

此外,如果使用簡單的 replyAddress (佇列名稱),則回覆監聽器容器會驗證它是否正在監聽具有相同名稱的佇列。如果回覆位址是交換器和路由金鑰,則無法執行此檢查,並且會寫入偵錯記錄訊息。

當自行連接回覆監聽器和範本時,務必確保範本的 replyAddress 和容器的 queues (或 queueNames) 屬性參考相同的佇列。範本會將回覆位址插入到輸出訊息 replyTo 屬性中。

以下清單顯示如何手動連接 Bean 的範例

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory" />
    <property name="exchange" value="foo.exchange" />
    <property name="routingKey" value="foo" />
    <property name="replyQueue" ref="replyQ" />
    <property name="replyTimeout" value="600000" />
    <property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg ref="connectionFactory" />
    <property name="queues" ref="replyQ" />
    <property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
    @Bean
    public RabbitTemplate amqpTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(msgConv());
        rabbitTemplate.setReplyAddress(replyQueue().getName());
        rabbitTemplate.setReplyTimeout(60000);
        rabbitTemplate.setUseDirectReplyToContainer(false);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer replyListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueues(replyQueue());
        container.setMessageListener(amqpTemplate());
        return container;
    }

    @Bean
    public Queue replyQueue() {
        return new Queue("my.reply.queue");
    }

此測試案例 中顯示了使用固定回覆佇列連接的 RabbitTemplate 的完整範例,以及處理請求並傳回回覆的「遠端」監聽器容器。

當回覆逾時 (replyTimeout) 時,sendAndReceive() 方法會傳回 null。

在 1.3.6 版之前,僅記錄逾時訊息的延遲回覆。現在,如果收到延遲回覆,則會拒絕它 (範本會擲回 AmqpRejectAndDontRequeueException)。如果回覆佇列設定為將拒絕的訊息傳送到死信交換器,則可以擷取回覆以供稍後分析。若要執行此操作,請將佇列繫結至已設定的死信交換器,路由金鑰等於回覆佇列的名稱。

如需有關設定死信的更多資訊,請參閱 RabbitMQ 死信文件。您也可以查看 FixedReplyQueueDeadLetterTests 測試案例以取得範例。

非同步 Rabbit 範本

1.6 版引入了 AsyncRabbitTemplate。這具有與 AmqpTemplate 上的方法類似的 sendAndReceive (和 convertSendAndReceive) 方法。但是,它們不會封鎖,而是傳回 CompletableFuture

sendAndReceive 方法會傳回 RabbitMessageFutureconvertSendAndReceive 方法會傳回 RabbitConverterFuture

您可以稍後透過叫用 future 上的 get() 來同步擷取結果,或者您可以註冊一個回呼,該回呼會以非同步方式使用結果來呼叫。以下清單顯示了兩種方法

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

    ...

    CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

    // do some more work

    String reply = null;
    try {
        reply = future.get(10, TimeUnit.SECONDS);
    }
    catch (ExecutionException e) {
        ...
    }

    ...

}

public void doSomeWorkAndGetResultAsync() {

    ...

    RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            // success
        }
        else {
            // failure
        }
    });

    ...

}

如果設定了 mandatory 且訊息無法傳遞,則 future 會擲回 ExecutionException,其原因為 AmqpMessageReturnedException,它會封裝傳回的訊息和有關回傳的資訊。

如果設定了 enableConfirms,則 future 具有名為 confirm 的屬性,它本身是 CompletableFuture<Boolean>,其中 true 表示發佈成功。如果確認 future 為 false,則 RabbitFuture 具有另一個名為 nackCause 的屬性,其中包含失敗原因 (如果可用)。

如果在回覆之後收到發佈者確認,則會捨棄該確認,因為回覆表示發佈成功。

您可以設定範本上的 receiveTimeout 屬性以使回覆逾時 (預設值為 30000 - 30 秒)。如果發生逾時,則 future 會以 AmqpReplyTimeoutException 完成。

範本實作 SmartLifecycle。停止範本時,如果存在擱置中的回覆,則會取消擱置中的 Future 實例。

從 2.0 版開始,非同步範本現在支援 直接回覆至,而不是已設定的回覆佇列。若要啟用此功能,請使用下列其中一個建構函式

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)

請參閱 RabbitMQ 直接回覆至,以搭配同步 RabbitTemplate 使用直接回覆至。

2.0 版引入了這些方法的變體 (convertSendAndReceiveAsType),它們採用額外的 ParameterizedTypeReference 引數來轉換複雜的傳回類型。您必須使用 SmartMessageConverter 設定基礎 RabbitTemplate。如需更多資訊,請參閱 使用 RabbitTemplateMessage 轉換

從 3.0 版開始,AsyncRabbitTemplate 方法現在傳回 CompletableFuture s 而不是 ListenableFuture s。

使用 AMQP 的 Spring Remoting

不再支援 Spring remoting,因為該功能已從 Spring Framework 中移除。

改為使用使用 RabbitTemplate (用戶端) 和 @RabbitListenersendAndReceive 操作。