JMS 支援
Spring Integration 提供通道配接器,用於接收和傳送 JMS 訊息。
您需要將此相依性包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:6.3.5"
必須透過某些 JMS 廠商特定的實作 (例如 Apache ActiveMQ) 顯式新增 jakarta.jms:jakarta.jms-api
。
實際上,有兩種基於 JMS 的輸入通道配接器。第一種使用 Spring 的 JmsTemplate
根據輪詢週期進行接收。第二種是「訊息驅動」,並依賴 Spring MessageListener
容器。輸出通道配接器使用 JmsTemplate
轉換並按需傳送 JMS 訊息。
透過使用 JmsTemplate
和 MessageListener
容器,Spring Integration 依賴 Spring 的 JMS 支援。理解這一點很重要,因為這些配接器上公開的大多數屬性都設定底層的 JmsTemplate
和 MessageListener
容器。有關 JmsTemplate
和 MessageListener
容器的更多詳細資訊,請參閱 Spring JMS 文件。
雖然 JMS 通道配接器旨在用於單向訊息傳遞 (僅傳送或僅接收),但 Spring Integration 也為請求和回覆操作提供輸入和輸出 JMS 閘道。輸入閘道依賴 Spring 的 MessageListener
容器實作之一進行訊息驅動接收。它還能夠將傳回值傳送到接收訊息提供的 reply-to
目的地。輸出閘道將 JMS 訊息傳送到 request-destination
(或 request-destination-name
或 request-destination-expression
),然後接收回覆訊息。您可以顯式設定 reply-destination
參考 (或 reply-destination-name
或 reply-destination-expression
)。否則,輸出閘道會使用 JMS TemporaryQueue。
在 Spring Integration 2.2 之前,如有必要,會為每個請求或回覆建立 (和移除) TemporaryQueue
。從 Spring Integration 2.2 開始,您可以設定輸出閘道以使用 MessageListener
容器接收回覆,而不是直接使用新的 (或快取的) Consumer
來接收每個請求的回覆。如此設定後,且未提供顯式回覆目的地時,每個閘道會使用單個 TemporaryQueue
,而不是每個請求一個。
從 6.0 版開始,如果將 replyPubSubDomain
選項設定為 true
,則輸出閘道會建立 TemporaryTopic
而不是 TemporaryQueue
。某些 JMS 廠商以不同方式處理這些目的地。
輸入通道配接器
輸入通道配接器需要參考單個 JmsTemplate
實例,或同時參考 ConnectionFactory
和 Destination
(您可以提供 'destinationName' 來代替 'destination' 參考)。以下範例定義了具有 Destination
參考的輸入通道配接器
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundAdapter(connectionFactory)
.destination("inQueue"),
e -> e.poller(poller -> poller.fixedRate(30000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
integrationFlow(
Jms.inboundAdapter(connectionFactory).destination("inQueue"),
{ poller { Pollers.fixedRate(30000) } })
{
handle { m -> println(m.payload) }
}
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
source.setDestinationName("inQueue");
return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
<int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
請注意先前的設定,inbound-channel-adapter 是一個輪詢消費者。這表示它會在觸發時調用 receive() 。您只應在輪詢頻率相對較低且及時性不重要的情况下使用此功能。對於所有其他情况 (絕大多數基於 JMS 的用例),message-driven-channel-adapter (稍後描述) 是更好的選擇。 |
預設情况下,所有需要參考 ConnectionFactory 的 JMS 配接器都會自動尋找名為 jmsConnectionFactory 的 Bean。這就是為什麼您在許多範例中看不到 connection-factory 屬性的原因。但是,如果您的 JMS ConnectionFactory 具有不同的 Bean 名稱,則需要提供該屬性。 |
如果 extract-payload
設定為 true
(預設值),則接收到的 JMS 訊息會透過 MessageConverter
傳遞。當依賴預設的 SimpleMessageConverter
時,這表示產生的 Spring Integration 訊息將 JMS 訊息的主體作為其 Payload。JMS TextMessage
產生基於字串的 Payload,JMS BytesMessage
產生位元組陣列 Payload,而 JMS ObjectMessage
的可序列化實例會成為 Spring Integration 訊息的 Payload。如果您希望將原始 JMS 訊息作為 Spring Integration 訊息的 Payload,請將 extractPayload
選項設定為 false
。
從 5.0.8 版開始,org.springframework.jms.connection.CachingConnectionFactory
和 cacheConsumers
的 receive-timeout
預設值為 -1
(不等待),否則為 1 秒。JMS 輸入通道配接器會根據提供的 ConnectionFactory
和選項建立 DynamicJmsTemplate
。如果需要外部 JmsTemplate
(例如在 Spring Boot 環境中),或 ConnectionFactory
未快取,或沒有 cacheConsumers
,建議設定 jmsTemplate.receiveTimeout(-1)
(如果預期進行非阻塞消費)
Jms.inboundAdapter(connectionFactory)
.destination(queueName)
.configureJmsTemplate(template -> template.receiveTimeout(-1))
交易
從 4.0 版開始,輸入通道配接器支援 session-transacted
屬性。在較早的版本中,您必須注入將 sessionTransacted
設定為 true
的 JmsTemplate
。(配接器確實允許您將 acknowledge
屬性設定為 transacted
,但這是錯誤的,且不起作用)。
但是,請注意,將 session-transacted
設定為 true
的價值不大,因為交易會在 receive()
操作之後且在訊息傳送到 channel
之前立即提交。
如果您希望整個流程都是交易性的 (例如,如果存在下游輸出通道配接器),則必須將 transactional
poller 與 JmsTransactionManager
一起使用。或者,考慮使用 jms-message-driven-channel-adapter
,並將 acknowledge
設定為 transacted
(預設值)。
訊息驅動通道配接器
message-driven-channel-adapter
需要參考 Spring MessageListener
容器的實例 (AbstractMessageListenerContainer
的任何子類別),或同時參考 ConnectionFactory
和 Destination
(可以提供 'destinationName' 來代替 'destination' 參考)。以下範例定義了具有 Destination
參考的訊息驅動通道配接器
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue"))
.channel("exampleChannel")
.get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
integrationFlow(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue")) {
channel("exampleChannel")
}
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(cf());
container.setDestinationName("inQueue");
return container;
}
@Bean
public ChannelPublishingJmsMessageListener listener() {
ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
listener.setRequestChannelName("exampleChannel");
return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>
訊息驅動配接器也接受與 如果您有自訂的監聽器容器實作 (通常是 |
您無法使用 Spring JMS 命名空間元素 建議為 |
從 4.2 版開始,預設的 acknowledge 模式為 transacted ,除非您提供外部容器。在這種情况下,您應根據需要設定容器。我們建議將 transacted 與 DefaultMessageListenerContainer 一起使用,以避免訊息遺失。 |
'extract-payload' 屬性具有相同的效果,其預設值為 'true'。poller
元素不適用於訊息驅動通道配接器,因為它是主動調用的。對於大多數情境,訊息驅動方法更好,因為訊息會在從底層 JMS 消費者接收後立即傳遞到 MessageChannel
。
最後,<message-driven-channel-adapter>
元素也接受 'error-channel' 屬性。這提供了與 進入 GatewayProxyFactoryBean
中描述的相同的基本功能。以下範例顯示如何在訊息驅動通道配接器上設定錯誤通道
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
channel="exampleChannel"
error-channel="exampleErrorChannel"/>
當將先前的範例與通用閘道設定或我們稍後討論的 JMS 'inbound-gateway' 進行比較時,主要區別在於我們處於單向流程中,因為這是 'channel-adapter',而不是閘道。因此,從 'error-channel' 下游的流程也應為單向流程。例如,它可以傳送到記錄處理器,也可以連接到不同的 JMS <outbound-channel-adapter>
元素。
從主題消費時,請將 pub-sub-domain
屬性設定為 true。對於持久訂閱,請將 subscription-durable
設定為 true
,對於共用訂閱,請將 subscription-shared
設定為 true
(這需要 JMS 2.0 Broker,並且自 4.2 版以來可用)。使用 subscription-name
為訂閱命名。
從 5.1 版開始,當端點在應用程式保持執行狀態時停止時,底層的監聽器容器會關閉,從而關閉其共用連線和消費者。先前,連線和消費者保持開啟狀態。若要還原為先前的行為,請在 JmsMessageDrivenEndpoint
上將 shutdownContainerOnStop
設定為 false
。
從 6.3 版開始,ChannelPublishingJmsMessageListener
現在可以提供 RetryTemplate
和 RecoveryCallback<Message<?>>
,用於下游傳送和傳送與接收操作的重試。這些選項也公開到 Java DSL 的 JmsMessageDrivenChannelAdapterSpec
中。
輸入轉換錯誤
從 4.2 版開始,'error-channel' 也用於轉換錯誤。先前,如果 JMS <message-driven-channel-adapter/>
或 <inbound-gateway/>
由於轉換錯誤而無法傳遞訊息,則會將例外狀況擲回容器。如果容器設定為使用交易,則訊息會回滾並重複重新傳遞。轉換過程在訊息建構之前和期間發生,因此不會將此類錯誤傳送到 'error-channel'。現在,此類轉換例外狀況會導致將 ErrorMessage
傳送到 'error-channel',並將例外狀況作為 payload
。如果您希望交易回滾,並且已定義 'error-channel',則 'error-channel' 上的整合流程必須重新擲回例外狀況 (或其他例外狀況)。如果錯誤流程未擲回例外狀況,則會提交交易並移除訊息。如果未定義 'error-channel',則會像以前一樣將例外狀況擲回容器。
輸出通道配接器
JmsSendingMessageHandler
實作 MessageHandler
介面,並且能夠將 Spring Integration Messages
轉換為 JMS 訊息,然後傳送到 JMS 目的地。它需要 jmsTemplate
參考或同時需要 jmsConnectionFactory
和 destination
參考 (可以提供 destinationName
來代替 destination
)。與輸入通道配接器一樣,設定此配接器的最簡單方法是使用命名空間支援。以下設定產生一個配接器,該配接器從 exampleChannel
接收 Spring Integration 訊息,將這些訊息轉換為 JMS 訊息,並將其傳送到 Bean 名稱為 outQueue
的 JMS 目的地參考
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
-
Java
-
XML
@Bean
public IntegrationFlow jmsOutboundFlow() {
return IntegrationFlow.from("exampleChannel")
.handle(Jms.outboundAdapter(cachingConnectionFactory())
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
.configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
@Bean
fun jmsOutboundFlow() =
integrationFlow("exampleChannel") {
handle(Jms.outboundAdapter(jmsConnectionFactory())
.apply {
destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression("10000")
configureJmsTemplate { it.explicitQosEnabled(true) }
}
)
}
@Bean
jmsOutboundFlow() {
integrationFlow('exampleChannel') {
handle(Jms.outboundAdapter(new ActiveMQConnectionFactory())
.with {
destinationExpression 'headers.' + SimpMessageHeaderAccessor.DESTINATION_HEADER
deliveryModeFunction { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression '10000'
configureJmsTemplate {
it.explicitQosEnabled true
}
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
handler.setDestinationName("outQueue");
return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>
與輸入通道配接器一樣,有一個 'extract-payload' 屬性。但是,對於輸出配接器,其含義是相反的。布林屬性不是應用於 JMS 訊息,而是應用於 Spring Integration 訊息 Payload。換句話說,決定是將 Spring Integration 訊息本身作為 JMS 訊息主體傳遞,還是將 Spring Integration 訊息 Payload 作為 JMS 訊息主體傳遞。預設值為 'true'。因此,如果您傳遞 Payload 為 String
的 Spring Integration 訊息,則會建立 JMS TextMessage
。另一方面,如果您想透過 JMS 將實際的 Spring Integration 訊息傳送到另一個系統,請將其設定為 'false'。
無論 Payload 擷取的布林值為何,只要您依賴預設轉換器或提供對 MessageConverter 另一個實例的參考,Spring Integration MessageHeaders 都會對應到 JMS 屬性。(對於 '輸入' 配接器也是如此,只是在這些情况下,JMS 屬性會對應到 Spring Integration MessageHeaders )。 |
從 5.1 版開始,可以使用 deliveryModeExpression
和 timeToLiveExpression
屬性設定 <int-jms:outbound-channel-adapter>
(JmsSendingMessageHandler
),以根據請求 Spring Message
在執行階段評估 JMS 訊息的適當 QoS 值。DefaultJmsHeaderMapper
的新 setMapInboundDeliveryMode(true)
和 setMapInboundExpiration(true)
選項可以促進從訊息標頭動態 deliveryMode
和 timeToLive
的資訊來源
<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>
輸入閘道
Spring Integration 的訊息驅動 JMS 輸入閘道委派給 MessageListener
容器,支援動態調整並行消費者,並且還可以處理回覆。輸入閘道需要參考 ConnectionFactory
和請求 Destination
(或 'requestDestinationName')。以下範例定義了一個 JMS inbound-gateway
,它從 Bean ID 為 inQueue
的 JMS 佇列接收,並傳送到名為 exampleChannel
的 Spring Integration 通道
<int-jms:inbound-gateway id="jmsInGateway"
request-destination="inQueue"
request-channel="exampleChannel"/>
由於閘道提供請求-回覆行為,而不是單向傳送或接收行為,因此它們還具有兩個不同的「Payload 擷取」屬性 (如 先前討論 的通道配接器的 'extract-payload' 設定)。對於輸入閘道,'extract-request-payload' 屬性決定是否擷取接收到的 JMS 訊息主體。如果為 'false',則 JMS 訊息本身會成為 Spring Integration 訊息 Payload。預設值為 'true'。
同樣地,對於輸入閘道,'extract-reply-payload' 屬性適用於要轉換為回覆 JMS 訊息的 Spring Integration 訊息。如果您想傳遞整個 Spring Integration 訊息 (作為 JMS ObjectMessage 的主體),請將此值設定為 'false'。預設情况下,它也是 'true',即 Spring Integration 訊息 Payload 會轉換為 JMS 訊息 (例如,String
Payload 會變成 JMS TextMessage)。
與其他任何事物一樣,閘道調用可能會導致錯誤。預設情况下,不會通知生產者在消費者端可能發生的錯誤,並且會逾時等待回覆。但是,有時您可能希望將錯誤狀況傳達回消費者 (換句話說,您可能希望透過將例外狀況對應到訊息來將其視為有效回覆)。為了實現此目的,JMS 輸入閘道提供對訊息通道的支援,錯誤可以傳送到該通道進行處理,從而可能產生符合某些合約的回覆訊息 Payload,該合約定義了呼叫者可能期望作為「錯誤」回覆的內容。您可以使用 error-channel 屬性來設定此類通道,如下列範例所示
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="jmsInputChannel"
error-channel="errorTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
您可能會注意到,此範例看起來與 進入 GatewayProxyFactoryBean
中包含的範例非常相似。相同的想法也適用於此處:exceptionTransformer
可以是建立錯誤回應物件的 POJO,您可以參考 nullChannel
來抑制錯誤,或者您可以省略 'error-channel' 以讓例外狀況傳播。
請參閱 輸入轉換錯誤。
從主題消費時,請將 pub-sub-domain
屬性設定為 true。對於持久訂閱,請將 subscription-durable
設定為 true
,對於共用訂閱,請將 subscription-shared
設定為 true
(需要 JMS 2.0 Broker,並且自 4.2 版以來可用)。使用 subscription-name
為訂閱命名。
從 4.2 版開始,預設的 acknowledge 模式為 transacted ,除非提供了外部容器。在這種情况下,您應根據需要設定容器。我們建議您將 transacted 與 DefaultMessageListenerContainer 一起使用,以避免訊息遺失。 |
從 5.1 版開始,當端點在應用程式保持執行狀態時停止時,底層的監聽器容器會關閉,從而關閉其共用連線和消費者。先前,連線和消費者保持開啟狀態。若要還原為先前的行為,請在 JmsInboundGateway
上將 shutdownContainerOnStop
設定為 false
。
預設情况下,JmsInboundGateway
會在接收到的訊息中尋找 jakarta.jms.Message.getJMSReplyTo()
屬性,以確定將回覆傳送到何處。否則,可以使用靜態 defaultReplyDestination
或 defaultReplyQueueName
或 defaultReplyTopicName
進行設定。此外,從 6.1 版開始,可以在提供的 ChannelPublishingJmsMessageListener
上設定 replyToExpression
,以便在請求上的標準 JMSReplyTo
屬性為 null
時動態確定回覆目的地。接收到的 jakarta.jms.Message
用作根評估內容物件。以下範例示範如何使用 Java DSL API 設定輸入 JMS 閘道,該閘道具有從請求訊息解析的自訂回覆目的地
@Bean
public IntegrationFlow jmsInboundGatewayFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Jms.inboundGateway(connectionFactory)
.requestDestination("requestDestination")
.replyToFunction(message -> message.getStringProperty("myReplyTo")))
.<String, String>transform(String::toUpperCase)
.get();
}
從 6.3 版開始,Jms.inboundGateway()
API 公開了 retryTemplate()
和 recoveryCallback()
選項,用於重試內部傳送和接收操作。
輸出閘道
輸出閘道從 Spring Integration 訊息建立 JMS 訊息,並將其傳送到 request-destination
。然後,它會處理 JMS 回覆訊息,方法是使用選取器從您設定的 reply-destination
接收,或者,如果未提供 reply-destination
,則建立 JMS TemporaryQueue
(如果 replyPubSubDomain= true
則為 TemporaryTopic
) 實例。
搭配 如果您指定了回覆目的地,建議您不要使用快取的消費者。或者,考慮使用 |
以下範例顯示如何配置輸出閘道
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies"/>
'outbound-gateway' 有效負載提取屬性與 'inbound-gateway' 的屬性成反比 (請參閱先前的討論)。 這表示 'extract-request-payload' 屬性值適用於正在轉換為 JMS 訊息的 Spring Integration 訊息,以便作為請求發送。 'extract-reply-payload' 屬性值適用於接收為回覆的 JMS 訊息,然後將其轉換為 Spring Integration 訊息,隨後發送到 'reply-channel',如先前的配置範例所示。
使用 <reply-listener/>
Spring Integration 2.2 引入了一種處理回覆的替代技術。如果您將 <reply-listener/>
子元素添加到閘道,而不是為每個回覆建立一個消費者,則會使用 MessageListener
容器來接收回覆,並將它們交給請求執行緒。這提供了許多效能優勢,並緩解了先前的注意事項中描述的快取消費者記憶體使用問題。
當將 <reply-listener/>
與沒有 reply-destination
的輸出閘道一起使用時,不會為每個請求建立 TemporaryQueue
,而是使用單個 TemporaryQueue
。(如果與 Broker 的連線中斷並恢復,閘道會根據需要建立額外的 TemporaryQueue
)。 如果將 replyPubSubDomain
設定為 true
,從 6.0 版本開始,將會改為建立 TemporaryTopic
。
當使用 correlation-key
時,多個閘道可以共享相同的回覆目的地,因為監聽器容器使用每個閘道唯一的選擇器。
如果您指定了回覆監聽器並指定了回覆目的地 (或回覆目的地名稱),但未提供關聯鍵,則閘道會記錄警告並回復為 2.2 版本之前的行為。這是因為在這種情況下無法配置選擇器。因此,無法避免回覆發送到可能配置了相同回覆目的地的不同閘道。 請注意,在這種情況下,每個請求都會使用一個新的消費者,並且消費者可能會像上述注意事項中描述的那樣在記憶體中累積;因此在這種情況下不應使用快取的消費者。 |
以下範例顯示具有預設屬性的回覆監聽器
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies">
<int-jms:reply-listener />
</int-jms-outbound-gateway>
監聽器非常輕量,我們預期在大多數情況下,您只需要單個消費者。但是,您可以添加諸如 concurrent-consumers
、max-concurrent-consumers
等屬性。 有關完整支援屬性列表及其含義,請參閱綱要以及 Spring JMS 文件。
閒置回覆監聽器
從 4.2 版本開始,您可以根據需要啟動回覆監聽器 (並在閒置一段時間後停止它),而不是在閘道的生命週期內持續執行。如果您在應用程式上下文中有多個閘道,但它們大多處於閒置狀態,這可能會很有用。 一種情況是上下文中有許多 (非活動) 分區的 Spring Batch 作業,它們使用 Spring Integration 和 JMS 進行分區分發。 如果所有回覆監聽器都處於活動狀態,則 JMS Broker 會為每個閘道都有一個活動的消費者。透過啟用閒置逾時,每個消費者僅在對應的批次作業執行時 (以及在其完成後的一小段時間內) 存在。
請參閱 屬性參考中的 idle-reply-listener-timeout
。
閘道回覆關聯
本節描述用於回覆關聯的機制 (確保原始閘道僅接收對其請求的回覆),具體取決於閘道的配置方式。 有關此處討論的屬性的完整描述,請參閱 屬性參考。
以下列表描述了各種情境 (數字僅用於識別 — 順序無關緊要)
-
沒有
reply-destination*
屬性且沒有<reply-listener>
為每個請求建立一個
TemporaryQueue
,並在請求完成時 (無論成功與否) 刪除。correlation-key
無關緊要。 -
提供了
reply-destination*
屬性,但既未提供<reply-listener/>
也未提供correlation-key
等於傳出訊息的
JMSCorrelationID
用作消費者的訊息選擇器messageSelector = "JMSCorrelationID = '" + messageId + "'"
回應系統預期在回覆
JMSCorrelationID
中傳回傳入的JMSMessageID
。 這是一種常見的模式,Spring Integration 輸入閘道以及 Spring 的MessageListenerAdapter
for message-driven POJO 都實作了這種模式。當您使用此配置時,不應使用主題來進行回覆。 回覆可能會遺失。 -
提供了
reply-destination*
屬性,未提供<reply-listener/>
,且correlation-key="JMSCorrelationID"
閘道產生唯一的關聯 ID 並將其插入
JMSCorrelationID
標頭中。 訊息選擇器為messageSelector = "JMSCorrelationID = '" + uniqueId + "'"
回應系統預期在回覆
JMSCorrelationID
中傳回傳入的JMSCorrelationID
。 這是一種常見的模式,Spring Integration 輸入閘道以及 Spring 的MessageListenerAdapter
for message-driven POJO 都實作了這種模式。 -
提供了
reply-destination*
屬性,未提供<reply-listener/>
,且correlation-key="myCorrelationHeader"
閘道產生唯一的關聯 ID 並將其插入
myCorrelationHeader
訊息屬性中。correlation-key
可以是任何使用者定義的值。 訊息選擇器為messageSelector = "myCorrelationHeader = '" + uniqueId + "'"
回應系統預期在回覆
myCorrelationHeader
中傳回傳入的myCorrelationHeader
。 -
提供了
reply-destination*
屬性,未提供<reply-listener/>
,且correlation-key="JMSCorrelationID*"
(請注意關聯鍵中的*
。)閘道使用請求訊息中
jms_correlationId
標頭中的值 (如果存在),並將其插入JMSCorrelationID
標頭中。 訊息選擇器為messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"
使用者必須確保此值是唯一的。
如果標頭不存在,則閘道的行為與
3
相同。回應系統預期在回覆
JMSCorrelationID
中傳回傳入的JMSCorrelationID
。 這是一種常見的模式,Spring Integration 輸入閘道以及 Spring 的MessageListenerAdapter
for message-driven POJO 都實作了這種模式。 -
未提供
reply-destination*
屬性,並提供了<reply-listener>
建立一個臨時佇列,並用於來自此閘道實例的所有回覆。 訊息中不需要關聯資料,但傳出的
JMSMessageID
會在閘道內部使用,以將回覆導向正確的請求執行緒。 -
提供了
reply-destination*
屬性,提供了<reply-listener>
,且未提供correlation-key
不允許。
<reply-listener/>
配置被忽略,閘道的行為與2
相同。 會寫入警告日誌訊息以指示這種情況。 -
提供了
reply-destination*
屬性,提供了<reply-listener>
,且correlation-key="JMSCorrelationID"
閘道具有唯一的關聯 ID,並將其與遞增值一起插入
JMSCorrelationID
標頭中 (gatewayId + "_" + ++seq
)。 訊息選擇器為messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"
回應系統預期在回覆
JMSCorrelationID
中傳回傳入的JMSCorrelationID
。 這是一種常見的模式,Spring Integration 輸入閘道以及 Spring 的MessageListenerAdapter
for message-driven POJO 都實作了這種模式。 由於每個閘道都有唯一的 ID,因此每個實例僅取得其自己的回覆。 完整的關聯資料用於將回覆路由到正確的請求執行緒。 -
提供了
reply-destination*
屬性,提供了<reply-listener/>
,且correlation-key="myCorrelationHeader"
閘道具有唯一的關聯 ID,並將其與遞增值一起插入
myCorrelationHeader
屬性中 (gatewayId + "_" + ++seq
)。correlation-key
可以是任何使用者定義的值。 訊息選擇器為messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"
回應系統預期在回覆
myCorrelationHeader
中傳回傳入的myCorrelationHeader
。 由於每個閘道都有唯一的 ID,因此每個實例僅取得其自己的回覆。 完整的關聯資料用於將回覆路由到正確的請求執行緒。 -
提供了
reply-destination*
屬性,提供了<reply-listener/>
,且correlation-key="JMSCorrelationID*"
(請注意關聯鍵中的
*
)不允許。
具有回覆監聽器時,不允許使用者提供的關聯 ID。 閘道不會以此配置初始化。
非同步閘道
從 4.3 版本開始,您現在可以在配置輸出閘道時指定 async="true"
(或在 Java 中使用 setAsync(true)
)。
預設情況下,當請求發送到閘道時,請求執行緒會暫停,直到收到回覆。 然後流程會在該執行緒上繼續。 如果 async
為 true
,則請求執行緒會在 send()
完成後立即釋放,並且回覆會在監聽器容器執行緒上傳回 (並且流程繼續)。 當在輪詢器執行緒上調用閘道時,這可能會很有用。 執行緒被釋放,並且可用於框架內的其他任務。
async
需要 <reply-listener/>
(或在使用 Java 配置時使用 setUseReplyContainer(true)
)。 它還需要指定 correlationKey
(通常為 JMSCorrelationID
)。 如果不滿足這些條件中的任何一個,則會忽略 async
。
屬性參考
以下列表顯示了 outbound-gateway
的所有可用屬性
<int-jms:outbound-gateway
connection-factory="connectionFactory" (1)
correlation-key="" (2)
delivery-persistent="" (3)
destination-resolver="" (4)
explicit-qos-enabled="" (5)
extract-reply-payload="true" (6)
extract-request-payload="true" (7)
header-mapper="" (8)
message-converter="" (9)
priority="" (10)
receive-timeout="" (11)
reply-channel="" (12)
reply-destination="" (13)
reply-destination-expression="" (14)
reply-destination-name="" (15)
reply-pub-sub-domain="" (16)
reply-timeout="" (17)
request-channel="" (18)
request-destination="" (19)
request-destination-expression="" (20)
request-destination-name="" (21)
request-pub-sub-domain="" (22)
time-to-live="" (23)
requires-reply="" (24)
idle-reply-listener-timeout="" (25)
async=""> (26)
<int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 | jakarta.jms.ConnectionFactory 的參考。 預設為 jmsConnectionFactory 。 |
||
2 | 包含關聯資料的屬性名稱,用於將回應與回覆相關聯。 如果省略,閘道會期望回應系統在 JMSCorrelationID 標頭中傳回傳出的 JMSMessageID 標頭的值。 如果指定,閘道會產生關聯 ID 並使用它填充指定的屬性。 回應系統必須在相同的屬性中回傳該值。 它可以設定為 JMSCorrelationID ,在這種情況下,將使用標準標頭而不是 String 屬性來保存關聯資料。 當您使用 <reply-container/> 時,如果您提供明確的 reply-destination ,則必須指定 correlation-key 。 從 4.0.1 版本開始,此屬性也支援值 JMSCorrelationID* ,這表示如果傳出的訊息已經具有 JMSCorrelationID (從 jms_correlationId 映射而來) 標頭,則會使用它而不是產生新的標頭。 請注意,當您使用 <reply-container/> 時,不允許使用 JMSCorrelationID* 鍵,因為容器需要在初始化期間設定訊息選擇器。
|
||
3 | 一個布林值,指示傳遞模式應為 DeliveryMode.PERSISTENT (true ) 還是 DeliveryMode.NON_PERSISTENT (false )。 此設定僅在 explicit-qos-enabled 為 true 時生效。 |
||
4 | DestinationResolver 。 預設值為 DynamicDestinationResolver ,它將目的地名稱映射到該名稱的佇列或主題。 |
||
5 | 當設定為 true 時,它會啟用服務品質屬性的使用:priority 、delivery-mode 和 time-to-live 。 |
||
6 | 當設定為 true (預設值) 時,Spring Integration 回覆訊息的有效負載是從 JMS 回覆訊息的本文 (透過使用 MessageConverter ) 建立的。 當設定為 false 時,整個 JMS 訊息將成為 Spring Integration 訊息的有效負載。 |
||
7 | 當設定為 true (預設值) 時,Spring Integration 訊息的有效負載會轉換為 JMSMessage (透過使用 MessageConverter )。 當設定為 false 時,整個 Spring Integration 訊息會轉換為 JMSMessage 。 在這兩種情況下,Spring Integration 訊息標頭都會使用 HeaderMapper 映射到 JMS 標頭和屬性。 |
||
8 | 用於將 Spring Integration 訊息標頭映射到 JMS 訊息標頭和屬性以及從 JMS 訊息標頭和屬性映射而來的 HeaderMapper 。 |
||
9 | MessageConverter 的參考,用於在 JMS 訊息與 Spring Integration 訊息有效負載 (或訊息,如果 extract-request-payload 為 false ) 之間進行轉換。 預設值為 SimpleMessageConverter 。 |
||
10 | 請求訊息的預設優先順序。 如果存在,則會被訊息優先順序標頭覆寫。 其範圍為 0 到 9 。 此設定僅在 explicit-qos-enabled 為 true 時生效。 |
||
11 | 等待回覆的時間 (以毫秒為單位)。 預設值為 5000 (五秒)。 |
||
12 | 回覆訊息傳送到的通道。 | ||
13 | Destination 的參考,它設定為 JMSReplyTo 標頭。 最多只允許使用 reply-destination 、reply-destination-expression 或 reply-destination-name 中的一個。 如果未提供任何一個,則會使用 TemporaryQueue 作為此閘道的回覆。 |
||
14 | 評估為 Destination 的 SpEL 表達式,它將設定為 JMSReplyTo 標頭。 表達式可以產生 Destination 物件或 String 。 它由 DestinationResolver 用於解析實際的 Destination 。 最多只允許使用 reply-destination 、reply-destination-expression 或 reply-destination-name 中的一個。 如果未提供任何一個,則會使用 TemporaryQueue 作為此閘道的回覆。 |
||
15 | 設定為 JMSReplyTo 標頭的目的地名稱。 它由 DestinationResolver 用於解析實際的 Destination 。 最多只允許使用 reply-destination 、reply-destination-expression 或 reply-destination-name 中的一個。 如果未提供任何一個,則會使用 TemporaryQueue 作為此閘道的回覆。 |
||
16 | 當設定為 true 時,表示由 DestinationResolver 解析的任何回覆 Destination 都應為 Topic 而不是 Queue 。 |
||
17 | 閘道將回覆訊息傳送到 reply-channel 時等待的時間。 這僅在 reply-channel 可以阻塞時才有效 — 例如,容量受限且目前已滿的 QueueChannel 。 預設值為無限大。 |
||
18 | 此閘道接收請求訊息的通道。 | ||
19 | 請求訊息傳送到的 Destination 的參考。 reply-destination 、reply-destination-expression 或 reply-destination-name 之一是必要的。 您只能使用這三個屬性中的一個。 |
||
20 | 評估為請求訊息傳送到的 Destination 的 SpEL 表達式。 表達式可以產生 Destination 物件或 String 。 它由 DestinationResolver 用於解析實際的 Destination 。 reply-destination 、reply-destination-expression 或 reply-destination-name 之一是必要的。 您只能使用這三個屬性中的一個。 |
||
21 | 請求訊息傳送到的目的地名稱。 它由 DestinationResolver 用於解析實際的 Destination 。 reply-destination 、reply-destination-expression 或 reply-destination-name 之一是必要的。 您只能使用這三個屬性中的一個。 |
||
22 | 當設定為 true 時,表示由 DestinationResolver 解析的任何請求 Destination 都應為 Topic 而不是 Queue 。 |
||
23 | 指定訊息存活時間。 此設定僅在 explicit-qos-enabled 為 true 時生效。 |
||
24 | 指定此輸出閘道是否必須傳回非空值。 預設情況下,此值為 true ,並且當底層服務在 receive-timeout 後未傳回值時,會擲回 MessageTimeoutException 。 請注意,如果永遠不希望服務傳回回覆,則最好使用 <int-jms:outbound-channel-adapter/> 而不是 <int-jms:outbound-gateway/> 並將 requires-reply="false" 。 對於後者,傳送執行緒會被阻塞,等待 receive-timeout 期間的回覆。 |
||
25 | 當您使用 <reply-listener /> 時,依預設,其生命週期 (啟動和停止) 與閘道的生命週期相符。 當此值大於 0 時,容器會按需啟動 (當傳送請求時)。 容器會繼續執行,直到至少經過此時間且沒有收到任何請求 (並且直到沒有未完成的回覆)。 容器會在下一個請求時再次啟動。 停止時間是最小值,實際上可能高達此值的 1.5 倍。 |
||
26 | 請參閱 非同步閘道。 | ||
27 | 當包含此元素時,回覆會由非同步 MessageListenerContainer 接收,而不是為每個回覆建立一個消費者。 在許多情況下,這可能會更有效率。 |
將訊息標頭映射到 JMS 訊息以及從 JMS 訊息映射而來
JMS 訊息可以包含元資訊,例如 JMS API 標頭和簡單屬性。 您可以使用 JmsHeaderMapper
將這些資訊映射到 Spring Integration 訊息標頭以及從 Spring Integration 訊息標頭映射而來。 JMS API 標頭會傳遞到適當的 setter 方法 (例如 setJMSReplyTo
),而其他標頭則會複製到 JMS 訊息的一般屬性。 JMS 輸出閘道會使用 JmsHeaderMapper
的預設實作來啟動,這將映射標準 JMS API 標頭以及原始類型或 String
訊息標頭。 您也可以使用輸入和輸出閘道的 header-mapper
屬性來提供自訂標頭映射器。
許多 JMS 供應商特定的用戶端不允許直接在已建立的 JMS 訊息上設定 deliveryMode 、priority 和 timeToLive 屬性。 它們被視為 QoS 屬性,因此必須傳播到目標 MessageProducer.send(message, deliveryMode, priority, timeToLive) API。 因此,DefaultJmsHeaderMapper 不會將適當的 Spring Integration 標頭 (或表達式結果) 映射到提及的 JMS 訊息屬性中。 相反,JmsSendingMessageHandler 會使用 DynamicJmsTemplate 將標頭值從請求訊息傳播到 MessageProducer.send() API。 若要啟用此功能,您必須使用 DynamicJmsTemplate 配置輸出端點,並將其 explicitQosEnabled 屬性設定為 true。 Spring Integration Java DSL 依預設配置 DynamicJmsTemplate ,但您仍然必須設定 explicitQosEnabled 屬性。 |
從 4.0 版本開始,JMSPriority 標頭會映射到輸入訊息的標準 priority 標頭。 以前,priority 標頭僅用於輸出訊息。 若要恢復到先前的行為 (即不映射輸入優先順序),請將 DefaultJmsHeaderMapper 的 mapInboundPriority 屬性設定為 false 。 |
從 4.3 版本開始,DefaultJmsHeaderMapper 會透過調用其 toString() 方法將標準 correlationId 標頭映射為訊息屬性 (correlationId 通常是 UUID ,JMS 不支援 UUID)。 在輸入端,它會映射為 String 。 這與 jms_correlationId 標頭無關,後者會映射到 JMSCorrelationID 標頭以及從 JMSCorrelationID 標頭映射而來。 JMSCorrelationID 通常用於關聯請求和回覆,而 correlationId 通常用於將相關訊息組合到一個群組中 (例如使用聚合器或重新排序器)。 |
從 5.1 版本開始,可以配置 DefaultJmsHeaderMapper
以映射輸入 JMSDeliveryMode
和 JMSExpiration
屬性
@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
mapper.setMapInboundDeliveryMode(true)
mapper.setMapInboundExpiration(true)
return mapper;
}
這些 JMS 屬性分別映射到 JmsHeaders.DELIVERY_MODE
和 JmsHeaders.EXPIRATION
Spring 訊息標頭。
訊息轉換、編組和解組
如果您需要轉換訊息,所有 JMS 配接器和閘道都允許您透過設定 message-converter
屬性來提供 MessageConverter
。 為此,請提供可在同一 ApplicationContext 中使用的 MessageConverter
實例的 Bean 名稱。 此外,為了與編組器和解組器介面保持一致性,Spring 提供了 MarshallingMessageConverter
,您可以使用自己的自訂編組器和解組器來配置它。 以下範例顯示如何執行此操作
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="inbound-gateway-channel"
message-converter="marshallingMessageConverter"/>
<bean id="marshallingMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<constructor-arg>
<bean class="org.bar.SampleMarshaller"/>
</constructor-arg>
<constructor-arg>
<bean class="org.bar.SampleUnmarshaller"/>
</constructor-arg>
</bean>
當您提供自己的 MessageConverter 實例時,它仍然會封裝在 HeaderMappingMessageConverter 中。 這表示 'extract-request-payload' 和 'extract-reply-payload' 屬性可能會影響傳遞到轉換器的實際物件。 HeaderMappingMessageConverter 本身會委派給目標 MessageConverter ,同時也將 Spring Integration MessageHeaders 映射到 JMS 訊息屬性,然後再映射回來。 |
JMS 支援的訊息通道
先前介紹的通道配接器和閘道都適用於與其他外部系統整合的應用程式。 輸入選項假設其他系統正在向 JMS 目的地傳送 JMS 訊息,而輸出選項假設其他系統正在從目的地接收訊息。 其他系統可能是也可能不是 Spring Integration 應用程式。 當然,當將 Spring Integration 訊息實例作為 JMS 訊息本身的本文傳送時 (extract-payload
值設定為 false
),會假設其他系統是基於 Spring Integration 的。 但是,這絕不是必要的。 這種彈性是使用基於訊息的整合選項以及 “通道” (或 JMS 情況下的目的地) 抽象化的好處之一。
有時,給定 JMS 目的地的生產者和消費者都旨在成為同一應用程式的一部分,並在同一進程中執行。 您可以使用一對輸入和輸出通道配接器來完成此操作。 這種方法的問題在於,即使從概念上講,目標是擁有單個訊息通道,您也需要兩個配接器。 Spring Integration 2.0 版本開始支援更好的選項。 現在,在使用 JMS 命名空間時,可以定義單個 “通道”,如以下範例所示
<int-jms:channel id="jmsChannel" queue="exampleQueue"/>
先前範例中的通道行為與主要 Spring Integration 命名空間中的普通 <channel/>
元素非常相似。 它可以被任何端點的 input-channel
和 output-channel
屬性引用。 不同之處在於,此通道由名為 exampleQueue
的 JMS 佇列實例支援。 這表示在生產和消費端點之間可以實現非同步訊息傳遞。 但是,與透過在非 JMS <channel/>
元素中添加 <queue/>
元素建立的更簡單的非同步訊息通道不同,訊息不會儲存在記憶體佇列中。 相反,這些訊息在 JMS 訊息本文中傳遞,並且底層 JMS 提供者的全部功能可用於該通道。 使用此替代方案最常見的基本原理可能是利用 JMS 訊息傳遞的儲存轉發方法提供的持久性。
如果配置正確,JMS 支援的訊息通道也支援事務。 換句話說,如果生產者的傳送操作是回滾事務的一部分,則生產者實際上不會寫入事務性 JMS 支援的通道。 同樣,如果接收訊息是回滾事務的一部分,則消費者不會從通道中實際移除 JMS 訊息。 請注意,在這種情況下,生產者和消費者事務是分開的。 這與在沒有 <queue/>
子元素的簡單同步 <channel/>
元素中傳播事務上下文有很大不同。
由於先前的範例引用了 JMS 佇列實例,因此它充當點對點通道。 另一方面,如果您需要發布-訂閱行為,則可以使用單獨的元素並引用 JMS 主題。 以下範例顯示如何執行此操作
<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>
對於任何類型的 JMS 支援的通道,可以提供目的地名稱而不是參考,如以下範例所示
<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>
<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>
在先前的範例中,目的地名稱由 Spring 的預設 DynamicDestinationResolver
實作解析,但您可以提供 DestinationResolver
介面的任何實作。 此外,JMS ConnectionFactory
是通道的必要屬性,但依預設,預期的 Bean 名稱將為 jmsConnectionFactory
。 以下範例提供了用於解析 JMS 目的地名稱的自訂實例,以及用於 ConnectionFactory
的不同名稱
<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
destination-resolver="customDestinationResolver"
connection-factory="customConnectionFactory"/>
對於 <publish-subscribe-channel />
,將 durable
屬性設定為 true
以進行持久訂閱,或將 subscription-shared
設定為共享訂閱 (需要 JMS 2.0 Broker,並且自 4.2 版本以來可用)。 使用 subscription
來命名訂閱。
使用 JMS 訊息選擇器
透過 JMS 訊息選擇器,您可以根據 JMS 標頭以及 JMS 屬性篩選 JMS 訊息。 例如,如果您想監聽自訂 JMS 標頭屬性 myHeaderProperty
等於 something
的訊息,則可以指定以下表達式
myHeaderProperty = 'something'
訊息選擇器表達式是 SQL-92 條件表達式語法的子集,並在 Java 訊息服務 規範中定義。 您可以使用 XML 命名空間配置為以下 Spring Integration JMS 組件指定 JMS 訊息 selector
屬性
-
JMS 通道
-
JMS 發布訂閱通道
-
JMS 輸入通道配接器
-
JMS 輸入閘道
-
JMS 訊息驅動通道配接器
您無法使用 JMS 訊息選擇器引用訊息本文值。 |
JMS 範例
若要試用這些 JMS 配接器,請查看 Spring Integration 範例 Git 儲存庫中提供的 JMS 範例,網址為 https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms。
該儲存庫包含兩個範例。 一個提供輸入和輸出通道配接器,另一個提供輸入和輸出閘道。 它們配置為使用嵌入式 ActiveMQ 程序執行,但您可以修改每個範例的 common.xml Spring 應用程式上下文檔案,以支援不同的 JMS 提供者或獨立的 ActiveMQ 程序。
換句話說,您可以拆分配置,以便輸入和輸出配接器在單獨的 JVM 中執行。 如果您已安裝 ActiveMQ,請修改 common.xml
檔案中的 brokerURL
屬性以使用 tcp://127.0.0.1:61616
(而不是 vm://127.0.0.1
)。 這兩個範例都接受來自 stdin 的輸入並回顯到 stdout。 查看配置以了解這些訊息是如何透過 JMS 路由的。