訊息儲存庫

企業整合模式 (EIP) 書籍中,識別了數種具有緩衝訊息能力的模式。例如,彙集器會緩衝訊息,直到可以釋放它們為止,而 QueueChannel 會緩衝訊息,直到消費者明確地從該通道接收這些訊息。由於訊息流程中的任何點都可能發生故障,因此緩衝訊息的 EIP 元件也會引入訊息可能遺失的點。

為了降低訊息遺失的風險,EIP 定義了 訊息儲存庫 模式,該模式讓 EIP 元件能夠儲存訊息,通常儲存在某種類型的持久儲存庫中 (例如 RDBMS)。

Spring Integration 通過以下方式提供對訊息儲存庫模式的支援:

  • 定義 org.springframework.integration.store.MessageStore 策略介面

  • 提供此介面的數種實作

  • 在所有具有緩衝訊息能力的元件上公開 message-store 屬性,以便您可以注入任何實作 MessageStore 介面的實例。

關於如何設定特定的訊息儲存庫實作,以及如何將 MessageStore 實作注入到特定的緩衝元件的詳細資訊,將在本手冊中說明 (請參閱特定的元件,例如 QueueChannel彙集器延遲器 和其他元件)。以下範例組顯示如何為 QueueChannel 和彙集器新增對訊息儲存庫的參考

QueueChannel
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
彙集器
<int:aggregator message-store="refToMessageStore"/>

預設情況下,訊息會使用 o.s.i.store.SimpleMessageStore (MessageStore 的實作) 儲存在記憶體中。對於開發或簡單的低容量環境來說,這可能已經足夠,在這些環境中,非持久性訊息的潛在遺失並不是問題。但是,典型的生產應用程式需要更穩健的選項,不僅要降低訊息遺失的風險,還要避免潛在的記憶體不足錯誤。因此,我們也為各種資料儲存庫提供 MessageStore 實作。以下是支援的實作的完整清單:

但是,請注意在使用 MessageStore 的持久性實作時的一些限制。

訊息資料 (payload 和標頭) 會使用不同的序列化策略進行序列化和反序列化,具體取決於 MessageStore 的實作。例如,當使用 JdbcMessageStore 時,預設情況下只會持久儲存 Serializable 資料。在這種情況下,非 Serializable 標頭會在序列化之前移除。此外,請注意傳輸适配器 (例如 FTP、HTTP、JMS 等) 注入的協定特定標頭。例如,<http:inbound-channel-adapter/> 將 HTTP 標頭對應到訊息標頭,其中之一是非序列化的 org.springframework.http.MediaType 實例的 ArrayList。但是,您可以將您自己的 SerializerDeserializer 策略介面的實作注入到某些 MessageStore 實作 (例如 JdbcMessageStore) 中,以變更序列化和反序列化的行為。

請特別注意代表特定資料類型的標頭。例如,如果其中一個標頭包含某些 Spring Bean 的實例,則在反序列化時,您最終可能會得到該 Bean 的不同實例,這會直接影響框架建立的一些隱含標頭 (例如 REPLY_CHANNELERROR_CHANNEL)。目前,它們不可序列化,但是,即使它們可以序列化,反序列化的通道也不會代表預期的實例。

從 Spring Integration 3.0 版開始,您可以通過設定標頭豐富器來解決此問題,該標頭豐富器設定為在向 HeaderChannelRegistry 註冊通道後,將這些標頭替換為名稱。

此外,請考慮當您將訊息流程設定如下時會發生什麼情況:閘道器 → 佇列通道 (由持久性訊息儲存庫支援) → 服務啟動器。該閘道器會建立一個臨時回覆通道,在服務啟動器的輪詢器從佇列讀取時,該通道會遺失。同樣,您可以使用標頭豐富器將標頭替換為 String 表示形式。

如需更多資訊,請參閱 標頭豐富器

Spring Integration 4.0 引入了兩個新的介面

  • ChannelMessageStore:用於實作特定於 QueueChannel 實例的操作

  • PriorityCapableChannelMessageStore:用於標記 MessageStore 實作以用於 PriorityChannel 實例,並為持久性訊息提供優先順序。

實際行為取決於實作。框架提供了以下實作,可用作 QueueChannelPriorityChannel 的持久性 MessageStore

關於 SimpleMessageStore 的注意事項

從 4.1 版開始,SimpleMessageStore 在呼叫 getMessageGroup() 時不再複製訊息群組。對於大型訊息群組,這是一個顯著的效能問題。4.0.1 引入了一個布林值 copyOnGet 屬性,可讓您控制此行為。當彙集器在內部使用時,此屬性設定為 false 以提高效能。現在預設為 false

在彙集器等元件外部存取群組儲存庫的使用者,現在會取得彙集器正在使用的群組的直接參考,而不是副本。在彙集器外部操作群組可能會導致無法預測的結果。

因此,您應該不要執行此類操作,或將 copyOnGet 屬性設定為 true

使用 MessageGroupFactory

從 4.3 版開始,某些 MessageGroupStore 實作可以注入自訂 MessageGroupFactory 策略,以建立和自訂 MessageGroupStore 使用的 MessageGroup 實例。這預設為 SimpleMessageGroupFactory,它根據 GroupType.HASH_SET (LinkedHashSet) 內部集合產生 SimpleMessageGroup 實例。其他可能的選項是 SYNCHRONISED_SETBLOCKING_QUEUE,其中最後一個選項可用於恢復先前的 SimpleMessageGroup 行為。此外,PERSISTENT 選項也可用。有關更多資訊,請參閱下一節。從 5.0.1 版開始,如果群組中訊息的順序和唯一性無關緊要,則 LIST 選項也可用。

持久性 MessageGroupStore 和延遲載入

從 4.3 版開始,所有持久性 MessageGroupStore 實例都以延遲載入方式從儲存庫中檢索 MessageGroup 實例及其 messages。在大多數情況下,這對於關聯 MessageHandler 實例很有用 (請參閱 彙集器重排序器),因為在每次關聯操作時從儲存庫載入整個 MessageGroup 會增加額外負擔。

您可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false) 選項從設定中關閉延遲載入行為。

我們針對 MongoDB MessageStore (MongoDB 訊息儲存庫) 和 <aggregator> (彙集器) 上的延遲載入效能測試,使用了類似於以下的自訂 release-strategy

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

它為 1000 個簡單訊息產生類似於以下的結果

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

但是,從 5.5 版開始,所有持久性 MessageGroupStore 實作都提供了基於目標資料庫串流 API 的 streamMessagesForGroup(Object groupId) 合約。當群組在儲存庫中非常大時,這可以提高資源利用率。在框架內部,此新 API 用於 延遲器 (例如),在啟動時重新排程持久性訊息時。傳回的 Stream<Message<?>> 必須在處理結束時關閉,例如通過 try-with-resources 自動關閉。每當使用 PersistentMessageGroup 時,其 streamMessages() 都會委派給 MessageGroupStore.streamMessagesForGroup()

訊息群組條件

從 5.5 版開始,MessageGroup 抽象提供了一個 condition 字串選項。此選項的值可以是任何稍後可以出於任何原因解析的值,以對群組做出決策。例如,關聯訊息處理器 中的 ReleaseStrategy 可以從群組查詢此屬性,而不是迭代群組中的所有訊息。MessageGroupStore 公開了 setGroupCondition(Object groupId, String condition) API。為此目的,已將 setGroupConditionSupplier(BiFunction<Message<?>, String, String>) 選項新增至 AbstractCorrelatingMessageHandler。此函數針對新增到群組的每個訊息以及群組的現有條件進行評估。實作可以決定傳回新值、現有值或將目標條件重設為 nullcondition 的值可以是 JSON、SpEL 表達式、數字或任何可以序列化為字串並隨後解析的內容。例如,來自 檔案彙集器 元件的 FileMarkerReleaseStrategy,從 FileSplitter.FileMarker.Mark.END 訊息的 FileHeaders.LINE_COUNT 標頭將條件填入群組中,並從其 canRelease() 查詢,比較群組大小與此條件中的值。這樣,它就不會迭代群組中的所有訊息來尋找具有 FileHeaders.LINE_COUNT 標頭的 FileSplitter.FileMarker.Mark.END 訊息。它也允許結束標記在所有其他記錄之前到達彙集器;例如,在多執行緒環境中處理檔案時。

此外,為了設定方便,引入了 GroupConditionProvider 合約。AbstractCorrelatingMessageHandler 會檢查提供的 ReleaseStrategy 是否實作此介面,並提取 conditionSupplier 以進行群組條件評估邏輯。