分割器

分割器是一種元件,其作用是將訊息分割成數個部分,並將產生的訊息傳送以獨立處理。它們通常是管線中的上游生產者,管線包含彙集器。

程式設計模型

用於執行分割的 API 由一個基礎類別 AbstractMessageSplitter 組成。它是 MessageHandler 實作,封裝了分割器的常見功能,例如在產生的訊息上填入適當的訊息標頭 (CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER)。這種填入功能可追蹤訊息及其處理結果 (在典型情境中,這些標頭會複製到各種轉換端點產生的訊息中)。這些值隨後可用於,例如,複合訊息處理器

以下範例顯示了 AbstractMessageSplitter 的摘錄

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

若要在應用程式中實作特定的分割器,您可以擴充 AbstractMessageSplitter 並實作 splitMessage 方法,其中包含分割訊息的邏輯。傳回值可以是下列其中之一

  • 訊息的 Collection 或陣列,或可迭代訊息的 Iterable (或 Iterator)。在此情況下,訊息會以訊息形式傳送 (在填入 CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER 之後)。使用此方法可讓您有更多控制權 — 例如,在分割過程中填入自訂訊息標頭。

  • 非訊息物件的 Collection 或陣列,或可迭代非訊息物件的 Iterable (或 Iterator)。其運作方式與先前的情況類似,不同之處在於每個集合元素都用作訊息 Payload。使用此方法可讓您專注於網域物件,而無需考慮訊息傳遞系統,並產生更容易測試的程式碼。

  • Message 或非訊息物件 (但不是集合或陣列)。其運作方式與先前的案例類似,不同之處在於只傳送單一訊息。

在 Spring Integration 中,任何 POJO 都可以實作分割演算法,前提是它定義了一個接受單一引數並具有傳回值的方法。在此情況下,方法的傳回值會如先前所述進行解譯。輸入引數可以是 Message 或簡單的 POJO。在後一種情況下,分割器會接收傳入訊息的 Payload。我們建議使用此方法,因為它使程式碼與 Spring Integration API 解耦,並且通常更容易測試。

迭代器

從 4.1 版開始,AbstractMessageSplitter 支援要分割的 valueIterator 類型。請注意,在 Iterator (或 Iterable) 的情況下,我們無法存取基礎項目的數量,且 SEQUENCE_SIZE 標頭會設定為 0。這表示 <aggregator> 的預設 SequenceSizeReleaseStrategy 無法運作,且來自 splitterCORRELATION_ID 的群組將不會釋放;它將保持 incomplete 狀態。在此情況下,您應該使用適當的自訂 ReleaseStrategy,或依賴 send-partial-result-on-expiry 以及 group-timeoutMessageGroupStoreReaper

從 5.0 版開始,AbstractMessageSplitter 提供 protected obtainSizeIfPossible() 方法,以允許在可能的情況下判斷 IterableIterator 物件的大小。例如,XPathMessageSplitter 可以判斷基礎 NodeList 物件的大小。從 5.0.9 版開始,此方法也能正確傳回 com.fasterxml.jackson.core.TreeNode 的大小。

Iterator 物件對於避免在分割之前需要在記憶體中建置整個集合非常有用。例如,當使用迭代或串流從某些外部系統 (例如,DataBase 或 FTP MGET) 填入基礎項目時。

串流和 Flux

從 5.0 版開始,AbstractMessageSplitter 支援 Java Stream 和 Reactive Streams Publisher 類型,用於要分割的 value。在此情況下,目標 Iterator 是在其迭代功能上建置的。

此外,如果分割器的輸出通道是 ReactiveStreamsSubscribableChannel 的實例,則 AbstractMessageSplitter 會產生 Flux 結果,而不是 Iterator,且輸出通道會訂閱此 Flux,以進行基於背壓的下游流程需求分割。

從 5.2 版開始,分割器支援 discardChannel 選項,用於傳送那些分割函數傳回空容器 (集合、陣列、串流、Flux 等) 的請求訊息。在這種情況下,沒有任何項目可迭代以傳送至 outputChannelnull 分割結果仍保留為流程結束指示。

使用 Java、Groovy 和 Kotlin DSL 設定分割器

基於 Message 及其可迭代 Payload 的簡單分割器範例,具有 DSL 設定

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

@Bean
public IntegrationFlow someFlow() {
    return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
    integrationFlow {
        split<Message<*>> { it.payload }
    }
@Bean
someFlow() {
    integrationFlow {
        splitWith {
		    expectedType Message<?>
		    function { it.payload }
        }
    }
}

如需有關 DSL 的詳細資訊,請參閱各章節

使用 XML 設定分割器

可以透過 XML 設定分割器,如下所示

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           (1)
  ref="splitterBean"                  (2)
  method="split"                      (3)
  input-channel="inputChannel"        (4)
  output-channel="outputChannel"      (5)
  discard-channel="discardChannel" /> (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 分割器的 ID 是選用的。
2 應用程式內容中定義的 Bean 的參考。Bean 必須實作分割邏輯,如先前章節所述。選用。如果未提供 Bean 的參考,則假設抵達 input-channel 的訊息的 Payload 是 java.util.Collection 的實作,且預設分割邏輯會套用至集合,將每個個別元素併入訊息並將其傳送至 output-channel
3 在 Bean 上定義的實作分割邏輯的方法。選用。
4 分割器的輸入通道。必要。
5 分割器將傳入訊息的分割結果傳送到的通道。選用 (因為傳入訊息可以自行指定回覆通道)。
6 在分割結果為空的情況下,將請求訊息傳送到的通道。選用 (它們將像 null 結果一樣停止)。

如果自訂分割器實作可以在其他 <splitter> 定義中參考,我們建議使用 ref 屬性。但是,如果自訂分割器處理器實作應該限定於 <splitter> 的單一定義,您可以設定內部 Bean 定義,如下列範例所示

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
不允許在相同的 <int:splitter> 設定中使用 ref 屬性和內部處理器定義,因為這會造成不明確的條件,並導致擲回例外狀況。
如果 ref 屬性參考擴充 AbstractMessageProducingHandler 的 Bean (例如,架構本身提供的分割器),則會透過將輸出通道直接注入處理器來最佳化設定。在此情況下,每個 ref 都必須是個別的 Bean 實例 (或 prototype 範圍的 Bean) 或使用內部 <bean/> 設定類型。但是,只有當您未在分割器 XML 定義中提供任何分割器特定屬性時,此最佳化才適用。如果您不小心從多個 Bean 參考相同的訊息處理器,您會收到設定例外狀況。

使用註解設定分割器

@Splitter 註解適用於預期 Message 類型或訊息 Payload 類型的方法,且方法的傳回值應為任何類型的 Collection。如果傳回值不是實際的 Message 物件,則每個項目都會封裝在 Message 中作為 Message 的 Payload。每個產生的 Message 都會傳送至定義 @Splitter 的端點的指定輸出通道。

以下範例顯示如何使用 @Splitter 註解設定分割器

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}