非同步出站閘道
前一節討論的閘道是同步的,亦即發送執行緒會暫停,直到收到回覆(或發生逾時)。Spring Integration 4.3 版新增了非同步閘道,其使用來自 Spring AMQP 的 AsyncRabbitTemplate
。當訊息發送時,執行緒會在發送操作完成後立即返回,並且當收到訊息時,回覆會在範本的監聽器容器執行緒上發送。當閘道在輪詢器執行緒上調用時,這可能很有用。執行緒會被釋放,並且可用於框架中的其他任務。
以下列表顯示了 AMQP 非同步出站閘道的可能組態選項
-
Java DSL
-
Java
-
XML
@Configuration
public class AmqpAsyncApplication {
@Bean
public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
return f -> f
.handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
.routingKey("queue1")); // default exchange - route to queue 'queue1'
}
@MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
}
@Configuration
public class AmqpAsyncConfig {
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
SimpleMessageListenerContainer replyContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
}
@Bean
public SimpleMessageListenerContainer replyContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
container.setQueueNames("asyncRQ1");
return container;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway" (1)
request-channel="myRequestChannel" (2)
async-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
lazy-connect="true" /> (18)
1 | 此配接器的唯一 ID。 選填。 |
2 | 訊息通道,訊息應發送到此通道,以便將其轉換並發布到 AMQP 交換器。 必填。 |
3 | 已設定 AsyncRabbitTemplate 的 Bean 參考。 選填(預設為 asyncRabbitTemplate )。 |
4 | AMQP 交換器的名稱,訊息應發送到此交換器。 如果未提供,則訊息會發送到預設的無名稱交換器。 與 'exchange-name-expression' 互斥。 選填。 |
5 | SpEL 表達式,經評估後可判斷 AMQP 交換器的名稱,訊息會發送到此交換器,並以訊息作為根物件。 如果未提供,則訊息會發送到預設的無名稱交換器。 與 'exchange-name' 互斥。 選填。 |
6 | 多個消費者註冊時,此消費者的順序,從而啟用負載平衡和故障轉移。 選填(預設為 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] )。 |
7 | 訊息通道,回覆應發送到此通道,在從 AMQP 佇列接收並轉換後。 選填。 |
8 | 閘道在將回覆訊息發送到 reply-channel 時等待的時間。 這僅適用於 reply-channel 可能會阻塞的情況 — 例如容量受限且目前已滿的 QueueChannel 。 預設為無限期。 |
9 | 如果在 AsyncRabbitTemplate 的 receiveTimeout 屬性內未收到任何回覆訊息,且此設定為 true ,則閘道會將錯誤訊息發送到入站訊息的 errorChannel 標頭。 如果在 AsyncRabbitTemplate 的 receiveTimeout 屬性內未收到任何回覆訊息,且此設定為 false ,則閘道會將錯誤訊息發送到預設的 errorChannel (如果可用)。 預設為 true 。 |
10 | 發送訊息時要使用的路由金鑰。 預設情況下,這是一個空的 String 。 與 'routing-key-expression' 互斥。 選填。 |
11 | SpEL 表達式,經評估後可判斷發送訊息時要使用的路由金鑰,並以訊息作為根物件(例如,'payload.key')。 預設情況下,這是一個空的 String 。 與 'routing-key' 互斥。 選填。 |
12 | 訊息的預設傳遞模式:PERSISTENT 或 NON_PERSISTENT 。 如果 header-mapper 設定了傳遞模式,則會覆寫。 如果 Spring Integration 訊息標頭 (amqp_deliveryMode ) 存在,則 DefaultHeaderMapper 會設定該值。 如果未提供此屬性且標頭對應器未設定,則預設值取決於 RabbitTemplate 使用的底層 Spring AMQP MessagePropertiesConverter 。 如果未自訂,則預設值為 PERSISTENT 。 選填。 |
13 | 定義關聯資料的表達式。 提供時,這會設定底層 AMQP 範本以接收發布者確認。 需要專用的 RabbitTemplate 和 CachingConnectionFactory ,且其 publisherConfirms 屬性設定為 true 。 當收到發布者確認且提供關聯資料時,確認會寫入 confirm-ack-channel 或 confirm-nack-channel ,具體取決於確認類型。 確認的 Payload 是由此表達式定義的關聯資料,並且訊息的 'amqp_publishConfirm' 標頭設定為 true (ack ) 或 false (nack )。 對於 nack 實例,會提供額外的標頭 (amqp_publishConfirmNackCause )。 範例:headers['myCorrelationData'] 、payload 。 如果表達式解析為 Message<?> 實例(例如 “#this”),則 ack /nack 通道上發出的訊息基於該訊息,並新增了額外的標頭。 另請參閱 發布者確認和退回的替代機制。 選填。 |
14 | 正向 (ack ) 發布者確認發送到的通道。 Payload 是由 confirm-correlation-expression 定義的關聯資料。 需要底層 AsyncRabbitTemplate 將其 enableConfirms 屬性設定為 true 。 另請參閱 發布者確認和退回的替代機制。 選填(預設為 nullChannel )。 |
15 | 自 4.2 版起。 負向 (nack ) 發布者確認發送到的通道。 Payload 是由 confirm-correlation-expression 定義的關聯資料。 需要底層 AsyncRabbitTemplate 將其 enableConfirms 屬性設定為 true 。 另請參閱 發布者確認和退回的替代機制。 選填(預設為 nullChannel )。 |
16 | 設定後,如果在此時間(以毫秒為單位)內未收到發布者確認,閘道將合成負面確認 (nack)。 待處理的確認每隔此值的 50% 檢查一次,因此發送 nack 的實際時間將介於此值的 1 倍到 1.5 倍之間。 另請參閱 發布者確認和退回的替代機制。 預設為無(不會產生 nack)。 |
17 | 退回訊息發送到的通道。 提供時,底層 AMQP 範本會設定為將無法傳遞的訊息退回閘道。 訊息是從 AMQP 收到的資料建構而成的,並帶有以下額外標頭:amqp_returnReplyCode 、amqp_returnReplyText 、amqp_returnExchange 和 amqp_returnRoutingKey 。 需要底層 AsyncRabbitTemplate 將其 mandatory 屬性設定為 true 。 另請參閱 發布者確認和退回的替代機制。 選填。 |
18 | 設定為 false 時,端點會在應用程式上下文初始化期間嘗試連線到 Broker。 這樣做可以透過記錄 Broker 關閉時的錯誤訊息,來實現「快速失敗」偵測不良組態。 當 true (預設值)時,當發送第一個訊息時,會建立連線(如果連線尚不存在,因為某些其他元件已建立連線)。 |
另請參閱 非同步服務啟動器 以取得更多資訊。
RabbitTemplate
當您使用確認和退回時,我們建議連線到 |