請求/回覆訊息傳遞
AmqpTemplate
也提供各種 sendAndReceive
方法,這些方法接受與先前針對單向傳送操作 (exchange
、routingKey
和 Message
) 所述相同的引數選項。這些方法對於請求-回覆情境非常有用,因為它們會在傳送前處理必要 reply-to
屬性的設定,並且可以在內部為此目的建立的獨佔佇列上監聽回覆訊息。
類似的請求-回覆方法也適用於 MessageConverter
應用於請求和回覆的情況。這些方法命名為 convertSendAndReceive
。如需更多詳細資訊,請參閱 AmqpTemplate
的 Javadoc。
從 1.5.0 版開始,每個 sendAndReceive
方法變體都有一個多載版本,它接受 CorrelationData
。與正確設定的連線工厂一起使用時,這可以接收操作傳送端的發佈者確認。請參閱 關聯的發佈者確認和回傳 以及 RabbitOperations
的 Javadoc 以取得更多資訊。
從 2.0 版開始,這些方法有變體 (convertSendAndReceiveAsType
) 接受額外的 ParameterizedTypeReference
引數來轉換複雜的傳回類型。範本必須設定為 SmartMessageConverter
。如需更多資訊,請參閱 使用 RabbitTemplate
從 Message
轉換。
從 2.1 版開始,您可以設定具有 noLocalReplyConsumer
選項的 RabbitTemplate
,以控制回覆消費者的 noLocal
旗標。預設值為 false
。
回覆逾時
預設情況下,傳送和接收方法在五秒後逾時並傳回 null。您可以透過設定 replyTimeout
屬性來修改此行為。從 1.5 版開始,如果您將 mandatory
屬性設定為 true
(或 mandatory-expression
對於特定訊息評估為 true
),如果訊息無法傳遞到佇列,則會擲回 AmqpMessageReturnedException
。此例外狀況具有 returnedMessage
、replyCode
和 replyText
屬性,以及用於傳送的 exchange
和 routingKey
。
此功能使用發佈者回傳。您可以透過在 CachingConnectionFactory 上將 publisherReturns 設定為 true 來啟用它 (請參閱 發佈者確認和回傳)。此外,您不得向 RabbitTemplate 註冊您自己的 ReturnCallback 。 |
從 2.1.2 版開始,已新增 replyTimedOut
方法,讓子類別瞭解逾時,以便它們可以清理任何保留的狀態。
從 2.0.11 和 2.1.3 版開始,當您使用預設 DirectReplyToMessageListenerContainer
時,您可以透過設定範本的 replyErrorHandler
屬性來新增錯誤處理常式。此錯誤處理常式會針對任何傳遞失敗 (例如延遲回覆和收到沒有相關標頭的訊息) 而叫用。傳入的例外狀況是 ListenerExecutionFailedException
,它具有 failedMessage
屬性。
RabbitMQ 直接回覆至
從 3.4.0 版開始,RabbitMQ 伺服器支援 直接回覆至。這消除了固定回覆佇列的主要原因 (避免需要為每個請求建立暫時佇列)。從 Spring AMQP 1.4.1 版開始,預設會使用直接回覆至 (如果伺服器支援),而不是建立暫時回覆佇列。當未提供 replyQueue (或設定名稱為 amq.rabbitmq.reply-to ) 時,RabbitTemplate 會自動偵測是否支援直接回覆至,並使用它或回退到使用暫時回覆佇列。使用直接回覆至時,不需要 reply-listener ,也不應設定。 |
回覆監聽器仍然支援具名佇列 (amq.rabbitmq.reply-to
除外),允許控制回覆並行等等。
從 1.6 版開始,如果您希望為每個回覆使用暫時、獨佔、自動刪除的佇列,請將 useTemporaryReplyQueues
屬性設定為 true
。如果您設定 replyAddress
,則會忽略此屬性。
您可以透過子類別化 RabbitTemplate
並覆寫 useDirectReplyTo()
以檢查不同的準則,來變更指示是否使用直接回覆至的準則。該方法僅在傳送第一個請求時呼叫一次。
在 2.0 版之前,RabbitTemplate
為每個請求建立一個新的消費者,並在收到回覆 (或逾時) 時取消消費者。現在,範本改用 DirectReplyToMessageListenerContainer
,讓消費者可以重複使用。範本仍然負責關聯回覆,因此沒有延遲回覆轉到不同傳送者的風險。如果您想要還原為先前的行為,請將 useDirectReplyToContainer
(使用 XML 設定時為 direct-reply-to-container
) 屬性設定為 false。
AsyncRabbitTemplate
沒有此選項。當使用直接回覆至時,它始終對回覆使用 DirectReplyToContainer
。
從 2.3.7 版開始,範本有一個新的屬性 useChannelForCorrelation
。當此屬性為 true
時,伺服器不必將相關 ID 從請求訊息標頭複製到回覆訊息。相反地,用於傳送請求的通道用於將回覆與請求關聯。
使用回覆佇列的訊息關聯
當使用固定回覆佇列 (amq.rabbitmq.reply-to
除外) 時,您必須提供關聯資料,以便將回覆與請求關聯。請參閱 RabbitMQ 遠端程序呼叫 (RPC)。預設情況下,標準 correlationId
屬性用於保存關聯資料。但是,如果您希望使用自訂屬性來保存關聯資料,您可以在 <rabbit-template/> 上設定 correlation-key
屬性。明確地將屬性設定為 correlationId
與省略屬性相同。用戶端和伺服器必須使用相同的標頭來進行關聯資料。
Spring AMQP 1.1 版使用名為 spring_reply_correlation 的自訂屬性來處理此資料。如果您希望使用目前版本還原為此行為 (可能是為了維持與使用 1.1 的另一個應用程式的相容性),則必須將屬性設定為 spring_reply_correlation 。 |
預設情況下,範本會產生自己的關聯 ID (忽略任何使用者提供的值)。如果您希望使用您自己的關聯 ID,請將 RabbitTemplate
實例的 userCorrelationId
屬性設定為 true
。
關聯 ID 必須是唯一的,以避免為請求傳回錯誤回覆的可能性。 |
回覆監聽器容器
當使用 3.4.0 之前的 RabbitMQ 版本時,每個回覆都會使用一個新的暫時佇列。但是,可以在範本上設定單一回覆佇列,這可能更有效率,並且也允許您在該佇列上設定引數。在這種情況下,您還必須提供 <reply-listener/> 子元素。此元素為回覆佇列提供監聽器容器,而範本是監聽器。除了從範本的設定繼承的 connection-factory
和 message-converter
之外,<listener-container/> 上允許的所有 訊息監聽器容器設定 屬性都允許在此元素上使用。
如果您執行應用程式的多個實例或使用多個 RabbitTemplate 實例,則 必須 為每個實例使用唯一的回覆佇列。RabbitMQ 無法從佇列中選取訊息,因此,如果它們都使用相同的佇列,則每個實例都會競爭回覆,並且不一定會收到自己的回覆。 |
以下範例定義具有連線工厂的 Rabbit 範本
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
雖然容器和範本共用連線工厂,但它們不共用通道。因此,請求和回覆不會在相同的交易中執行 (如果使用交易)。
在 1.5.0 版之前,reply-address 屬性不可用。回覆始終透過使用預設交換器和 reply-queue 名稱作為路由金鑰來路由。這仍然是預設值,但您現在可以指定新的 reply-address 屬性。reply-address 可以包含格式為 <exchange>/<routingKey> 的位址,並且回覆會路由到指定的交換器,並路由到使用路由金鑰綁定的佇列。reply-address 優先於 reply-queue 。當僅使用 reply-address 時,必須將 <reply-listener> 設定為單獨的 <listener-container> 元件。reply-address 和 reply-queue (或 <listener-container> 上的 queues 屬性) 必須在邏輯上參考相同的佇列。 |
透過此設定,SimpleListenerContainer
用於接收回覆,而 RabbitTemplate
是 MessageListener
。當使用 <rabbit:template/> 命名空間元素定義範本時 (如前述範例所示),剖析器會定義容器並將範本連接為監聽器。
當範本不使用固定 replyQueue (或使用直接回覆至 - 請參閱 RabbitMQ 直接回覆至) 時,不需要監聽器容器。當使用 RabbitMQ 3.4.0 或更高版本時,直接 reply-to 是慣用的機制。 |
如果您將 RabbitTemplate
定義為 <bean/>
或使用 @Configuration
類別將其定義為 @Bean
,或者當您以程式設計方式建立範本時,您需要自行定義和連接回覆監聽器容器。如果您未執行此操作,則範本永遠不會收到回覆,並且最終會逾時並傳回 null 作為對 sendAndReceive
方法呼叫的回覆。
從 1.5 版開始,RabbitTemplate
會偵測是否已設定為 MessageListener
以接收回覆。如果沒有,則嘗試傳送和接收具有回覆位址的訊息將會失敗,並出現 IllegalStateException
(因為永遠不會收到回覆)。
此外,如果使用簡單的 replyAddress
(佇列名稱),則回覆監聽器容器會驗證它是否正在監聽具有相同名稱的佇列。如果回覆位址是交換器和路由金鑰,則無法執行此檢查,並且會寫入偵錯記錄訊息。
當自行連接回覆監聽器和範本時,務必確保範本的 replyAddress 和容器的 queues (或 queueNames ) 屬性參考相同的佇列。範本會將回覆位址插入到輸出訊息 replyTo 屬性中。 |
以下清單顯示如何手動連接 Bean 的範例
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
在 此測試案例 中顯示了使用固定回覆佇列連接的 RabbitTemplate
的完整範例,以及處理請求並傳回回覆的「遠端」監聽器容器。
當回覆逾時 (replyTimeout ) 時,sendAndReceive() 方法會傳回 null。 |
在 1.3.6 版之前,僅記錄逾時訊息的延遲回覆。現在,如果收到延遲回覆,則會拒絕它 (範本會擲回 AmqpRejectAndDontRequeueException
)。如果回覆佇列設定為將拒絕的訊息傳送到死信交換器,則可以擷取回覆以供稍後分析。若要執行此操作,請將佇列繫結至已設定的死信交換器,路由金鑰等於回覆佇列的名稱。
如需有關設定死信的更多資訊,請參閱 RabbitMQ 死信文件。您也可以查看 FixedReplyQueueDeadLetterTests
測試案例以取得範例。
非同步 Rabbit 範本
1.6 版引入了 AsyncRabbitTemplate
。這具有與 AmqpTemplate
上的方法類似的 sendAndReceive
(和 convertSendAndReceive
) 方法。但是,它們不會封鎖,而是傳回 CompletableFuture
。
sendAndReceive
方法會傳回 RabbitMessageFuture
。convertSendAndReceive
方法會傳回 RabbitConverterFuture
。
您可以稍後透過叫用 future 上的 get()
來同步擷取結果,或者您可以註冊一個回呼,該回呼會以非同步方式使用結果來呼叫。以下清單顯示了兩種方法
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果設定了 mandatory
且訊息無法傳遞,則 future 會擲回 ExecutionException
,其原因為 AmqpMessageReturnedException
,它會封裝傳回的訊息和有關回傳的資訊。
如果設定了 enableConfirms
,則 future 具有名為 confirm
的屬性,它本身是 CompletableFuture<Boolean>
,其中 true
表示發佈成功。如果確認 future 為 false
,則 RabbitFuture
具有另一個名為 nackCause
的屬性,其中包含失敗原因 (如果可用)。
如果在回覆之後收到發佈者確認,則會捨棄該確認,因為回覆表示發佈成功。 |
您可以設定範本上的 receiveTimeout
屬性以使回覆逾時 (預設值為 30000
- 30 秒)。如果發生逾時,則 future 會以 AmqpReplyTimeoutException
完成。
範本實作 SmartLifecycle
。停止範本時,如果存在擱置中的回覆,則會取消擱置中的 Future
實例。
從 2.0 版開始,非同步範本現在支援 直接回覆至,而不是已設定的回覆佇列。若要啟用此功能,請使用下列其中一個建構函式
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
請參閱 RabbitMQ 直接回覆至,以搭配同步 RabbitTemplate
使用直接回覆至。
2.0 版引入了這些方法的變體 (convertSendAndReceiveAsType
),它們採用額外的 ParameterizedTypeReference
引數來轉換複雜的傳回類型。您必須使用 SmartMessageConverter
設定基礎 RabbitTemplate
。如需更多資訊,請參閱 使用 RabbitTemplate
從 Message
轉換。
從 3.0 版開始,AsyncRabbitTemplate 方法現在傳回 CompletableFuture s 而不是 ListenableFuture s。 |