AMQP 支援的消息通道

有兩種消息通道實作方式。一種是點對點,另一種是發布-訂閱。這兩種通道都為底層的 AmqpTemplateSimpleMessageListenerContainer 提供了廣泛的配置屬性(如本章前面針對通道适配器和閘道器所示)。但是,我們在此處展示的範例具有最少的配置。探索 XML Schema 以查看可用的屬性。

點對點通道可能如下列範例所示

<int-amqp:channel id="p2pChannel"/>

在底層,前面的範例會導致宣告一個名為 si.p2pChannelQueue,並且此通道會發送到該 Queue(技術上,是透過將消息發送到無名稱的直接交換器,其路由金鑰與此 Queue 的名稱相符)。此通道也會在該 Queue 上註冊一個消費者。如果您希望通道是「可輪詢」而不是消息驅動的,請提供 message-driven 標誌,其值為 false,如下列範例所示

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

發布-訂閱通道可能如下所示

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在底層,前面的範例會導致宣告一個名為 si.fanout.pubSubChannel 的 fanout 交換器,並且此通道會發送到該 fanout 交換器。此通道還宣告一個伺服器命名的獨佔、自動刪除、非持久 Queue,並將其繫結到 fanout 交換器,同時在該 Queue 上註冊一個消費者以接收消息。發布-訂閱通道沒有「可輪詢」選項。它必須是消息驅動的。

從 4.1 版開始,AMQP 支援的消息通道(與 channel-transacted 結合使用)支援 template-channel-transacted,以分隔 AbstractMessageListenerContainerRabbitTemplatetransactional 配置。請注意,先前,channel-transacted 預設為 true。現在,對於 AbstractMessageListenerContainer,預設為 false

在 4.3 版之前,AMQP 支援的通道僅支援具有 Serializable 有效負載和標頭的消息。整個消息被轉換(序列化)並發送到 RabbitMQ。現在,您可以將 extract-payload 屬性(或在使用 Java 配置時使用 setExtractPayload())設定為 true。當此標誌為 true 時,消息有效負載會被轉換,並且標頭會以類似於您使用通道适配器的方式進行映射。這種安排讓 AMQP 支援的通道可以與非序列化有效負載一起使用(也許可以使用另一個消息轉換器,例如 Jackson2JsonMessageConverter)。請參閱 AMQP 消息標頭 以瞭解有關預設映射標頭的更多資訊。您可以透過提供使用 outbound-header-mapperinbound-header-mapper 屬性的自訂映射器來修改映射。您現在還可以指定 default-delivery-mode,它用於在沒有 amqp_deliveryMode 標頭時設定傳遞模式。預設情況下,Spring AMQP MessageProperties 使用 PERSISTENT 傳遞模式。

與其他持久性支援的通道一樣,AMQP 支援的通道旨在提供消息持久性以避免消息遺失。它們不旨在將工作分配給其他對等應用程式。為此,請改用通道适配器。
從 5.0 版開始,可輪詢通道現在會將輪詢器線程阻塞指定的 receiveTimeout(預設為 1 秒)。先前,與其他 PollableChannel 實作不同,如果沒有可用消息,則線程會立即返回排程器,而與接收逾時無關。阻塞比使用 basicGet() 檢索消息(沒有逾時)稍微昂貴一些,因為必須建立一個消費者來接收每個消息。若要還原先前的行為,請將輪詢器的 receiveTimeout 設定為 0。

使用 Java 配置進行配置

以下範例顯示如何使用 Java 配置配置通道

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 進行配置

以下範例顯示如何使用 Java DSL 配置通道

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}