分割器
分割器是一種元件,其作用是將訊息分割成數個部分,並將產生的訊息傳送以獨立處理。它們通常是管線中的上游生產者,管線包含彙集器。
程式設計模型
用於執行分割的 API 由一個基礎類別 AbstractMessageSplitter
組成。它是 MessageHandler
實作,封裝了分割器的常見功能,例如在產生的訊息上填入適當的訊息標頭 (CORRELATION_ID
、SEQUENCE_SIZE
和 SEQUENCE_NUMBER
)。這種填入功能可追蹤訊息及其處理結果 (在典型情境中,這些標頭會複製到各種轉換端點產生的訊息中)。這些值隨後可用於,例如,複合訊息處理器。
以下範例顯示了 AbstractMessageSplitter
的摘錄
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
若要在應用程式中實作特定的分割器,您可以擴充 AbstractMessageSplitter
並實作 splitMessage
方法,其中包含分割訊息的邏輯。傳回值可以是下列其中之一
-
訊息的
Collection
或陣列,或可迭代訊息的Iterable
(或Iterator
)。在此情況下,訊息會以訊息形式傳送 (在填入CORRELATION_ID
、SEQUENCE_SIZE
和SEQUENCE_NUMBER
之後)。使用此方法可讓您有更多控制權 — 例如,在分割過程中填入自訂訊息標頭。 -
非訊息物件的
Collection
或陣列,或可迭代非訊息物件的Iterable
(或Iterator
)。其運作方式與先前的情況類似,不同之處在於每個集合元素都用作訊息 Payload。使用此方法可讓您專注於網域物件,而無需考慮訊息傳遞系統,並產生更容易測試的程式碼。 -
Message
或非訊息物件 (但不是集合或陣列)。其運作方式與先前的案例類似,不同之處在於只傳送單一訊息。
在 Spring Integration 中,任何 POJO 都可以實作分割演算法,前提是它定義了一個接受單一引數並具有傳回值的方法。在此情況下,方法的傳回值會如先前所述進行解譯。輸入引數可以是 Message
或簡單的 POJO。在後一種情況下,分割器會接收傳入訊息的 Payload。我們建議使用此方法,因為它使程式碼與 Spring Integration API 解耦,並且通常更容易測試。
迭代器
從 4.1 版開始,AbstractMessageSplitter
支援要分割的 value
的 Iterator
類型。請注意,在 Iterator
(或 Iterable
) 的情況下,我們無法存取基礎項目的數量,且 SEQUENCE_SIZE
標頭會設定為 0
。這表示 <aggregator>
的預設 SequenceSizeReleaseStrategy
無法運作,且來自 splitter
的 CORRELATION_ID
的群組將不會釋放;它將保持 incomplete
狀態。在此情況下,您應該使用適當的自訂 ReleaseStrategy
,或依賴 send-partial-result-on-expiry
以及 group-timeout
或 MessageGroupStoreReaper
。
從 5.0 版開始,AbstractMessageSplitter
提供 protected obtainSizeIfPossible()
方法,以允許在可能的情況下判斷 Iterable
和 Iterator
物件的大小。例如,XPathMessageSplitter
可以判斷基礎 NodeList
物件的大小。從 5.0.9 版開始,此方法也能正確傳回 com.fasterxml.jackson.core.TreeNode
的大小。
Iterator
物件對於避免在分割之前需要在記憶體中建置整個集合非常有用。例如,當使用迭代或串流從某些外部系統 (例如,DataBase 或 FTP MGET
) 填入基礎項目時。
串流和 Flux
從 5.0 版開始,AbstractMessageSplitter
支援 Java Stream
和 Reactive Streams Publisher
類型,用於要分割的 value
。在此情況下,目標 Iterator
是在其迭代功能上建置的。
此外,如果分割器的輸出通道是 ReactiveStreamsSubscribableChannel
的實例,則 AbstractMessageSplitter
會產生 Flux
結果,而不是 Iterator
,且輸出通道會訂閱此 Flux
,以進行基於背壓的下游流程需求分割。
從 5.2 版開始,分割器支援 discardChannel
選項,用於傳送那些分割函數傳回空容器 (集合、陣列、串流、Flux
等) 的請求訊息。在這種情況下,沒有任何項目可迭代以傳送至 outputChannel
。null
分割結果仍保留為流程結束指示。
使用 Java、Groovy 和 Kotlin DSL 設定分割器
基於 Message
及其可迭代 Payload 的簡單分割器範例,具有 DSL 設定
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
如需有關 DSL 的詳細資訊,請參閱各章節
使用 XML 設定分割器
可以透過 XML 設定分割器,如下所示
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 分割器的 ID 是選用的。 |
2 | 應用程式內容中定義的 Bean 的參考。Bean 必須實作分割邏輯,如先前章節所述。選用。如果未提供 Bean 的參考,則假設抵達 input-channel 的訊息的 Payload 是 java.util.Collection 的實作,且預設分割邏輯會套用至集合,將每個個別元素併入訊息並將其傳送至 output-channel 。 |
3 | 在 Bean 上定義的實作分割邏輯的方法。選用。 |
4 | 分割器的輸入通道。必要。 |
5 | 分割器將傳入訊息的分割結果傳送到的通道。選用 (因為傳入訊息可以自行指定回覆通道)。 |
6 | 在分割結果為空的情況下,將請求訊息傳送到的通道。選用 (它們將像 null 結果一樣停止)。 |
如果自訂分割器實作可以在其他 <splitter>
定義中參考,我們建議使用 ref
屬性。但是,如果自訂分割器處理器實作應該限定於 <splitter>
的單一定義,您可以設定內部 Bean 定義,如下列範例所示
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
不允許在相同的 <int:splitter> 設定中使用 ref 屬性和內部處理器定義,因為這會造成不明確的條件,並導致擲回例外狀況。 |
如果 ref 屬性參考擴充 AbstractMessageProducingHandler 的 Bean (例如,架構本身提供的分割器),則會透過將輸出通道直接注入處理器來最佳化設定。在此情況下,每個 ref 都必須是個別的 Bean 實例 (或 prototype 範圍的 Bean) 或使用內部 <bean/> 設定類型。但是,只有當您未在分割器 XML 定義中提供任何分割器特定屬性時,此最佳化才適用。如果您不小心從多個 Bean 參考相同的訊息處理器,您會收到設定例外狀況。 |