訊息儲存庫
企業整合模式 (EIP) 書籍中,識別了數種具有緩衝訊息能力的模式。例如,彙集器會緩衝訊息,直到可以釋放它們為止,而 QueueChannel
會緩衝訊息,直到消費者明確地從該通道接收這些訊息。由於訊息流程中的任何點都可能發生故障,因此緩衝訊息的 EIP 元件也會引入訊息可能遺失的點。
為了降低訊息遺失的風險,EIP 定義了 訊息儲存庫 模式,該模式讓 EIP 元件能夠儲存訊息,通常儲存在某種類型的持久儲存庫中 (例如 RDBMS)。
Spring Integration 通過以下方式提供對訊息儲存庫模式的支援:
-
定義
org.springframework.integration.store.MessageStore
策略介面 -
提供此介面的數種實作
-
在所有具有緩衝訊息能力的元件上公開
message-store
屬性,以便您可以注入任何實作MessageStore
介面的實例。
關於如何設定特定的訊息儲存庫實作,以及如何將 MessageStore
實作注入到特定的緩衝元件的詳細資訊,將在本手冊中說明 (請參閱特定的元件,例如 QueueChannel、彙集器、延遲器 和其他元件)。以下範例組顯示如何為 QueueChannel
和彙集器新增對訊息儲存庫的參考
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
預設情況下,訊息會使用 o.s.i.store.SimpleMessageStore
(MessageStore
的實作) 儲存在記憶體中。對於開發或簡單的低容量環境來說,這可能已經足夠,在這些環境中,非持久性訊息的潛在遺失並不是問題。但是,典型的生產應用程式需要更穩健的選項,不僅要降低訊息遺失的風險,還要避免潛在的記憶體不足錯誤。因此,我們也為各種資料儲存庫提供 MessageStore
實作。以下是支援的實作的完整清單:
-
Hazelcast 訊息儲存庫:使用 Hazelcast 分散式快取來儲存訊息
-
JDBC 訊息儲存庫:使用 RDBMS 來儲存訊息
-
Redis 訊息儲存庫:使用 Redis 鍵/值資料儲存庫來儲存訊息
-
MongoDB 訊息儲存庫:使用 MongoDB 文件儲存庫來儲存訊息
但是,請注意在使用 訊息資料 (payload 和標頭) 會使用不同的序列化策略進行序列化和反序列化,具體取決於 請特別注意代表特定資料類型的標頭。例如,如果其中一個標頭包含某些 Spring Bean 的實例,則在反序列化時,您最終可能會得到該 Bean 的不同實例,這會直接影響框架建立的一些隱含標頭 (例如 從 Spring Integration 3.0 版開始,您可以通過設定標頭豐富器來解決此問題,該標頭豐富器設定為在向 此外,請考慮當您將訊息流程設定如下時會發生什麼情況:閘道器 → 佇列通道 (由持久性訊息儲存庫支援) → 服務啟動器。該閘道器會建立一個臨時回覆通道,在服務啟動器的輪詢器從佇列讀取時,該通道會遺失。同樣,您可以使用標頭豐富器將標頭替換為 如需更多資訊,請參閱 標頭豐富器。 |
Spring Integration 4.0 引入了兩個新的介面
-
ChannelMessageStore
:用於實作特定於QueueChannel
實例的操作 -
PriorityCapableChannelMessageStore
:用於標記MessageStore
實作以用於PriorityChannel
實例,並為持久性訊息提供優先順序。
實際行為取決於實作。框架提供了以下實作,可用作 QueueChannel
和 PriorityChannel
的持久性 MessageStore
關於
SimpleMessageStore 的注意事項從 4.1 版開始, 在彙集器等元件外部存取群組儲存庫的使用者,現在會取得彙集器正在使用的群組的直接參考,而不是副本。在彙集器外部操作群組可能會導致無法預測的結果。 因此,您應該不要執行此類操作,或將 |
使用 MessageGroupFactory
從 4.3 版開始,某些 MessageGroupStore
實作可以注入自訂 MessageGroupFactory
策略,以建立和自訂 MessageGroupStore
使用的 MessageGroup
實例。這預設為 SimpleMessageGroupFactory
,它根據 GroupType.HASH_SET
(LinkedHashSet
) 內部集合產生 SimpleMessageGroup
實例。其他可能的選項是 SYNCHRONISED_SET
和 BLOCKING_QUEUE
,其中最後一個選項可用於恢復先前的 SimpleMessageGroup
行為。此外,PERSISTENT
選項也可用。有關更多資訊,請參閱下一節。從 5.0.1 版開始,如果群組中訊息的順序和唯一性無關緊要,則 LIST
選項也可用。
持久性 MessageGroupStore
和延遲載入
從 4.3 版開始,所有持久性 MessageGroupStore
實例都以延遲載入方式從儲存庫中檢索 MessageGroup
實例及其 messages
。在大多數情況下,這對於關聯 MessageHandler
實例很有用 (請參閱 彙集器 和 重排序器),因為在每次關聯操作時從儲存庫載入整個 MessageGroup
會增加額外負擔。
您可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
選項從設定中關閉延遲載入行為。
我們針對 MongoDB MessageStore
(MongoDB 訊息儲存庫) 和 <aggregator>
(彙集器) 上的延遲載入效能測試,使用了類似於以下的自訂 release-strategy
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
它為 1000 個簡單訊息產生類似於以下的結果
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
但是,從 5.5 版開始,所有持久性 MessageGroupStore
實作都提供了基於目標資料庫串流 API 的 streamMessagesForGroup(Object groupId)
合約。當群組在儲存庫中非常大時,這可以提高資源利用率。在框架內部,此新 API 用於 延遲器 (例如),在啟動時重新排程持久性訊息時。傳回的 Stream<Message<?>>
必須在處理結束時關閉,例如通過 try-with-resources
自動關閉。每當使用 PersistentMessageGroup
時,其 streamMessages()
都會委派給 MessageGroupStore.streamMessagesForGroup()
。
訊息群組條件
從 5.5 版開始,MessageGroup
抽象提供了一個 condition
字串選項。此選項的值可以是任何稍後可以出於任何原因解析的值,以對群組做出決策。例如,關聯訊息處理器 中的 ReleaseStrategy
可以從群組查詢此屬性,而不是迭代群組中的所有訊息。MessageGroupStore
公開了 setGroupCondition(Object groupId, String condition)
API。為此目的,已將 setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
選項新增至 AbstractCorrelatingMessageHandler
。此函數針對新增到群組的每個訊息以及群組的現有條件進行評估。實作可以決定傳回新值、現有值或將目標條件重設為 null
。condition
的值可以是 JSON、SpEL 表達式、數字或任何可以序列化為字串並隨後解析的內容。例如,來自 檔案彙集器 元件的 FileMarkerReleaseStrategy
,從 FileSplitter.FileMarker.Mark.END
訊息的 FileHeaders.LINE_COUNT
標頭將條件填入群組中,並從其 canRelease()
查詢,比較群組大小與此條件中的值。這樣,它就不會迭代群組中的所有訊息來尋找具有 FileHeaders.LINE_COUNT
標頭的 FileSplitter.FileMarker.Mark.END
訊息。它也允許結束標記在所有其他記錄之前到達彙集器;例如,在多執行緒環境中處理檔案時。
此外,為了設定方便,引入了 GroupConditionProvider
合約。AbstractCorrelatingMessageHandler
會檢查提供的 ReleaseStrategy
是否實作此介面,並提取 conditionSupplier
以進行群組條件評估邏輯。