通道适配器

通道适配器是一種訊息端點,可將單一發送者或接收者連接到訊息通道。 Spring Integration 提供了許多适配器來支援各種傳輸協定,例如 JMS、檔案、HTTP、Web 服務、郵件等等。本參考指南的後續章節將討論每個适配器。然而,本章重點介紹簡單但靈活的方法調用通道适配器支援。有輸入和輸出适配器,每個都可以使用核心命名空間中提供的 XML 元素進行設定。這些提供了一種擴展 Spring Integration 的簡便方法,只要您有一個可以作為來源或目的地的調用方法即可。

設定輸入通道适配器

inbound-channel-adapter 元素(在 Java 設定中為 SourcePollingChannelAdapter)可以調用 Spring 管理物件上的任何方法,並在將方法的輸出轉換為 Message 後,將非 null 傳回值發送到 MessageChannel。當适配器的訂閱被啟動時,輪詢器會嘗試從來源接收訊息。輪詢器會根據提供的設定,使用 TaskScheduler 進行排程。若要為個別通道适配器設定輪詢間隔或 cron 運算式,您可以提供一個包含排程屬性(例如 'fixed-rate' 或 'cron')的 'poller' 元素。以下範例定義了兩個 inbound-channel-adapter 實例

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow source1() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.fixedRate(5000)))
                ...
                .get();
}

@Bean
public IntegrationFlow source2() {
    return IntegrationFlow.from(() -> new GenericMessage<>(...),
                             e -> e.poller(p -> p.cron("30 * 9-17 * * MON-FRI")))
                ...
                .get();
}
public class SourceService {

    @InboundChannelAdapter(channel = "channel1", poller = @Poller(fixedRate = "5000"))
    Object method1() {
        ...
    }

    @InboundChannelAdapter(channel = "channel2", poller = @Poller(cron = "30 * 9-17 * * MON-FRI"))
    Object method2() {
        ...
    }
}
@Bean
fun messageSourceFlow() =
    integrationFlow( { GenericMessage<>(...) },
                    { poller { it.fixedRate(5000) } }) {
        ...
    }
<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller fixed-rate="5000"/>
</int:inbound-channel-adapter>

<int:inbound-channel-adapter ref="source2" method="method2" channel="channel2">
    <int:poller cron="30 * 9-17 * * MON-FRI"/>
</int:channel-adapter>
如果未提供輪詢器,則必須在上下文中註冊單一預設輪詢器。有關更多詳細資訊,請參閱 端點命名空間支援
輪詢端點的預設觸發器是一個 PeriodicTrigger 實例,其固定延遲週期為 1 秒。
重要:輪詢器設定

所有 inbound-channel-adapter 類型都由 SourcePollingChannelAdapter 支援,這表示它們包含一個輪詢器設定,該設定根據 Poller 中指定的設定輪詢 MessageSource(以調用產生值的方法,該值將成為 Message Payload)。以下範例顯示了兩個輪詢器的設定

<int:poller max-messages-per-poll="1" fixed-rate="1000"/>

<int:poller max-messages-per-poll="10" fixed-rate="1000"/>

在第一個設定中,輪詢任務每次輪詢調用一次,並且在每個任務(輪詢)期間,根據 max-messages-per-poll 屬性值,方法(導致訊息產生)調用一次。在第二個設定中,輪詢任務每次輪詢調用 10 次,或直到它傳回 'null',因此每次輪詢可能會產生十個訊息,而每次輪詢都以一秒間隔發生。但是,如果設定看起來像以下範例,會發生什麼情況

<int:poller fixed-rate="1000"/>

請注意,沒有指定 max-messages-per-poll。正如我們稍後將介紹的,PollingConsumer(例如,service-activatorfilterrouter 和其他)中相同的輪詢器設定,max-messages-per-poll 的預設值為 -1,這表示「不停地執行輪詢任務,除非輪詢方法傳回 null(可能是因為 QueueChannel 中沒有更多訊息)」,然後休眠一秒鐘。

但是,在 SourcePollingChannelAdapter 中,情況略有不同。 max-messages-per-poll 的預設值為 1,除非您明確將其設定為負值(例如 -1)。這確保輪詢器可以對生命週期事件(例如啟動和停止)做出反應,並防止它在 MessageSource 的自訂方法實作有可能永遠不會傳回 null 並且碰巧是不可中斷的情況下,可能無限循環旋轉。

但是,如果您確定您的方法可以傳回 null,並且您需要為每次輪詢輪詢盡可能多的來源,則應明確將 max-messages-per-poll 設定為負值,如下列範例所示

<int:poller max-messages-per-poll="-1" fixed-rate="1000"/>

從 5.5 版開始,max-messages-per-poll 的值 0 具有特殊含義 - 完全跳過 MessageSource.receive() 調用,這可以被視為暫停此輸入通道适配器,直到稍後時間將 maxMessagesPerPoll 變更為非零值,例如透過控制匯流排。

從 6.2 版開始,fixed-delayfixed-rate 可以以 ISO 8601 Duration 格式設定,例如 PT10SP1D 等。此外,基礎 PeriodicTriggerinitial-interval 也以與 fixed-delayfixed-rate 類似的值格式公開。

另請參閱 全域預設輪詢器 以取得更多資訊。

設定輸出通道适配器

outbound-channel-adapter 元素(Java 設定的 @ServiceActivator)也可以將 MessageChannel 連接到任何 POJO 消費者方法,該方法應使用發送到該通道的訊息 Payload 調用。以下範例顯示如何定義輸出通道适配器

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean
public IntegrationFlow outboundChannelAdapterFlow(MyPojo myPojo) {
    return f -> f
             .handle(myPojo, "handle");
}
public class MyPojo {

    @ServiceActivator(channel = "channel1")
    void handle(Object payload) {
        ...
    }

}
@Bean
fun outboundChannelAdapterFlow(myPojo: MyPojo) =
    integrationFlow {
        handle(myPojo, "handle")
    }
<int:outbound-channel-adapter channel="channel1" ref="target" method="handle"/>

<beans:bean id="target" class="org.MyPojo"/>

如果正在适配的通道是 PollableChannel,則必須提供輪詢器子元素(@ServiceActivator 上的 @Poller 子註解),如下列範例所示

  • Java

  • XML

public class MyPojo {

    @ServiceActivator(channel = "channel1", poller = @Poller(fixedRate = "3000"))
    void handle(Object payload) {
        ...
    }

}
<int:outbound-channel-adapter channel="channel2" ref="target" method="handle">
    <int:poller fixed-rate="3000" />
</int:outbound-channel-adapter>

<beans:bean id="target" class="org.MyPojo"/>

如果 POJO 消費者實作可以在其他 <outbound-channel-adapter> 定義中重複使用,則應使用 ref 屬性。但是,如果消費者實作僅由 <outbound-channel-adapter> 的單一定義引用,則可以將其定義為內部 Bean,如下列範例所示

<int:outbound-channel-adapter channel="channel" method="handle">
    <beans:bean class="org.Foo"/>
</int:outbound-channel-adapter>
在同一個 <outbound-channel-adapter> 設定中使用 ref 屬性和內部處理常式定義是不允許的,因為這會產生不明確的條件。這樣的設定會導致擲回例外狀況。

任何通道适配器都可以在沒有 channel 參考的情況下建立,在這種情況下,它會隱含地建立 DirectChannel 的實例。建立的通道名稱與 <inbound-channel-adapter><outbound-channel-adapter> 元素的 id 屬性相符。因此,如果未提供 channel,則 id 是必要的。

通道适配器運算式和指令碼

與許多其他 Spring Integration 元件一樣,<inbound-channel-adapter><outbound-channel-adapter> 也提供 SpEL 運算式評估的支援。若要使用 SpEL,請在 'expression' 屬性中提供運算式字串,而不是提供用於 Bean 上方法調用的 'ref' 和 'method' 屬性。當評估運算式時,它遵循與方法調用相同的合約,其中:<inbound-channel-adapter> 的運算式會在每次評估結果為非 null 值時產生訊息,而 <outbound-channel-adapter> 的運算式必須等同於傳回 void 的方法調用。

從 Spring Integration 3.0 開始,<int:inbound-channel-adapter/> 也可以使用 SpEL <expression/>(甚至 <script/>)子元素進行設定,以便在需要比簡單 'expression' 屬性能實現的更複雜功能時使用。如果您使用 location 屬性將指令碼作為 Resource 提供,您還可以設定 refresh-check-delay,這允許定期重新整理資源。如果您希望在每次輪詢時檢查指令碼,則需要將此設定與輪詢器的觸發器協調,如下列範例所示

<int:inbound-channel-adapter ref="source1" method="method1" channel="channel1">
    <int:poller max-messages-per-poll="1" fixed-delay="5000"/>
    <script:script lang="ruby" location="Foo.rb" refresh-check-delay="5000"/>
</int:inbound-channel-adapter>

另請參閱使用 <expression/> 子元素時,ReloadableResourceBundleExpressionSource 上的 cacheSeconds 屬性。有關運算式的更多資訊,請參閱 Spring 運算式語言 (SpEL)。對於指令碼,請參閱 Groovy 支援指令碼支援

<int:inbound-channel-adapter/> (SourcePollingChannelAdapter) 是一個端點,它透過定期觸發來輪詢一些底層 MessageSource 來啟動訊息流。由於在輪詢時沒有訊息物件,因此運算式和指令碼無法存取根 Message,因此在大多數其他訊息傳遞 SpEL 運算式中沒有可用的 Payload 或標頭屬性。指令碼可以產生並傳回包含標頭和 Payload 的完整 Message 物件,或者僅傳回 Payload,框架會將其新增到包含基本標頭的訊息中。