AMQP 支援的消息通道
有兩種消息通道實作方式。一種是點對點,另一種是發布-訂閱。這兩種通道都為底層的 AmqpTemplate
和 SimpleMessageListenerContainer
提供了廣泛的配置屬性(如本章前面針對通道适配器和閘道器所示)。但是,我們在此處展示的範例具有最少的配置。探索 XML Schema 以查看可用的屬性。
點對點通道可能如下列範例所示
<int-amqp:channel id="p2pChannel"/>
在底層,前面的範例會導致宣告一個名為 si.p2pChannel
的 Queue
,並且此通道會發送到該 Queue
(技術上,是透過將消息發送到無名稱的直接交換器,其路由金鑰與此 Queue
的名稱相符)。此通道也會在該 Queue
上註冊一個消費者。如果您希望通道是「可輪詢」而不是消息驅動的,請提供 message-driven
標誌,其值為 false
,如下列範例所示
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
發布-訂閱通道可能如下所示
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在底層,前面的範例會導致宣告一個名為 si.fanout.pubSubChannel
的 fanout 交換器,並且此通道會發送到該 fanout 交換器。此通道還宣告一個伺服器命名的獨佔、自動刪除、非持久 Queue
,並將其繫結到 fanout 交換器,同時在該 Queue
上註冊一個消費者以接收消息。發布-訂閱通道沒有「可輪詢」選項。它必須是消息驅動的。
從 4.1 版開始,AMQP 支援的消息通道(與 channel-transacted
結合使用)支援 template-channel-transacted
,以分隔 AbstractMessageListenerContainer
和 RabbitTemplate
的 transactional
配置。請注意,先前,channel-transacted
預設為 true
。現在,對於 AbstractMessageListenerContainer
,預設為 false
。
在 4.3 版之前,AMQP 支援的通道僅支援具有 Serializable
有效負載和標頭的消息。整個消息被轉換(序列化)並發送到 RabbitMQ。現在,您可以將 extract-payload
屬性(或在使用 Java 配置時使用 setExtractPayload()
)設定為 true
。當此標誌為 true
時,消息有效負載會被轉換,並且標頭會以類似於您使用通道适配器的方式進行映射。這種安排讓 AMQP 支援的通道可以與非序列化有效負載一起使用(也許可以使用另一個消息轉換器,例如 Jackson2JsonMessageConverter
)。請參閱 AMQP 消息標頭 以瞭解有關預設映射標頭的更多資訊。您可以透過提供使用 outbound-header-mapper
和 inbound-header-mapper
屬性的自訂映射器來修改映射。您現在還可以指定 default-delivery-mode
,它用於在沒有 amqp_deliveryMode
標頭時設定傳遞模式。預設情況下,Spring AMQP MessageProperties
使用 PERSISTENT
傳遞模式。
與其他持久性支援的通道一樣,AMQP 支援的通道旨在提供消息持久性以避免消息遺失。它們不旨在將工作分配給其他對等應用程式。為此,請改用通道适配器。 |
從 5.0 版開始,可輪詢通道現在會將輪詢器線程阻塞指定的 receiveTimeout (預設為 1 秒)。先前,與其他 PollableChannel 實作不同,如果沒有可用消息,則線程會立即返回排程器,而與接收逾時無關。阻塞比使用 basicGet() 檢索消息(沒有逾時)稍微昂貴一些,因為必須建立一個消費者來接收每個消息。若要還原先前的行為,請將輪詢器的 receiveTimeout 設定為 0。 |
使用 Java 配置進行配置
以下範例顯示如何使用 Java 配置配置通道
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 進行配置
以下範例顯示如何使用 Java DSL 配置通道
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}