配置訊息通道
若要建立訊息通道實例,您可以使用 xml 的 <channel/>
元素或 Java 配置的 DirectChannel
實例,如下所示
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
當您使用不含任何子元素的 <channel/>
元素時,它會建立 DirectChannel
實例 (SubscribableChannel
)。
若要建立發布-訂閱通道,請使用 <publish-subscribe-channel/>
元素 (Java 中的 PublishSubscribeChannel
),如下所示
-
Java
-
XML
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
或者,您可以提供各種 <queue/>
子元素來建立任何可輪詢的通道類型 (如訊息通道實作中所述)。以下章節顯示每種通道類型的範例。
DirectChannel
配置
如先前所述,DirectChannel
是預設類型。以下清單顯示如何定義一個
-
Java
-
XML
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
預設通道具有循環配置資源負載平衡器,並且也啟用容錯移轉 (請參閱 DirectChannel
以取得更多詳細資訊)。若要停用其中一個或兩個,請新增 <dispatcher/>
子元素 (DirectChannel
的 LoadBalancingStrategy
建構子) 並配置屬性,如下所示
-
Java
-
XML
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
從 6.3 版開始,所有基於 UnicastingDispatcher
的 MessageChannel
實作都可以使用 Predicate<Exception> failoverStrategy
而不是純粹的 failover
選項進行配置。此述詞會根據從目前處理器擲回的例外狀況,判斷是否容錯移轉至下一個 MessageHandler
。更複雜的錯誤分析應使用ErrorMessageExceptionTypeRouter
完成。
資料類型通道配置
有時,消費者只能處理特定類型的 Payload,因此您必須確保輸入訊息的 Payload 類型。首先想到的可能是使用訊息篩選器。但是,訊息篩選器所能做的就是篩選掉不符合消費者需求的訊息。另一種方法是使用基於內容的路由器,並將具有不相容資料類型的訊息路由到特定的轉換器,以強制轉換和轉換為所需的資料類型。這會有效,但完成相同事情的更簡單方法是應用資料類型通道模式。您可以為每個特定的 Payload 資料類型使用單獨的資料類型通道。
若要建立僅接受包含特定 Payload 類型訊息的資料類型通道,請在通道元素的 datatype
屬性中提供資料類型的完整類別名稱,如下列範例所示
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
請注意,類型檢查適用於可指派給通道資料類型的任何類型。換句話說,前述範例中的 numberChannel
將接受 Payload 為 java.lang.Integer
或 java.lang.Double
的訊息。可以以逗號分隔的清單形式提供多種類型,如下列範例所示
-
Java
-
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
因此,前述範例中的 'numberChannel' 僅接受資料類型為 java.lang.Number
的訊息。但是,如果訊息的 Payload 不是所需的類型,會發生什麼事?這取決於您是否已定義名為 integrationConversionService
的 Bean,該 Bean 是 Spring 的轉換服務的實例。如果沒有,則會立即擲回 Exception
。但是,如果您已定義 integrationConversionService
Bean,則會使用它來嘗試將訊息的 Payload 轉換為可接受的類型。
您甚至可以註冊自訂轉換器。例如,假設您將 Payload 為 String
的訊息傳送到我們上面配置的 'numberChannel'。您可以如下處理訊息
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常,這將是一個完全合法的操作。但是,由於我們使用資料類型通道,因此此類操作的結果將產生類似以下的例外狀況
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
發生例外狀況的原因是我們要求 Payload 類型為 Number
,但我們傳送了 String
。因此,我們需要將 String
轉換為 Number
的東西。為此,我們可以實作類似以下範例的轉換器
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
然後,我們可以將其註冊為 Integration Conversion Service 的轉換器,如下列範例所示
-
Java
-
XML
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
或者,在標記有 @Component
註解以進行自動掃描的 StringToIntegerConverter
類別上。
當剖析 'converter' 元素時,如果尚未定義,它會建立 integrationConversionService
Bean。有了該轉換器,send
操作現在將會成功,因為資料類型通道會使用該轉換器將 String
Payload 轉換為 Integer
。
有關 Payload 類型轉換的更多資訊,請參閱Payload 類型轉換。
從 4.0 版開始,integrationConversionService
由 DefaultDatatypeChannelMessageConverter
叫用,後者會在應用程式內容中查閱轉換服務。若要使用不同的轉換技術,您可以在通道上指定 message-converter
屬性。這必須是對 MessageConverter
實作的參考。僅使用 fromMessage
方法。它為轉換器提供存取訊息標頭的權限 (以防轉換可能需要來自標頭的資訊,例如 content-type
)。該方法只能傳回轉換後的 Payload 或完整的 Message
物件。如果是後者,轉換器必須小心複製輸入訊息中的所有標頭。
或者,您可以宣告類型為 MessageConverter
且 ID 為 datatypeChannelMessageConverter
的 <bean/>
,並且該轉換器將由所有具有 datatype
的通道使用。
QueueChannel
配置
若要建立 QueueChannel
,請使用 <queue/>
子元素。您可以如下指定通道的容量
-
Java
-
XML
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
如果您未在此 <queue/> 子元素上為 'capacity' 屬性提供值,則產生的佇列是無邊界的。為避免記憶體不足等問題,我們強烈建議您為有界佇列設定明確的值。 |
持久性 QueueChannel
配置
由於 QueueChannel
提供緩衝訊息的功能,但預設情況下僅在記憶體中執行此操作,因此也引入了在系統故障時訊息可能會遺失的可能性。為了減輕此風險,QueueChannel
可以由 MessageGroupStore
策略介面的持久性實作支援。有關 MessageGroupStore
和 MessageStore
的更多詳細資訊,請參閱訊息儲存區。
當使用 message-store 屬性時,不允許使用 capacity 屬性。 |
當 QueueChannel
接收到 Message
時,它會將訊息新增至訊息儲存區。當從 QueueChannel
輪詢 Message
時,會從訊息儲存區中移除該訊息。
預設情況下,QueueChannel
將其訊息儲存在記憶體內佇列中,這可能會導致前面提到的訊息遺失情況。但是,Spring Integration 提供持久性儲存區,例如 JdbcChannelMessageStore
。
您可以透過新增 message-store
屬性來為任何 QueueChannel
配置訊息儲存區,如下列範例所示
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(請參閱以下 Java/Kotlin 配置選項的範例。)
Spring Integration JDBC 模組還為許多流行的資料庫提供結構描述資料定義語言 (DDL)。這些結構描述位於該模組 (spring-integration-jdbc
) 的 org.springframework.integration.jdbc.store.channel 套件中。
一個重要的功能是,使用任何交易式持久性儲存區 (例如 JdbcChannelMessageStore ),只要輪詢器配置了交易,從儲存區移除的訊息只能在交易成功完成時才能永久移除。否則,交易會回滾,並且 Message 不會遺失。 |
隨著與「NoSQL」資料儲存區相關的 Spring 專案數量不斷增加,以提供對這些儲存區的底層支援,許多其他訊息儲存區的實作也隨之出現。如果您找不到符合您特定需求的實作,您也可以提供您自己的 MessageGroupStore
介面實作。
自 4.0 版起,我們建議將 QueueChannel
實例配置為盡可能使用 ChannelMessageStore
。與一般訊息儲存區相比,這些通常針對此用途進行了最佳化。如果 ChannelMessageStore
是 ChannelPriorityMessageStore
,則訊息會以 FIFO 方式在優先順序內接收。優先順序的概念由訊息儲存區實作決定。例如,以下範例顯示了MongoDB 通道訊息儲存區的 Java 配置
-
Java
-
Java DSL
-
Kotlin DSL
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
請注意 MessageGroupQueue 類別。這是一個 BlockingQueue 實作,用於使用 MessageGroupStore 操作。 |
自訂 QueueChannel
環境的另一個選項是由 <int:queue>
子元素或其特定建構子的 ref
屬性提供。此屬性提供對任何 java.util.Queue
實作的參考。例如,Hazelcast 分散式IQueue
可以配置如下
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel
配置
若要建立 PublishSubscribeChannel
,請使用 <publish-subscribe-channel/> 元素。當使用此元素時,您還可以指定用於發布訊息的 task-executor
(如果未指定,則在傳送者的執行緒中發布),如下所示
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
如果您在 PublishSubscribeChannel
的下游提供重排序器或聚合器,則可以在通道上將 'apply-sequence' 屬性設定為 true
。這樣做表示通道應在傳遞訊息之前設定 sequence-size
和 sequence-number
訊息標頭以及關聯 ID。例如,如果有五個訂閱者,則 sequence-size
將設定為 5
,並且訊息將具有 sequence-number
標頭值,範圍從 1
到 5
。
除了 Executor
之外,您還可以配置 ErrorHandler
。預設情況下,PublishSubscribeChannel
使用 MessagePublishingErrorHandler
實作將錯誤傳送到來自 errorChannel
標頭或全域 errorChannel
實例的 MessageChannel
。如果未配置 Executor
,則會忽略 ErrorHandler
,並且例外狀況會直接擲回給呼叫者的執行緒。
如果您在 PublishSubscribeChannel
的下游提供 Resequencer
或 Aggregator
,則可以在通道上將 'apply-sequence' 屬性設定為 true
。這樣做表示通道應在傳遞訊息之前設定 sequence-size 和 sequence-number 訊息標頭以及關聯 ID。例如,如果有五個訂閱者,則 sequence-size 將設定為 5
,並且訊息將具有 sequence-number 標頭值,範圍從 1
到 5
。
以下範例顯示如何將 apply-sequence
標頭設定為 true
-
Java
-
XML
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
apply-sequence 值預設為 false ,以便發布-訂閱通道可以將完全相同的訊息實例傳送到多個輸出通道。由於 Spring Integration 強制 Payload 和標頭參考的不可變性,因此當標誌設定為 true 時,通道會建立具有相同 Payload 參考但不同標頭值的新 Message 實例。 |
從 5.4.3 版開始,PublishSubscribeChannel
也可以使用其 BroadcastingDispatcher
的 requireSubscribers
選項進行配置,以指示當通道沒有訂閱者時,不會靜默忽略訊息。當沒有訂閱者並且此選項設定為 true
時,會擲回包含 Dispatcher has no subscribers
訊息的 MessageDispatchingException
。
ExecutorChannel
若要建立 ExecutorChannel
,請新增具有 task-executor
屬性的 <dispatcher>
子元素。屬性的值可以參考內容中的任何 TaskExecutor
。例如,這樣做可以為將訊息分派到訂閱的處理器配置執行緒池。如先前所述,這樣做會中斷傳送者和接收者之間的單執行緒執行內容,以便處理器的叫用不會共用任何活動交易內容 (也就是說,處理器可能會擲回 Exception
,但 send
叫用已成功傳回)。以下範例顯示如何使用 dispatcher
元素並在 task-executor
屬性中指定執行器
-
Java
-
XML
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
PriorityChannel
配置
若要建立 PriorityChannel
,請使用 <priority-queue/>
子元素,如下列範例所示
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
預設情況下,通道會查詢訊息的 priority
標頭。但是,您可以改為提供自訂 Comparator
參考。此外,請注意 PriorityChannel
(與其他類型一樣) 確實支援 datatype
屬性。與 QueueChannel
一樣,它也支援 capacity
屬性。以下範例示範了所有這些
-
Java
-
XML
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
自 4.0 版起,priority-channel
子元素支援 message-store
選項 (在這種情況下不允許使用 comparator
和 capacity
)。訊息儲存區必須是 PriorityCapableChannelMessageStore
。目前為 Redis
、JDBC
和 MongoDB
提供了 PriorityCapableChannelMessageStore
的實作。有關更多資訊,請參閱QueueChannel
配置和訊息儲存區。您可以在支援訊息通道中找到範例配置。
RendezvousChannel
配置
當佇列子元素為 <rendezvous-queue>
時,會建立 RendezvousChannel
。它不提供任何其他配置選項,其佇列不接受任何容量值,因為它是零容量直接交接佇列。以下範例顯示如何宣告 RendezvousChannel
-
Java
-
XML
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
通道攔截器配置
訊息通道也可以具有攔截器,如通道攔截器中所述。<interceptors/>
子元素可以新增至 <channel/>
(或更特定的元素類型)。您可以提供 ref
屬性來參考任何實作 ChannelInterceptor
介面的 Spring 管理物件,如下列範例所示
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
一般而言,我們建議在單獨的位置定義攔截器實作,因為它們通常提供可以在多個通道之間重複使用的通用行為。
全域通道攔截器配置
通道攔截器提供了一種乾淨簡潔的方式,可以針對每個單獨的通道應用跨領域行為。如果相同的行為應應用於多個通道,則為每個通道配置相同的攔截器集並不是最有效的方式。為了避免重複配置,同時也允許攔截器應用於多個通道,Spring Integration 提供了全域攔截器。請考慮以下一對範例
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
每個 <channel-interceptor/>
元素都可讓您定義全域攔截器,該攔截器會應用於符合 pattern
屬性定義的任何模式的所有通道。在前述情況下,全域攔截器會應用於 'thing1' 通道以及所有其他以 'thing2' 或 'input' 開頭的通道,但不適用於以 'thing3' 開頭的通道 (自 5.0 版起)。
將此語法新增至模式會導致一個可能的 (但可能不太可能的) 問題。如果您有一個名為 !thing1 的 Bean,並且您在通道攔截器的 pattern 模式中包含 !thing1 模式,則它不再匹配。該模式現在匹配所有未命名為 thing1 的 Bean。在這種情況下,您可以使用 \ 跳脫模式中的 ! 。模式 \!thing1 匹配名為 !thing1 的 Bean。 |
當給定通道上有多個攔截器時,order
屬性可讓您管理此攔截器的注入位置。例如,通道 'inputChannel' 可以具有在本機配置的個別攔截器 (請參閱下文),如下列範例所示
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
一個合理的問題是「全域攔截器相對於在本機配置或透過其他全域攔截器定義配置的其他攔截器是如何注入的?」目前的實作提供了一種簡單的機制來定義攔截器執行的順序。order
屬性中的正數可確保在任何現有攔截器之後注入攔截器,而負數可確保在現有攔截器之前注入攔截器。這表示,在前述範例中,全域攔截器是在本機配置的 'wire-tap' 攔截器之後注入的 (因為其 order
大於 0
)。如果還有另一個具有匹配 pattern
的全域攔截器,則其順序將透過比較兩個攔截器的 order
屬性的值來決定。若要在現有攔截器之前注入全域攔截器,請為 order
屬性使用負值。
請注意,order 和 pattern 屬性都是選用的。order 的預設值將為 0,而 pattern 的預設值為 '*' (以匹配所有通道)。 |
Wire Tap
如先前所述,Spring Integration 提供了一個簡單的 wire tap 攔截器。您可以在 <interceptors/>
元素中的任何通道上配置 wire tap。這樣做對於偵錯特別有用,並且可以與 Spring Integration 的記錄通道配接器結合使用,如下所示
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
'logging-channel-adapter' 也接受 'expression' 屬性,以便您可以針對 'payload' 和 'headers' 變數評估 SpEL 運算式。或者,若要記錄完整訊息 toString() 結果,請為 'log-full-message' 屬性提供值 true 。預設情況下,它是 false ,因此僅記錄 Payload。將其設定為 true 會啟用記錄除 Payload 之外的所有標頭。'expression' 選項提供了最大的彈性 (例如,expression="payload.user.name" )。 |
關於 wire tap 和其他類似元件 (訊息發布配置) 的常見誤解之一是它們本質上是自動非同步的。預設情況下,wire tap 作為元件不會以非同步方式叫用。相反,Spring Integration 專注於配置非同步行為的單一統一方法:訊息通道。使訊息流程的某些部分同步或非同步的是已在該流程中配置的訊息通道類型。這是訊息通道抽象的主要優點之一。從框架開始之初,我們就一直強調訊息通道作為框架一級公民的需求和價值。它不僅僅是 EIP 模式的內部、隱含實作。它完全公開為最終使用者可配置的元件。因此,wire tap 元件僅負責執行以下任務
-
透過點擊通道 (例如,
channelA
) 來攔截訊息流程 -
抓取每則訊息
-
將訊息傳送到另一個通道 (例如,
channelB
)
這本質上是橋接模式的變體,但它封裝在通道定義中(因此更容易啟用和停用,而不會中斷流程)。此外,與橋接模式不同,它基本上是分支另一個訊息流程。該流程是同步還是非同步?答案取決於 'channelB' 的訊息通道類型。我們有以下選項:直接通道、可輪詢通道和執行器通道。後兩者打破了執行緒邊界,使得透過此類通道的通訊成為非同步的,因為從該通道到其訂閱處理常式的訊息分派發生在與用於將訊息發送到該通道的執行緒不同的執行緒上。這就是使您的線路監聽流程同步或非同步的原因。這與框架內的其他組件(例如訊息發布者)一致,並增加了層次的一致性和簡潔性,讓您無需預先擔心(除了編寫執行緒安全程式碼之外)特定程式碼片段應該以同步還是非同步方式實作。透過訊息通道連接兩段程式碼(例如,組件 A 和組件 B)的實際佈線決定了它們的協作是同步還是非同步。您甚至可能希望將來從同步變更為非同步,而訊息通道可讓您快速完成此操作,而無需修改程式碼。
關於線路監聽的最後一點是,儘管上面提供了預設不應為非同步的理由,但您應記住,通常最好盡快交出訊息。因此,使用非同步通道選項作為線路監聽的出站通道非常常見。但是,非同步行為預設不是強制執行的。如果我們這樣做,將會破壞許多用例,包括您可能不希望破壞交易邊界。也許您將線路監聽模式用於稽核目的,並且您確實希望稽核訊息在原始交易中發送。例如,您可以將線路監聽連接到 JMS 出站通道配接器。這樣,您可以獲得兩全其美的效果:1) JMS 訊息的發送可以在交易內發生,同時 2) 它仍然是一個「發送後不理」的動作,從而防止主訊息流程中出現任何明顯的延遲。
從 4.0 版本開始,當攔截器(例如 WireTap 類別)引用通道時,避免循環引用非常重要。您需要從目前攔截器攔截的通道中排除此類通道。這可以使用適當的模式或以程式設計方式完成。如果您有自訂的 ChannelInterceptor 引用了 channel ,請考慮實作 VetoCapableInterceptor 。這樣,框架會根據提供的模式詢問攔截器是否可以攔截每個候選通道。您也可以在攔截器方法中新增執行階段保護,以確保通道不是攔截器引用的通道。WireTap 同時使用了這兩種技術。 |
從 4.3 版本開始,WireTap
具有額外的建構子,這些建構子接受 channelName
而不是 MessageChannel
實例。這對於 Java 配置以及使用通道自動建立邏輯時可能很方便。目標 MessageChannel
Bean 會在稍後(在與攔截器的第一次互動時)從提供的 channelName
解析。
通道解析需要 BeanFactory ,因此線路監聽實例必須是 Spring 管理的 Bean。 |
這種延遲綁定方法也簡化了使用 Java DSL 配置的典型線路監聽模式,如下例所示
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
條件式線路監聽
線路監聽可以透過使用 selector
或 selector-expression
屬性來設定條件。selector
引用 MessageSelector
Bean,它可以決定在執行階段訊息是否應傳送到監聽通道。同樣地,selector-expression
是一個布林值 SpEL 運算式,它執行相同的目的:如果運算式評估為 true
,則訊息會傳送到監聽通道。
全域線路監聽配置
可以將全域線路監聽配置為 全域通道攔截器配置 的特殊情況。若要執行此操作,請配置最上層的 wire-tap
元素。現在,除了正常的 wire-tap
命名空間支援之外,還支援 pattern
和 order
屬性,並且它們的工作方式與 channel-interceptor
完全相同。以下範例示範如何配置全域線路監聽
-
Java
-
XML
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
全域線路監聽提供了一種方便的方式來外部配置單通道線路監聽,而無需修改現有的通道配置。若要執行此操作,請將 pattern 屬性設定為目標通道名稱。例如,您可以使用此技術來配置測試案例,以驗證通道上的訊息。 |