訊息端點

本章的第一部分涵蓋了一些背景理論,並揭示了許多關於驅動 Spring Integration 各種訊息傳遞元件的底層 API。如果您想真正了解幕後發生的事情,這些資訊可能會有所幫助。但是,如果您想開始使用各種元素的簡化命名空間設定,請隨時跳到端點命名空間支援

如總覽中所述,訊息端點負責將各種訊息傳遞元件連接到通道。在接下來的幾章中,我們將介紹許多不同的元件,這些元件會使用訊息。其中一些元件也能够發送回覆訊息。發送訊息非常簡單。如先前在訊息通道中所示,您可以將訊息發送到訊息通道。但是,接收訊息稍微複雜一些。主要原因是消費者有兩種型別:輪詢消費者事件驅動消費者

在兩者中,事件驅動消費者更簡單。它們基本上是不需要管理和排程單獨輪詢器執行緒的監聽器,具有回呼方法。當連接到 Spring Integration 的可訂閱訊息通道之一時,這個簡單的選項效果很好。但是,當連接到緩衝的、可輪詢的訊息通道時,某些元件必須排程和管理輪詢執行緒。Spring Integration 提供了兩種不同的端點實作來適應這兩種型別的消費者。因此,消費者本身只需要實作回呼介面。當需要輪詢時,端點充當消費者實例的容器。其好處類似於使用容器來託管訊息驅動 Bean,但是,由於這些消費者是 Spring 管理的物件,在 ApplicationContext 中執行,因此它更類似於 Spring 自己的 MessageListener 容器。

訊息處理器

Spring Integration 的 MessageHandler 介面由框架內的許多元件實作。換句話說,這不是公用 API 的一部分,您通常不會直接實作 MessageHandler。儘管如此,訊息消費者使用它來實際處理已消費的訊息,因此了解這個策略介面確實有助於理解消費者的整體角色。介面定義如下

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

儘管介面非常簡單,但它為以下章節中涵蓋的大多數元件(路由器、轉換器、分割器、彙集器、服務啟動器和其他元件)提供了基礎。這些元件各自對它們處理的訊息執行非常不同的功能,但是實際接收訊息的要求是相同的,輪詢和事件驅動行為之間的選擇也是相同的。Spring Integration 提供了兩個端點實作,用於託管這些基於回呼的處理器,並讓它們連接到訊息通道。

事件驅動消費者

由於事件驅動消費者是兩者中較簡單的,因此我們先介紹事件驅動消費者端點。您可能還記得 SubscribableChannel 介面提供了 subscribe() 方法,並且該方法接受 MessageHandler 參數(如 SubscribableChannel 中所示)。以下清單顯示了 subscribe 方法的定義

subscribableChannel.subscribe(messageHandler);

由於訂閱通道的處理器不必主動輪詢該通道,因此這是一個事件驅動消費者,Spring Integration 提供的實作接受 SubscribableChannelMessageHandler,如以下範例所示

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

輪詢消費者

Spring Integration 也提供 PollingConsumer,可以以相同的方式實例化它,但通道必須實作 PollableChannel,如以下範例所示

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有關輪詢消費者的更多資訊,請參閱 通道配接器通道配接器

輪詢消費者還有許多其他設定選項。以下範例示範如何設定觸發器

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));

PeriodicTrigger 通常使用簡單的間隔 (Duration) 定義,但也支援 initialDelay 屬性和布林值 fixedRate 屬性(預設值為 false - 即,無固定延遲)。以下範例設定了這兩個屬性

PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);

前述範例中三個設定的結果是一個觸發器,它等待五秒鐘,然後每秒觸發一次。

CronTrigger 需要有效的 cron 運算式。有關詳細資訊,請參閱 Javadoc。以下範例設定了新的 CronTrigger

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

前一個範例中定義的觸發器的結果是一個觸發器,它在週一至週五每十秒觸發一次。

輪詢端點的預設觸發器是 PeriodicTrigger 實例,其固定延遲週期為 1 秒。

除了觸發器之外,您還可以指定其他兩個與輪詢相關的設定屬性:maxMessagesPerPollreceiveTimeout。以下範例示範如何設定這兩個屬性

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll 屬性指定在給定的輪詢操作中要接收的最大訊息數。這表示輪詢器會繼續呼叫 receive() 而不等待,直到返回 null 或達到最大值。例如,如果輪詢器的間隔觸發器為十秒,且 maxMessagesPerPoll 設定為 25,並且它正在輪詢佇列中有 100 條訊息的通道,則所有 100 條訊息都可以在 40 秒內擷取。它抓取 25 條,等待十秒鐘,抓取接下來的 25 條,依此類推。如果 maxMessagesPerPoll 設定為負值,則在單個輪詢週期內呼叫 MessageSource.receive(),直到它返回 null。從 5.5 版開始,值 0 具有特殊含義 - 完全跳過 MessageSource.receive() 呼叫,這可以視為暫停此輪詢端點,直到稍後將 maxMessagesPerPoll 更改為非零值,例如透過控制匯流排。

如果輪詢器在叫用接收操作時沒有訊息可用,則 receiveTimeout 屬性指定輪詢器應等待的時間量。例如,考慮兩個表面上看起來相似但實際上非常不同的選項:第一個選項的間隔觸發器為 5 秒,接收逾時為 50 毫秒,而第二個選項的間隔觸發器為 50 毫秒,接收逾時為 5 秒。第一個選項可能會比它在通道上接受的時間晚最多 4950 毫秒接收到訊息(如果該訊息在其輪詢呼叫返回後立即到達)。另一方面,第二個設定永遠不會遺漏超過 50 毫秒的訊息。不同之處在於第二個選項需要執行緒等待。但是,結果是,它可以更快地回應到達的訊息。這種稱為「長輪詢」的技術可用於模擬輪詢來源上的事件驅動行為。

輪詢消費者也可以委派給 Spring TaskExecutor,如以下範例所示

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

此外,PollingConsumer 具有一個名為 adviceChain 的屬性。此屬性可讓您指定 AOP Advice 的 List,以處理包括交易在內的其他跨領域關注點。這些 Advice 應用於 doPoll() 方法周圍。有關更深入的資訊,請參閱 端點命名空間支援 下關於 AOP Advice 鏈和交易支援的章節。另請參閱 @Poller 註解 Javadoc 和各自的 訊息傳遞註解支援 章節。Java DSL 也提供了 .poller() 端點設定選項及其各自的 Pollers 工廠。

先前的範例顯示了依賴項查閱。但是,請記住,這些消費者通常設定為 Spring Bean 定義。實際上,Spring Integration 也提供了一個名為 ConsumerEndpointFactoryBeanFactoryBean,它根據通道的型別建立適當的消費者型別。此外,Spring Integration 具有完整的 XML 命名空間支援,甚至可以進一步隱藏這些詳細資訊。在本指南中,當介紹每種類型的元件時,都會介紹基於命名空間的設定。

許多 MessageHandler 實作可以產生回覆訊息。如前所述,與接收訊息相比,發送訊息微不足道。儘管如此,何時以及發送多少回覆訊息取決於處理器型別。例如,彙集器等待多條訊息到達,並且通常設定為分割器的下游消費者,分割器可以為它處理的每條訊息產生多個回覆。當使用命名空間設定時,您不必嚴格地了解所有詳細資訊。但是,仍然值得了解的是,其中幾個元件共用一個通用的基底類別 AbstractReplyProducingMessageHandler,並且它提供了 setOutputChannel(..) 方法。

端點命名空間支援

在本參考手冊中,您可以找到端點元素的特定設定範例,例如路由器、轉換器、服務啟動器等等。它們中的大多數都支援 input-channel 屬性,並且許多都支援 output-channel 屬性。在被解析之後,這些端點元素會產生 PollingConsumerEventDrivenConsumer 的實例,具體取決於引用的 input-channel 的型別:分別為 PollableChannelSubscribableChannel。當通道可輪詢時,輪詢行為基於端點元素的 poller 子元素及其屬性。

以下列出了 poller 的所有可用設定選項

<int:poller cron=""                                  (1)
            default="false"                          (2)
            error-channel=""                         (3)
            fixed-delay=""                           (4)
            fixed-rate=""                            (5)
            initial-delay=""                         (6)
            id=""                                    (7)
            max-messages-per-poll=""                 (8)
            receive-timeout=""                       (9)
            ref=""                                   (10)
            task-executor=""                         (11)
            time-unit="MILLISECONDS"                 (12)
            trigger="">                              (13)
            <int:advice-chain />                     (14)
            <int:transactional />                    (15)
</int:poller>
1 提供使用 Cron 運算式設定輪詢器的能力。底層實作使用 org.springframework.scheduling.support.CronTrigger。如果設定了此屬性,則不得指定以下任何屬性:fixed-delaytriggerfixed-rateref
2 將此屬性設定為 true,您可以準確地定義一個全域預設輪詢器。如果在應用程式環境定義中定義了多個預設輪詢器,則會引發例外。任何連接到 PollableChannel (PollingConsumer) 的端點或任何沒有明確設定輪詢器的 SourcePollingChannelAdapter 都會使用全域預設輪詢器。預設值為 false。選用。
3 識別如果此輪詢器的叫用中發生失敗,則將錯誤訊息發送到哪個通道。若要完全抑制例外,您可以提供對 nullChannel 的參考。選用。
4 固定延遲觸發器在底層使用 PeriodicTrigger。數值以 time-unit 為單位,也可以是持續時間格式(從 6.2 版開始),例如 PT10SP1D。如果設定了此屬性,則不得指定以下任何屬性:fixed-ratetriggercronref
5 固定速率觸發器在底層使用 PeriodicTrigger。數值以 time-unit 為單位,也可以是持續時間格式(從 6.2 版開始),例如 PT10SP1D。如果設定了此屬性,則不得指定以下任何屬性:fixed-delaytriggercronref
6 底層 PeriodicTrigger 的初始延遲(從 6.2 版開始)。數值以 time-unit 為單位,也可以是持續時間格式,例如 PT10SP1D
7 識別輪詢器底層 Bean 定義的 ID,其型別為 org.springframework.integration.scheduling.PollerMetadata。對於頂層輪詢器元素,id 屬性是必需的,除非它是預設輪詢器 (default="true")。
8 有關更多資訊,請參閱 設定輸入通道配接器。如果未指定,則預設值取決於環境定義。如果您使用 PollingConsumer,則此屬性預設為 -1。但是,如果您使用 SourcePollingChannelAdapter,則 max-messages-per-poll 屬性預設為 1。選用。
9 值設定在底層類別 PollerMetadata 上。如果未指定,則預設值為 1000(毫秒)。選用。
10 Bean 參考另一個頂層輪詢器。ref 屬性不得出現在頂層 poller 元素上。但是,如果設定了此屬性,則不得指定以下任何屬性:fixed-ratetriggercronfixed-delay
11 提供參考自訂任務執行器的能力。有關更多資訊,請參閱 TaskExecutor 支援。選用。
12 此屬性在底層 org.springframework.scheduling.support.PeriodicTrigger 上指定 java.util.concurrent.TimeUnit 列舉值。因此,此屬性只能與 fixed-delayfixed-rate 屬性結合使用。如果與 crontrigger 參考屬性結合使用,則會導致失敗。PeriodicTrigger 的最小支援粒度為毫秒。因此,唯一可用的選項是毫秒和秒。如果未提供此值,則任何 fixed-delayfixed-rate 值都將被解釋為毫秒。基本上,此列舉為基於秒的間隔觸發器值提供了便利。對於每小時、每天和每月的設定,我們建議改用 cron 觸發器。
13 參考任何 Spring 設定的 Bean,該 Bean 實作了 org.springframework.scheduling.Trigger 介面。但是,如果設定了此屬性,則不得指定以下任何屬性:fixed-delayfixed-ratecronref。選用。
14 允許指定額外的 AOP Advice 以處理其他跨領域關注點。有關更多資訊,請參閱 交易。選用。
15 輪詢器可以設為交易性。有關更多資訊,請參閱 AOP Advice 鏈。選用。

範例

可以使用以下方式設定簡單的基於間隔的輪詢器,間隔為 1 秒

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

作為使用 fixed-rate 屬性的替代方法,您也可以使用 fixed-delay 屬性。

對於基於 Cron 運算式的輪詢器,請改用 cron 屬性,如以下範例所示

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

如果輸入通道是 PollableChannel,則需要輪詢器設定。具體來說,如前所述,triggerPollingConsumer 類別的必要屬性。因此,如果您為輪詢消費者端點的設定省略 poller 子元素,則可能會擲回例外。如果您嘗試在連接到不可輪詢通道的元素上設定輪詢器,也可能會擲回例外。

也可以建立頂層輪詢器,在這種情況下,只需要 ref 屬性,如以下範例所示

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>
ref 屬性僅允許在內部輪詢器定義上使用。在頂層輪詢器上定義此屬性會導致在應用程式環境定義初始化期間擲回設定例外。

全域預設輪詢器

為了進一步簡化設定,您可以定義全域預設輪詢器。XML DSL 中的單個頂層輪詢器元件可以將 default 屬性設定為 true。對於 Java 設定,必須在此情況下宣告具有 PollerMetadata.DEFAULT_POLLER 名稱的 PollerMetadata Bean。在這種情況下,任何輸入通道為 PollableChannel 的端點,在同一個 ApplicationContext 中定義,並且沒有明確設定 poller,都將使用該預設值。以下範例顯示了這樣一個輪詢器和一個使用它的轉換器

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlow.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
    	transform(transformer) // No 'poller' attribute because there is a default global poller
    	channel("output")
    }
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>

交易支援

Spring Integration 也為輪詢器提供交易支援,以便每個接收和轉發操作都可以作為原子工作單元執行。若要為輪詢器設定交易,請新增 <transactional/> 子元素。以下範例顯示了可用的屬性

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

有關更多資訊,請參閱 輪詢器交易支援

AOP Advice 鏈

由於 Spring 交易支援依賴於具有 TransactionInterceptor (AOP Advice) 的 Proxy 機制來處理由輪詢器啟動的訊息流程的交易行為,因此您有時必須提供額外的 Advice 來處理與輪詢器關聯的其他跨領域行為。為此,poller 定義了一個 advice-chain 元素,可讓您在實作 MethodInterceptor 介面的類別中新增更多 Advice。以下範例顯示了如何為 poller 定義 advice-chain

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

有關如何實作 MethodInterceptor 介面的更多資訊,請參閱 Spring Framework 參考指南的 AOP 章節。Advice 鏈也可以應用於沒有任何交易設定的輪詢器,讓您可以增強由輪詢器啟動的訊息流程的行為。

當使用 Advice 鏈時,無法指定 <transactional/> 子元素。而是宣告 <tx:advice/> Bean 並將其新增到 <advice-chain/>。有關完整的設定詳細資訊,請參閱 輪詢器交易支援

TaskExecutor 支援

輪詢執行緒可以由 Spring TaskExecutor 抽象的任何實例執行。這為端點或端點群組啟用了並行性。從 Spring 3.0 開始,核心 Spring Framework 具有 task 命名空間,其 <executor/> 元素支援建立簡單的執行緒池執行器。該元素接受用於常見並行設定的屬性,例如 pool-size 和 queue-capacity。設定執行緒池執行器可以顯著改變端點在負載下的效能。這些設定對於每個端點都是可用的,因為端點的效能是要考慮的主要因素之一(另一個主要因素是端點訂閱的通道上的預期容量)。若要為使用 XML 命名空間支援設定的輪詢端點啟用並行性,請在其 <poller/> 元素上提供 task-executor 參考,然後提供以下範例中顯示的一個或多個屬性

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

如果您不提供任務執行器,則消費者的處理器會在呼叫者的執行緒中叫用。請注意,呼叫者通常是預設的 TaskScheduler(請參閱 設定任務排程器)。您也應該記住,task-executor 屬性可以透過指定 Bean 名稱來提供對 Spring TaskExecutor 介面的任何實作的參考。先前顯示的 executor 元素是為了方便起見而提供的。

如先前在 輪詢消費者的背景章節 中所述,您也可以將輪詢消費者設定為模擬事件驅動行為的方式。透過長接收逾時和觸發器中的短間隔,即使在輪詢訊息來源上,您也可以確保對到達的訊息做出非常及時的反應。請注意,這僅適用於具有逾時阻塞等待呼叫的來源。例如,檔案輪詢器不會阻塞。每個 receive() 呼叫都會立即返回,並且包含新檔案或不包含。因此,即使輪詢器包含長 receive-timeout,該值在這種情況下也永遠不會使用。另一方面,當使用 Spring Integration 自己的基於佇列的通道時,逾時值確實有機會參與。以下範例示範了輪詢消費者如何幾乎立即接收訊息

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

使用此方法不會帶來太多額外負荷,因為在內部,它只是一個定時等待執行緒,它不需要像(例如)不斷抖動的無限迴圈那樣多的 CPU 資源使用量。

在執行階段變更輪詢速率

當使用 fixed-delayfixed-rate 屬性設定輪詢器時,預設實作使用 PeriodicTrigger 實例。PeriodicTrigger 是核心 Spring Framework 的一部分。它僅接受間隔作為建構子引數。因此,它無法在執行階段變更。

然而,您可以定義自己實作的 org.springframework.scheduling.Trigger 介面。您甚至可以使用 PeriodicTrigger 作為起點。然後您可以為間隔(週期)新增 setter,或者您甚至可以在觸發器本身內嵌入自己的節流邏輯。period 屬性與每次對 nextExecutionTime 的呼叫一起使用,以排程下一次輪詢。若要在輪詢器中使用此自訂觸發器,請在您的應用程式內容中宣告自訂觸發器的 bean 定義,並使用 trigger 屬性將相依性注入到您的輪詢器組態中,該屬性會參考自訂觸發器 bean 實例。您現在可以取得對觸發器 bean 的參考,並變更輪詢之間的輪詢間隔。

如需範例,請參閱 Spring Integration Samples 專案。它包含一個名為 dynamic-poller 的範例,該範例使用自訂觸發器,並示範在執行階段變更輪詢間隔的能力。

此範例提供了一個自訂觸發器,實作了 org.springframework.scheduling.Trigger 介面。此範例的觸發器是基於 Spring 的 PeriodicTrigger 實作。然而,自訂觸發器的欄位不是 final 的,且屬性具有明確的 getter 和 setter,讓您可以在執行階段動態變更輪詢週期。

但請務必注意,由於 Trigger 方法是 nextExecutionTime(),因此對動態觸發器的任何變更,在根據現有組態進行下一次輪詢之前,都不會生效。無法強制觸發器在其目前組態的下一次執行時間之前觸發。

酬載類型轉換

在本參考手冊中,您也可以看到各種端點的特定組態和實作範例,這些端點接受訊息或任何任意 Object 作為輸入參數。在 Object 的情況下,此類參數會對應到訊息酬載或酬載的一部分或標頭(當使用 Spring 運算式語言時)。然而,端點方法的輸入參數類型有時與酬載或其部分的類型不符。在這種情況下,我們需要執行類型轉換。Spring Integration 提供了一種方便的方式,可在其自身的轉換服務 bean 實例(名為 integrationConversionService)中註冊類型轉換器(透過使用 Spring ConversionService)。只要使用 Spring Integration 基礎架構定義第一個轉換器,就會自動建立該 bean。若要註冊轉換器,您可以實作 org.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactory

Converter 實作是最簡單的,且會從單一類型轉換為另一種類型。若要進行更複雜的操作,例如轉換為類別階層,您可以實作 GenericConverter,以及可能的 ConditionalConverter。這些讓您可以完全存取 fromto 類型描述子,從而實現複雜的轉換。例如,如果您有一個名為 Something 的抽象類別,它是您轉換的目標(參數類型、通道資料類型等等),您有兩個名為 Thing1Thing 的具體實作,並且您希望根據輸入類型轉換為其中一個,則 GenericConverter 將會是一個不錯的選擇。如需更多資訊,請參閱這些介面的 Javadoc

當您實作轉換器後,您可以使用方便的命名空間支援來註冊它,如下列範例所示

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

或者,您可以使用內部 bean,如下列範例所示

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

從 Spring Integration 4.0 開始,您可以使用註解來建立先前的組態,如下列範例所示

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

或者,您可以使用 @Configuration 註解,如下列範例所示

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}

當組態應用程式內容時,Spring Framework 可讓您新增 conversionService bean(請參閱 組態 ConversionService 章節)。此服務會在需要時使用,以在 bean 建立和組態期間執行適當的轉換。

相反地,integrationConversionService 用於執行階段轉換。這些用途截然不同。如果用於執行階段,針對資料類型通道、酬載類型轉換器等等中的訊息進行 Spring Integration 運算式評估,則旨在用於連接 bean 建構子引數和屬性的轉換器可能會產生非預期的結果。

但是,如果您確實想要使用 Spring conversionService 作為 Spring Integration integrationConversionService,您可以在應用程式內容中組態別名,如下列範例所示

<alias name="conversionService" alias="integrationConversionService"/>

在這種情況下,conversionService 提供的轉換器可用於 Spring Integration 執行階段轉換。

內容類型轉換

從版本 5.0 開始,預設情況下,方法調用機制是基於 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 基礎架構。其 HandlerMethodArgumentResolver 實作(例如 PayloadArgumentResolverMessageMethodArgumentResolver)可以使用 MessageConverter 抽象概念,將傳入的 payload 轉換為目標方法引數類型。轉換可以基於 contentType 訊息標頭。為此,Spring Integration 提供了 ConfigurableCompositeMessageConverter,它委派給已註冊的轉換器清單,以便調用,直到其中一個轉換器傳回非 null 結果。預設情況下,此轉換器提供(依嚴格順序)

請參閱 Javadoc(連結於先前的清單中),以取得有關其用途和適用於轉換的 contentType 值的更多資訊。使用 ConfigurableCompositeMessageConverter 是因為它可以提供任何其他 MessageConverter 實作,包括或排除先前提及的預設轉換器。它也可以註冊為應用程式內容中的適當 bean,覆寫預設轉換器,如下列範例所示

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

這兩個新的轉換器在預設轉換器之前註冊在組合轉換器中。您也可以不使用 ConfigurableCompositeMessageConverter,而是透過註冊名稱為 integrationArgumentResolverMessageConverter 的 bean(透過設定 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 屬性)來提供您自己的 MessageConverter

當使用 SpEL 方法調用時,基於 MessageConverter 的轉換(包括 contentType 標頭)不可用。在這種情況下,只有 酬載類型轉換 中提及的常規類別對類別轉換才可用。

非同步輪詢

如果您希望輪詢是非同步的,輪詢器可以選擇性地指定 task-executor 屬性,該屬性指向任何 TaskExecutor bean 的現有實例(Spring 3.0 透過 task 命名空間提供了方便的命名空間組態)。但是,當使用 TaskExecutor 組態輪詢器時,您必須了解某些事項。

問題在於存在兩個組態,輪詢器和 TaskExecutor。它們必須彼此協調一致。否則,您最終可能會建立人為的記憶體洩漏。

請考慮下列組態

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

先前的組態示範了一個不協調的組態。

預設情況下,任務執行器具有無界限的任務佇列。即使所有執行緒都被封鎖,等待新訊息到達或逾時到期,輪詢器仍會持續排程新任務。假設有 20 個執行緒以五秒逾時執行任務,它們的執行速率為每秒 4 個。但是,新任務的排程速率為每秒 20 個,因此任務執行器中的內部佇列以每秒 16 個的速度增長(當程序閒置時),因此我們發生了記憶體洩漏。

處理此問題的方法之一是設定任務執行器的 queue-capacity 屬性。即使是 0 也是一個合理的值。您也可以透過設定 Task Executor 的 rejection-policy 屬性(例如,設定為 DISCARD)來管理它,以指定如何處理無法排入佇列的訊息。換句話說,當組態 TaskExecutor 時,您必須了解某些細節。如需有關此主題的更多詳細資訊,請參閱 Spring 參考手冊中的 「任務執行和排程」

端點內部 Bean

許多端點都是複合 bean。這包括所有消費者和所有輪詢的輸入通道配接器。消費者(輪詢或事件驅動)委派給 MessageHandler。輪詢的配接器透過委派給 MessageSource 來取得訊息。通常,取得對委派 bean 的參考很有用,可能是為了在執行階段變更組態或進行測試。這些 bean 可以從 ApplicationContext 中取得,並具有眾所周知的名稱。MessageHandler 實例會使用類似於 someConsumer.handler 的 bean ID 註冊到應用程式內容中(其中 'consumer' 是端點 id 屬性的值)。MessageSource 實例會使用類似於 somePolledAdapter.source 的 bean ID 註冊,其中 'somePolledAdapter' 是配接器的 ID。

先前內容僅適用於框架元件本身。您可以改為使用內部 bean 定義,如下列範例所示

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

此 bean 的處理方式與任何宣告的內部 bean 相同,並且不會註冊到應用程式內容中。如果您希望以其他方式存取此 bean,請在頂層宣告它並使用 id,並改用 ref 屬性。如需更多資訊,請參閱 Spring 文件