非同步出站閘道

前一節討論的閘道是同步的,亦即發送執行緒會暫停,直到收到回覆(或發生逾時)。Spring Integration 4.3 版新增了非同步閘道,其使用來自 Spring AMQP 的 AsyncRabbitTemplate。當訊息發送時,執行緒會在發送操作完成後立即返回,並且當收到訊息時,回覆會在範本的監聽器容器執行緒上發送。當閘道在輪詢器執行緒上調用時,這可能很有用。執行緒會被釋放,並且可用於框架中的其他任務。

以下列表顯示了 AMQP 非同步出站閘道的可能組態選項

  • Java DSL

  • Java

  • XML

@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
1 此配接器的唯一 ID。 選填。
2 訊息通道,訊息應發送到此通道,以便將其轉換並發布到 AMQP 交換器。 必填。
3 已設定 AsyncRabbitTemplate 的 Bean 參考。 選填(預設為 asyncRabbitTemplate)。
4 AMQP 交換器的名稱,訊息應發送到此交換器。 如果未提供,則訊息會發送到預設的無名稱交換器。 與 'exchange-name-expression' 互斥。 選填。
5 SpEL 表達式,經評估後可判斷 AMQP 交換器的名稱,訊息會發送到此交換器,並以訊息作為根物件。 如果未提供,則訊息會發送到預設的無名稱交換器。 與 'exchange-name' 互斥。 選填。
6 多個消費者註冊時,此消費者的順序,從而啟用負載平衡和故障轉移。 選填(預設為 Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7 訊息通道,回覆應發送到此通道,在從 AMQP 佇列接收並轉換後。 選填。
8 閘道在將回覆訊息發送到 reply-channel 時等待的時間。 這僅適用於 reply-channel 可能會阻塞的情況 — 例如容量受限且目前已滿的 QueueChannel。 預設為無限期。
9 如果在 AsyncRabbitTemplatereceiveTimeout 屬性內未收到任何回覆訊息,且此設定為 true,則閘道會將錯誤訊息發送到入站訊息的 errorChannel 標頭。 如果在 AsyncRabbitTemplatereceiveTimeout 屬性內未收到任何回覆訊息,且此設定為 false,則閘道會將錯誤訊息發送到預設的 errorChannel(如果可用)。 預設為 true
10 發送訊息時要使用的路由金鑰。 預設情況下,這是一個空的 String。 與 'routing-key-expression' 互斥。 選填。
11 SpEL 表達式,經評估後可判斷發送訊息時要使用的路由金鑰,並以訊息作為根物件(例如,'payload.key')。 預設情況下,這是一個空的 String。 與 'routing-key' 互斥。 選填。
12 訊息的預設傳遞模式:PERSISTENTNON_PERSISTENT。 如果 header-mapper 設定了傳遞模式,則會覆寫。 如果 Spring Integration 訊息標頭 (amqp_deliveryMode) 存在,則 DefaultHeaderMapper 會設定該值。 如果未提供此屬性且標頭對應器未設定,則預設值取決於 RabbitTemplate 使用的底層 Spring AMQP MessagePropertiesConverter。 如果未自訂,則預設值為 PERSISTENT。 選填。
13 定義關聯資料的表達式。 提供時,這會設定底層 AMQP 範本以接收發布者確認。 需要專用的 RabbitTemplateCachingConnectionFactory,且其 publisherConfirms 屬性設定為 true。 當收到發布者確認且提供關聯資料時,確認會寫入 confirm-ack-channelconfirm-nack-channel,具體取決於確認類型。 確認的 Payload 是由此表達式定義的關聯資料,並且訊息的 'amqp_publishConfirm' 標頭設定為 true (ack) 或 false (nack)。 對於 nack 實例,會提供額外的標頭 (amqp_publishConfirmNackCause)。 範例:headers['myCorrelationData']payload。 如果表達式解析為 Message<?> 實例(例如 “#this”),則 ack/nack 通道上發出的訊息基於該訊息,並新增了額外的標頭。 另請參閱 發布者確認和退回的替代機制。 選填。
14 正向 (ack) 發布者確認發送到的通道。 Payload 是由 confirm-correlation-expression 定義的關聯資料。 需要底層 AsyncRabbitTemplate 將其 enableConfirms 屬性設定為 true。 另請參閱 發布者確認和退回的替代機制。 選填(預設為 nullChannel)。
15 自 4.2 版起。 負向 (nack) 發布者確認發送到的通道。 Payload 是由 confirm-correlation-expression 定義的關聯資料。 需要底層 AsyncRabbitTemplate 將其 enableConfirms 屬性設定為 true。 另請參閱 發布者確認和退回的替代機制。 選填(預設為 nullChannel)。
16 設定後,如果在此時間(以毫秒為單位)內未收到發布者確認,閘道將合成負面確認 (nack)。 待處理的確認每隔此值的 50% 檢查一次,因此發送 nack 的實際時間將介於此值的 1 倍到 1.5 倍之間。 另請參閱 發布者確認和退回的替代機制。 預設為無(不會產生 nack)。
17 退回訊息發送到的通道。 提供時,底層 AMQP 範本會設定為將無法傳遞的訊息退回閘道。 訊息是從 AMQP 收到的資料建構而成的,並帶有以下額外標頭:amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。 需要底層 AsyncRabbitTemplate 將其 mandatory 屬性設定為 true。 另請參閱 發布者確認和退回的替代機制。 選填。
18 設定為 false 時,端點會在應用程式上下文初始化期間嘗試連線到 Broker。 這樣做可以透過記錄 Broker 關閉時的錯誤訊息,來實現「快速失敗」偵測不良組態。 當 true(預設值)時,當發送第一個訊息時,會建立連線(如果連線尚不存在,因為某些其他元件已建立連線)。

另請參閱 非同步服務啟動器 以取得更多資訊。

RabbitTemplate

當您使用確認和退回時,我們建議連線到 AsyncRabbitTemplateRabbitTemplate 應為專用的。 否則,可能會遇到意外的副作用。