聚合器
基本上,聚合器是分割器的鏡像,它是一種訊息處理器,接收多個訊息並將它們組合成單一訊息。事實上,聚合器通常是包含分割器的管線中的下游消費者。
從技術上講,聚合器比分割器更複雜,因為它是有狀態的。它必須保存要聚合的訊息,並確定何時完整的一組訊息準備好進行聚合。為了做到這一點,它需要 MessageStore。
功能性
聚合器通過關聯和儲存一組相關訊息,直到該組被認為是完整的。在那個時間點,聚合器通過處理整個組來建立單一訊息,並將聚合後的訊息作為輸出發送。
實作聚合器需要提供執行聚合的邏輯(即從多個訊息建立單一訊息)。兩個相關的概念是關聯和釋放。
關聯性決定了訊息如何分組以進行聚合。在 Spring Integration 中,預設情況下,關聯是基於 IntegrationMessageHeaderAccessor.CORRELATION_ID 訊息標頭完成的。具有相同 IntegrationMessageHeaderAccessor.CORRELATION_ID 的訊息被分組在一起。但是,您可以自訂關聯策略,以允許其他方式指定訊息應如何分組在一起。為此,您可以實作 CorrelationStrategy(在本章稍後介紹)。
為了確定訊息組何時準備好被處理,會諮詢 ReleaseStrategy。聚合器的預設釋放策略是在序列中包含的所有訊息都存在時釋放組,這是基於 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 標頭。您可以通過提供對自訂 ReleaseStrategy 實作的引用來覆蓋此預設策略。
程式設計模型
聚合 API 由許多類別組成
-
介面
MessageGroupProcessor
及其子類別:MethodInvokingAggregatingMessageGroupProcessor
和ExpressionEvaluatingMessageGroupProcessor
-
ReleaseStrategy
介面及其預設實作:SimpleSequenceSizeReleaseStrategy
-
CorrelationStrategy
介面及其預設實作:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler
(AbstractCorrelatingMessageHandler
的子類別)是一個 MessageHandler
實作,封裝了聚合器(和其他關聯用例)的常見功能,如下所示
-
將訊息關聯到要聚合的組中
-
將這些訊息維護在
MessageStore
中,直到組可以被釋放 -
決定何時可以釋放組
-
將釋放的組聚合為單一訊息
-
識別並回應過期的組
決定訊息應如何分組在一起的責任委派給 CorrelationStrategy
實例。決定訊息組是否可以釋放的責任委派給 ReleaseStrategy
實例。
以下列表簡要重點介紹了基礎 AbstractAggregatingMessageGroupProcessor
(實作 aggregatePayloads
方法的責任留給開發人員)
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
請參閱 DefaultAggregatingMessageGroupProcessor
、ExpressionEvaluatingMessageGroupProcessor
和 MethodInvokingMessageGroupProcessor
作為 AbstractAggregatingMessageGroupProcessor
的現成實作。
從版本 5.2 開始,Function<MessageGroup, Map<String, Object>>
策略可用於 AbstractAggregatingMessageGroupProcessor
,以合併和計算(聚合)輸出訊息的標頭。DefaultAggregateHeadersFunction
實作提供了邏輯,該邏輯返回組中沒有衝突的所有標頭;組內一個或多個訊息上不存在的標頭不被視為衝突。衝突的標頭將被省略。連同新引入的 DelegatingMessageGroupProcessor
,此函數用於任何任意 (非 AbstractAggregatingMessageGroupProcessor
) MessageGroupProcessor
實作。本質上,框架將提供的函數注入到 AbstractAggregatingMessageGroupProcessor
實例中,並將所有其他實作包裝到 DelegatingMessageGroupProcessor
中。AbstractAggregatingMessageGroupProcessor
和 DelegatingMessageGroupProcessor
之間的邏輯差異在於,後者不會在調用委派策略之前預先計算標頭,並且如果委派返回 Message
或 AbstractIntegrationMessageBuilder
,則不會調用該函數。在這種情況下,框架假設目標實作已負責產生一組正確的標頭,並填充到返回的結果中。Function<MessageGroup, Map<String, Object>>
策略可用作 XML 設定的 headers-function
參考屬性、Java DSL 的 AggregatorSpec.headersFunction()
選項以及純 Java 設定的 AggregatorFactoryBean.setHeadersFunction()
。
CorrelationStrategy
由 AbstractCorrelatingMessageHandler
擁有,並具有基於 IntegrationMessageHeaderAccessor.CORRELATION_ID
訊息標頭的預設值,如下例所示
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至於訊息組的實際處理,預設實作是 DefaultAggregatingMessageGroupProcessor
。它建立一個單一 Message
,其酬載是為給定組接收的酬載的 List
。這對於使用分割器、發布-訂閱通道或上游接收者列表路由器的簡單分散-收集實作非常有效。
在這種情況下使用發布-訂閱通道或接收者列表路由器時,請務必啟用 apply-sequence 標誌。這樣做會新增必要的標頭:CORRELATION_ID 、SEQUENCE_NUMBER 和 SEQUENCE_SIZE 。預設情況下,分割器在 Spring Integration 中啟用此行為,但發布-訂閱通道或接收者列表路由器未啟用此行為,因為這些元件可能在各種不需要這些標頭的上下文中使用。 |
為應用程式實作特定的聚合器策略時,您可以擴展 AbstractAggregatingMessageGroupProcessor
並實作 aggregatePayloads
方法。但是,有更好的解決方案,與 API 的耦合性較低,用於實作聚合邏輯,可以通過 XML 或註解進行設定。
一般而言,任何 POJO 都可以實作聚合演算法,只要它提供一個接受單一 java.util.List
作為引數的方法(也支援參數化列表)。此方法用於聚合訊息,如下所示
-
如果引數是
java.util.Collection<T>
且參數類型 T 可分配給Message
,則為聚合累積的整個訊息列表將發送到聚合器。 -
如果引數是非參數化的
java.util.Collection
或參數類型不可分配給Message
,則該方法接收累積訊息的酬載。 -
如果返回類型不可分配給
Message
,則它被視為框架自動建立的Message
的酬載。
為了程式碼的簡潔性並促進低耦合性、可測試性等最佳實踐,實作聚合邏輯的首選方法是通過 POJO 並使用 XML 或註解支援在應用程式中設定它。 |
從版本 5.3 開始,在處理訊息組後,AbstractCorrelatingMessageHandler
會執行 MessageBuilder.popSequenceDetails()
訊息標頭修改,以適用於具有多個巢狀層級的正確分割器-聚合器場景。僅當訊息組釋放結果不是訊息集合時才執行此操作。在這種情況下,目標 MessageGroupProcessor
負責在建構這些訊息時調用 MessageBuilder.popSequenceDetails()
。
如果 MessageGroupProcessor
返回 Message
,則僅當 sequenceDetails
與組中的第一個訊息匹配時,才對輸出訊息執行 MessageBuilder.popSequenceDetails()
。(以前,只有在從 MessageGroupProcessor
返回純酬載或 AbstractIntegrationMessageBuilder
時才會執行此操作。)
此功能可以通過新的 popSequence
布林屬性來控制,因此在某些情況下,當關聯詳細資訊尚未由標準分割器填充時,可以停用 MessageBuilder.popSequenceDetails()
。本質上,此屬性撤銷了最接近的上游 applySequence = true
在 AbstractMessageSplitter
中所做的事情。有關更多資訊,請參閱 分割器。
SimpleMessageGroup.getMessages() 方法返回 unmodifiableCollection 。因此,如果聚合 POJO 方法具有 Collection<Message> 參數,則傳遞的引數正是該 Collection 實例,並且當您為聚合器使用 SimpleMessageStore 時,原始 Collection<Message> 會在釋放組後被清除。因此,如果 Collection<Message> 變數從聚合器中傳遞出來,則 POJO 中的 Collection<Message> 變數也會被清除。如果您希望簡單地按原樣釋放該集合以進行進一步處理,則必須建立一個新的 Collection (例如,new ArrayList<Message>(messages) )。從版本 4.3 開始,框架不再將訊息複製到新的集合中,以避免不必要的額外物件建立。 |
在版本 4.2 之前,無法通過使用 XML 設定來提供 MessageGroupProcessor
。只有 POJO 方法可以用於聚合。現在,如果框架檢測到引用的(或內部)Bean 實作了 MessageProcessor
,則將其用作聚合器的輸出處理器。
如果您希望從自訂 MessageGroupProcessor
釋放物件集合作為訊息的酬載,則您的類別應擴展 AbstractAggregatingMessageGroupProcessor
並實作 aggregatePayloads()
。
此外,自版本 4.2 以來,還提供了 SimpleMessageGroupProcessor
。它返回組中的訊息集合,如前所述,這會導致釋放的訊息單獨發送。
這使得聚合器可以作為訊息屏障工作,其中到達的訊息被保留,直到釋放策略觸發,並且組被釋放為一系列單獨的訊息。
從版本 6.0 開始,上述分割行為僅在組處理器是 SimpleMessageGroupProcessor
時才有效。否則,對於任何其他返回 Collection<Message>
的 MessageGroupProcessor
實作,只會發出單一回覆訊息,其中整個訊息集合作為其酬載。這種邏輯是由聚合器的規範目的決定的 - 按某個鍵收集請求訊息並產生單一分組訊息。
ReleaseStrategy
ReleaseStrategy
介面定義如下
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
一般而言,任何 POJO 都可以實作完成決策邏輯,只要它提供一個接受單一 java.util.List
作為引數的方法(也支援參數化列表)並返回布林值。此方法在每個新訊息到達後調用,以決定組是否完成,如下所示
-
如果引數是
java.util.List<T>
且參數類型 T 可分配給Message
,則組中累積的整個訊息列表將發送到該方法。 -
如果引數是非參數化的
java.util.List
或參數類型不可分配給Message
,則該方法接收累積訊息的酬載。 -
如果訊息組準備好進行聚合,則該方法必須返回
true
,否則返回false
。
以下範例示範如何將 @ReleaseStrategy
註解用於 Message
類型的 List
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下範例示範如何將 @ReleaseStrategy
註解用於 String
類型的 List
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
基於前面兩個範例中的簽名,基於 POJO 的釋放策略會傳遞一個尚未釋放的訊息 Collection
(如果您需要訪問整個 Message
)或一個酬載物件 Collection
(如果類型參數不是 Message
)。這滿足了大多數用例。但是,如果由於某種原因,您需要訪問完整的 MessageGroup
,則應提供 ReleaseStrategy
介面的實作。
在處理潛在的大型組時,您應該了解這些方法是如何調用的,因為釋放策略可能會在組釋放之前被多次調用。最有效率的是 基於這些原因,對於大型組,我們建議您實作 |
當組被釋放以進行聚合時,其所有尚未釋放的訊息都會被處理並從組中移除。如果組也已完成(即,如果序列中的所有訊息都已到達或未定義序列),則組將被標記為已完成。此組的任何新訊息都會發送到丟棄通道(如果已定義)。將 expire-groups-upon-completion
設定為 true
(預設為 false
)會移除整個組,並且任何新訊息(與已移除組具有相同的關聯 ID)都會形成一個新組。您可以通過使用 MessageGroupStoreReaper
並將 send-partial-result-on-expiry
設定為 true
來釋放部分序列。
為了方便丟棄遲到的訊息,聚合器必須在組釋放後維護有關組的狀態。這最終可能會導致記憶體不足的情況。為了避免這種情況,您應該考慮設定 MessageGroupStoreReaper 以移除組元數據。應設定過期參數,以便在達到預期不會有遲到訊息到達的點後使組過期。有關設定 reaper 的資訊,請參閱 Managing State in an Aggregator: MessageGroupStore 。 |
Spring Integration 為 ReleaseStrategy
提供了一個實作:SimpleSequenceSizeReleaseStrategy
。此實作會查詢每個到達訊息的 SEQUENCE_NUMBER
和 SEQUENCE_SIZE
標頭,以決定訊息組何時完成並準備好聚合。如前所示,它也是預設策略。
在版本 5.0 之前,預設釋放策略是 SequenceSizeReleaseStrategy ,它在大型組中表現不佳。使用該策略,會檢測到重複的序列號並拒絕。此操作可能很耗費資源。 |
如果您要聚合大型組,不需要釋放部分組,並且不需要檢測/拒絕重複的序列,請考慮改用 SimpleSequenceSizeReleaseStrategy
- 它對於這些用例來說效率更高,並且自版本 5.0 以來,當未指定部分組釋放時,它是預設策略。
聚合大型組
4.3 版本變更了 SimpleMessageGroup
中訊息的預設 Collection
為 HashSet
(先前為 BlockingQueue
)。當從大型群組中移除個別訊息時,這項操作的成本很高 (需要 O(n) 線性掃描)。雖然雜湊集合通常移除速度快得多,但對於大型訊息來說,成本可能會很高,因為雜湊必須在插入和移除時都進行計算。如果您的訊息雜湊成本很高,請考慮使用其他集合類型。如 使用 MessageGroupFactory
中所述,提供了 SimpleMessageGroupFactory
,以便您可以選擇最符合您需求的 Collection
。您也可以提供自己的 factory 實作來建立其他 Collection<Message<?>>
。
以下範例示範如何使用先前的實作和 SimpleSequenceSizeReleaseStrategy
設定聚合器
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果篩選器端點參與了聚合器的上游流程,則序列大小釋放策略 (固定或基於 sequenceSize 標頭) 將無法達到其目的,因為序列中的某些訊息可能會被篩選器丟棄。在這種情況下,建議選擇另一個 ReleaseStrategy ,或使用從丟棄子流程傳送的補償訊息,這些訊息在其內容中攜帶一些資訊,以便在自訂完成群組函式中跳過。請參閱 篩選器 以取得更多資訊。 |
關聯策略
CorrelationStrategy
介面定義如下
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
此方法傳回一個 Object
,代表用於將訊息與訊息群組關聯的關聯鍵。對於 Map
中的鍵,此鍵必須滿足 equals()
和 hashCode()
實作所使用的準則。
一般來說,任何 POJO 都可以實作關聯邏輯,而將訊息對應到方法引數 (或多個引數) 的規則與 ServiceActivator
的規則相同 (包括對 @Header
註解的支援)。方法必須傳回一個值,且該值不得為 null
。
Spring Integration 提供了 CorrelationStrategy
的實作:HeaderAttributeCorrelationStrategy
。此實作傳回訊息標頭之一的值 (其名稱由建構子引數指定) 作為關聯鍵。預設情況下,關聯策略是 HeaderAttributeCorrelationStrategy
,它傳回 CORRELATION_ID
標頭屬性的值。如果您有想要用於關聯的自訂標頭名稱,您可以在 HeaderAttributeCorrelationStrategy
的實例上設定它,並將其作為聚合器關聯策略的參考提供。
鎖定註冊表
群組的變更具有執行緒安全性。因此,當您同時為相同的關聯 ID 傳送訊息時,只有其中一個會在聚合器中處理,使其有效地成為每個訊息群組的單執行緒。LockRegistry
用於取得已解析關聯 ID 的鎖定。預設情況下使用 DefaultLockRegistry
(記憶體內)。為了同步跨伺服器的更新,其中正在使用共用 MessageGroupStore
,您必須設定共用鎖定註冊表。
避免死鎖
如上所述,當訊息群組被變更 (新增或釋放訊息) 時,會持有鎖定。
考慮以下流程
...->aggregator1-> ... ->aggregator2-> ...
如果有複數執行緒,且聚合器共用一個通用鎖定註冊表,則可能會發生死鎖。這將導致執行緒掛起,而 jstack <pid>
可能會呈現如下結果
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有幾種方法可以避免此問題
-
確保每個聚合器都有自己的鎖定註冊表 (這可以是跨應用程式實例的共用註冊表,但流程中的兩個或多個聚合器都必須各自具有不同的註冊表)
-
使用
ExecutorChannel
或QueueChannel
作為聚合器的輸出通道,以便下游流程在新執行緒上執行 -
從 5.1.1 版本開始,將
releaseLockBeforeSend
聚合器屬性設定為true
如果由於某些原因,單個聚合器的輸出最終路由回同一個聚合器,也可能導致此問題。當然,上述第一個解決方案在此情況下不適用。 |
在 Java DSL 中設定聚合器
請參閱 聚合器和重新排序器,以了解如何在 Java DSL 中設定聚合器。
使用 XML 設定聚合器
Spring Integration 支援透過 <aggregator/>
元素使用 XML 設定聚合器。以下範例顯示聚合器的範例
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合器的 ID 是選用的。 |
2 | 生命週期屬性,指示是否應在應用程式內容啟動期間啟動聚合器。選用 (預設值為 'true')。 |
3 | 聚合器從中接收訊息的通道。必要。 |
4 | 聚合器將聚合結果傳送到的通道。選用 (因為傳入訊息本身可以在 'replyChannel' 訊息標頭中指定回覆通道)。 |
5 | 聚合器將逾時訊息傳送到的通道 (如果 send-partial-result-on-expiry 為 false )。選用。 |
6 | 對 MessageGroupStore 的參考,用於在訊息群組完成之前,將其儲存在關聯鍵下。選用。預設情況下,它是揮發性記憶體內儲存區。請參閱 訊息儲存區 以取得更多資訊。 |
7 | 當多個處理常式訂閱相同的 DirectChannel 時,此聚合器的順序 (用於負載平衡目的)。選用。 |
8 | 指示過期的訊息應被聚合並傳送到 'output-channel' 或 'replyChannel',一旦它們包含的 MessageGroup 過期 (請參閱 MessageGroupStore.expireMessageGroups(long) )。使 MessageGroup 過期的一種方法是設定 MessageGroupStoreReaper 。但是,您也可以透過呼叫 MessageGroupStore.expireMessageGroups(timeout) 使 MessageGroup 過期。您可以透過控制匯流排操作來完成此操作,或者,如果您有對 MessageGroupStore 實例的參考,則可以調用 expireMessageGroups(timeout) 。否則,此屬性本身不會執行任何操作。它僅作為指示器,指示是否要丟棄或傳送到輸出或回覆通道任何仍然在即將過期的 MessageGroup 中的訊息。選用 (預設值為 false )。注意:此屬性可能更適當地稱為 send-partial-result-on-timeout ,因為如果 expire-groups-upon-timeout 設定為 false ,則群組實際上可能不會過期。 |
9 | 當傳送回覆 Message 到 output-channel 或 discard-channel 時,要等待的逾時間隔。預設為 30 秒。它僅在輸出通道有一些“傳送”限制時應用,例如具有固定“容量”的 QueueChannel 。在這種情況下,會擲回 MessageDeliveryException 。對於 AbstractSubscribableChannel 實作,會忽略 send-timeout 。對於 group-timeout(-expression) ,來自排程過期任務的 MessageDeliveryException 會導致重新排程此任務。選用。 |
10 | 對實作訊息關聯 (群組) 演算法的 bean 的參考。bean 可以是 CorrelationStrategy 介面的實作或 POJO。在後一種情況下,也必須定義 correlation-strategy-method 屬性。選用 (預設情況下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 標頭)。 |
11 | 在 correlation-strategy 參考的 bean 上定義的方法。它實作關聯決策演算法。選用,帶有限制 (correlation-strategy 必須存在)。 |
12 | 代表關聯策略的 SpEL 運算式。範例:"headers['something']" 。只允許 correlation-strategy 或 correlation-strategy-expression 中的一個。 |
13 | 對在應用程式內容中定義的 bean 的參考。bean 必須實作聚合邏輯,如先前所述。選用 (預設情況下,聚合訊息的清單成為輸出訊息的有效負載)。 |
14 | 在 ref 屬性參考的 bean 上定義的方法。它實作訊息聚合演算法。選用 (它取決於 ref 屬性是否已定義)。 |
15 | 對實作釋放策略的 bean 的參考。bean 可以是 ReleaseStrategy 介面的實作或 POJO。在後一種情況下,也必須定義 release-strategy-method 屬性。選用 (預設情況下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 標頭屬性)。 |
16 | 在 release-strategy 屬性參考的 bean 上定義的方法。它實作完成決策演算法。選用,帶有限制 (release-strategy 必須存在)。 |
17 | 代表釋放策略的 SpEL 運算式。運算式的根物件是 MessageGroup 。範例:"size() == 5" 。只允許 release-strategy 或 release-strategy-expression 中的一個。 |
18 | 當設定為 true (預設值為 false ) 時,已完成的群組會從訊息儲存區中移除,讓具有相同關聯的後續訊息形成一個新群組。預設行為是將具有與已完成群組相同關聯的訊息傳送到 discard-channel 。 |
19 | 僅當為 <aggregator> 的 MessageStore 設定 MessageGroupStoreReaper 時才適用。預設情況下,當 MessageGroupStoreReaper 設定為使部分群組過期時,也會移除空群組。空群組在群組正常釋放後存在。空群組能夠偵測和丟棄延遲到達的訊息。如果您希望空群組的過期排程比部分群組的過期排程更長,請設定此屬性。然後,空群組將不會從 MessageStore 中移除,直到它們至少在此毫秒數內未被修改為止。請注意,空群組的實際過期時間也受到 reaper 的 timeout 屬性的影響,並且可能高達此值加上逾時時間。 |
20 | 對 org.springframework.integration.util.LockRegistry bean 的參考。它用於根據 groupId 取得 Lock ,以用於對 MessageGroup 進行並行操作。預設情況下,使用內部 DefaultLockRegistry 。使用分散式 LockRegistry (例如 ZookeeperLockRegistry ) 可確保只有聚合器的一個實例可以同時對群組進行操作。請參閱 Redis 鎖定註冊表 或 Zookeeper 鎖定註冊表 以取得更多資訊。 |
21 | 逾時 (以毫秒為單位),用於在 ReleaseStrategy 在目前訊息到達時未釋放群組時,強制 MessageGroup 完成。此屬性為聚合器提供內建的基於時間的釋放策略,當新的訊息在逾時時間內未到達 MessageGroup 時,如果需要發出部分結果 (或丟棄群組),則逾時時間從上次訊息到達時開始計算。若要設定從 MessageGroup 建立時間開始計算的逾時時間,請參閱 group-timeout-expression 資訊。當新訊息到達聚合器時,會取消其 MessageGroup 的任何現有 ScheduledFuture<?> 。如果 ReleaseStrategy 傳回 false (表示不釋放) 且 groupTimeout > 0 ,則會排程新任務以使群組過期。我們不建議將此屬性設定為零 (或負值)。這樣做會有效地停用聚合器,因為每個訊息群組都會立即完成。但是,您可以透過使用運算式有條件地將其設定為零 (或負值)。請參閱 group-timeout-expression 以取得資訊。完成期間採取的動作取決於 ReleaseStrategy 和 send-partial-group-on-expiry 屬性。請參閱 聚合器和群組逾時 以取得更多資訊。它與 'group-timeout-expression' 屬性互斥。 |
22 | SpEL 運算式,評估為 groupTimeout ,其中 MessageGroup 作為 #root 評估內容物件。用於排程要強制完成的 MessageGroup 。如果運算式評估為 null ,則不會排程完成。如果它評估為零,則群組會立即在目前執行緒上完成。實際上,這提供了動態 group-timeout 屬性。例如,如果您希望在群組建立時間起經過 10 秒後強制完成 MessageGroup ,您可以考慮使用以下 SpEL 運算式:timestamp + 10000 - T(System).currentTimeMillis() ,其中 timestamp 由 MessageGroup.getTimestamp() 提供,因為此處的 MessageGroup 是 #root 評估內容物件。但是請記住,群組建立時間可能與第一個到達訊息的時間不同,具體取決於其他群組過期屬性的設定。請參閱 group-timeout 以取得更多資訊。與 'group-timeout' 屬性互斥。 |
23 | 當群組由於逾時 (或由 MessageGroupStoreReaper ) 而完成時,預設情況下,群組會過期 (完全移除)。延遲到達的訊息會啟動一個新群組。您可以將其設定為 false 以完成群組,但使其元資料保持不變,以便丟棄延遲到達的訊息。稍後可以使用 MessageGroupStoreReaper 以及 empty-group-min-timeout 屬性使空群組過期。它預設為 'true'。 |
24 | TaskScheduler bean 參考,用於排程 MessageGroup 以強制完成,如果在 groupTimeout 內沒有新訊息到達 MessageGroup 。如果未提供,則使用在 ApplicationContext (ThreadPoolTaskScheduler ) 中註冊的預設排程器 (taskScheduler )。如果未指定 group-timeout 或 group-timeout-expression ,則此屬性不適用。 |
25 | 自 4.1 版本起。它允許為 forceComplete 操作啟動交易。它從 group-timeout(-expression) 或由 MessageGroupStoreReaper 啟動,並且不適用於正常的 add 、release 和 discard 操作。僅允許此子元素或 <expire-advice-chain/> 。 |
26 | 自版本 4.1 起。它允許為 forceComplete 操作設定任何 Advice 。它從 group-timeout(-expression) 或由 MessageGroupStoreReaper 啟動,並且不適用於正常的 add 、release 和 discard 操作。僅允許此子元素或 <expire-transactional/> 。也可以在此處使用 Spring tx 命名空間設定交易 Advice 。 |
使群組過期
有兩個與使群組過期 (完全移除) 相關的屬性。當群組過期時,沒有它的記錄,並且,如果新的訊息以相同的關聯到達,則會啟動一個新群組。當群組完成 (不過期) 時,空群組會保留,並且延遲到達的訊息會被丟棄。稍後可以使用
如果群組未正常完成,而是由於逾時而釋放或丟棄,則群組通常會過期。自 4.1 版本起,您可以使用
自 5.0 版本起,空群組也會在 從 5.4 版本開始,可以將聚合器 (和重新排序器) 設定為使孤立群組過期 (持久訊息儲存區中的群組,否則可能不會被釋放)。 |
如果自訂聚合器處理常式實作可以在其他 <aggregator>
定義中被參考,我們通常建議使用 ref
屬性。但是,如果自訂聚合器實作僅由 <aggregator>
的單個定義使用,則可以使用內部 bean 定義 (從 1.0.3 版本開始) 在 <aggregator>
元素中設定聚合 POJO,如下列範例所示
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一個 <aggregator> 設定中同時使用 ref 屬性和內部 bean 定義是不允許的,因為它會建立不明確的條件。在這種情況下,會擲回例外狀況。 |
以下範例顯示聚合器 bean 的實作
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
先前範例的完成策略 bean 的實作可能如下所示
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
在有意義的情況下,釋放策略方法和聚合器方法可以組合到一個 bean 中。 |
上述範例的關聯策略 bean 的實作可能如下所示
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
先前範例中的聚合器會根據某些準則 (在此範例中,是除以十後的餘數) 將數字分組,並將群組保留到有效負載提供的數字總和超過特定值為止。
在有意義的情況下,釋放策略方法、關聯策略方法和聚合器方法可以組合在一個 bean 中。(實際上,它們全部或其中任意兩個都可以組合。) |
聚合器和 Spring 運算式語言 (SpEL)
自 Spring Integration 2.0 起,您可以使用 SpEL 處理各種策略 (關聯、釋放和聚合),如果此類釋放策略背後的邏輯相對簡單,我們建議使用 SpEL。假設您有一個舊版元件,旨在接收物件陣列。我們知道預設釋放策略會將所有聚合訊息組合成 List
。現在我們有兩個問題。首先,我們需要從清單中提取個別訊息。其次,我們需要提取每個訊息的有效負載並組裝物件陣列。以下範例解決了這兩個問題
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
但是,使用 SpEL,實際上可以使用一行運算式相對輕鬆地處理此類需求,從而免去您編寫自訂類別並將其設定為 bean 的麻煩。以下範例示範如何執行此操作
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在先前的設定中,我們使用 集合投影 運算式從清單中所有訊息的有效負載組裝一個新集合,然後將其轉換為陣列,從而達到與先前 Java 程式碼相同的結果。
當處理自訂釋放和關聯策略時,您可以應用相同的基於運算式的方法。
您可以在 correlation-strategy-expression
屬性中將您的簡單關聯邏輯實作為 SpEL 運算式並進行設定,而不是在 correlation-strategy
屬性中為自訂 CorrelationStrategy
定義 bean,如下列範例所示
correlation-strategy-expression="payload.person.id"
在先前的範例中,我們假設有效負載具有帶有 id
的 person
屬性,該屬性將用於關聯訊息。
同樣,對於 ReleaseStrategy
,您可以將您的釋放邏輯實作為 SpEL 運算式,並在 release-strategy-expression
屬性中設定它。評估內容的根物件是 MessageGroup
本身。訊息的 List
可以透過使用群組的 message
屬性在運算式中參考。
在 5.0 之前的版本中,根物件是 Message<?> 的集合,如下列範例所示 |
release-strategy-expression="!messages.?[payload==5].empty"
在先前的範例中,SpEL 評估內容的根物件是 MessageGroup
本身,並且您聲明,只要此群組中存在有效負載為 5
的訊息,就應釋放該群組。
聚合器和群組逾時
自 4.0 版本起,引入了兩個新的互斥屬性:group-timeout
和 group-timeout-expression
。請參閱 使用 XML 設定聚合器。在某些情況下,如果 ReleaseStrategy
在目前訊息到達時未釋放,您可能需要在逾時後發出聚合器結果 (或丟棄群組)。為此,groupTimeout
選項允許排程強制 MessageGroup
完成,如下列範例所示
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
在此範例中,如果聚合器接收到序列中的最後一個訊息 (由 release-strategy-expression
定義),則可以正常釋放。如果該特定訊息未到達,則 groupTimeout
會在十秒後強制群組完成,只要群組至少包含兩個訊息。
強制群組完成的結果取決於 ReleaseStrategy
和 send-partial-result-on-expiry
。首先,再次諮詢釋放策略,以查看是否要進行正常釋放。雖然群組沒有變更,但 ReleaseStrategy
可以決定此時釋放群組。如果釋放策略仍然沒有釋放群組,則會過期。如果 send-partial-result-on-expiry
為 true
,則 (部分) MessageGroup
中的現有訊息會作為正常聚合器回覆訊息釋放到 output-channel
。否則,它會被丟棄。
groupTimeout
行為和 MessageGroupStoreReaper
之間存在差異 (請參閱 使用 XML 設定聚合器)。reaper 定期啟動 MessageGroupStore
中所有 MessageGroup
的強制完成。如果新訊息在 groupTimeout
期間未到達,則 groupTimeout
會為每個 MessageGroup
單獨執行此操作。此外,reaper 可以用於移除空群組 (為了在 expire-groups-upon-completion
為 false 時丟棄延遲訊息,會保留空群組)。
從 5.5 版本開始,groupTimeoutExpression
可以評估為 java.util.Date
實例。這在某些情況下很有用,例如根據群組建立時間 (MessageGroup.getTimestamp()
) 而不是目前訊息到達時間來確定排程任務時刻,因為當 groupTimeoutExpression
評估為 long
時會計算目前訊息到達時間。
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用註解設定聚合器
以下範例顯示使用註解設定的聚合器
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 指示此方法應作為聚合器使用的註解。如果此類別用作聚合器,則必須指定此註解。 |
2 | 指示此方法用作聚合器的釋放策略的註解。如果任何方法上不存在此註解,則聚合器會使用 SimpleSequenceSizeReleaseStrategy 。 |
3 | 指示此方法應用作聚合器的關聯策略的註解。如果未指示任何關聯策略,則聚合器會使用基於 CORRELATION_ID 的 HeaderAttributeCorrelationStrategy 。 |
XML 元素提供的所有設定選項也適用於 @Aggregator
註解。
可以從 XML 明確參考聚合器,或者,如果在類別上定義了 @MessageEndpoint
,則可以透過類別路徑掃描自動偵測聚合器。
聚合器元件的註解設定 (@Aggregator
和其他註解) 僅涵蓋簡單的使用案例,在這些案例中,大多數預設選項都已足夠。如果您在使用註解設定時需要更多地控制這些選項,請考慮為 AggregatingMessageHandler
使用 @Bean
定義,並使用 @ServiceActivator
標記其 @Bean
方法,如下列範例所示
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
請參閱 程式設計模型 和 @Bean
方法上的註解 以取得更多資訊。
自 4.2 版本起,AggregatorFactoryBean 可用於簡化 AggregatingMessageHandler 的 Java 設定。 |
管理聚合器中的狀態:MessageGroupStore
聚合器 (以及 Spring Integration 中的其他一些模式) 是一種有狀態模式,需要根據一段時間內到達的一組訊息 (所有訊息都具有相同的關聯鍵) 做出決策。有狀態模式中介面的設計 (例如 ReleaseStrategy
) 是由以下原則驅動的:元件 (無論是由框架定義還是由使用者定義) 應能夠保持無狀態。所有狀態都由 MessageGroup
攜帶,其管理委派給 MessageGroupStore
。MessageGroupStore
介面定義如下
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
如需更多資訊,請參閱 Javadoc。
MessageGroupStore
在等待觸發釋放策略時,在 MessageGroups
中累積狀態資訊,並且該事件可能永遠不會發生。因此,為了防止過時的訊息持續存在,並為了讓揮發性儲存區在應用程式關閉時提供清理的掛鉤,MessageGroupStore
允許您註冊回呼,以在 MessageGroups
過期時應用於它們。介面非常簡單,如下列清單所示
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回呼可以直接存取儲存區和訊息群組,以便它可以管理持久狀態 (例如,透過完全從儲存區中移除群組)。
MessageGroupStore
維護這些回呼的清單,它會根據需要將其應用於時間戳記早於作為參數提供的時間的所有訊息 (請參閱先前描述的 registerMessageGroupExpiryCallback(..)
和 expireMessageGroups(..)
方法)。
當您打算依賴 expireMessageGroups 功能時,請務必不要在不同的聚合器元件中使用相同的 MessageGroupStore 實例。每個 AbstractCorrelatingMessageHandler 都會根據 forceComplete() 回呼註冊自己的 MessageGroupCallback 。這樣,每個要過期的群組都可能被錯誤的聚合器完成或丟棄。從 5.0.10 版本開始,UniqueExpiryCallback 從 AbstractCorrelatingMessageHandler 用於 MessageGroupStore 中的註冊回呼。反過來,MessageGroupStore 會檢查此類別的實例是否存在,如果回呼集中已存在一個實例,則會記錄錯誤並顯示適當的訊息。這樣,框架會禁止在不同的聚合器/重新排序器中使用 MessageGroupStore 實例,以避免提到的過期副作用,即群組不是由特定的關聯處理常式建立的。 |
您可以使用逾時值呼叫 expireMessageGroups
方法。任何早於目前時間減去此值的訊息都會過期,並套用回呼。因此,是儲存區的使用者定義了訊息群組“過期”的含義。
為了方便使用者,Spring Integration 以 MessageGroupStoreReaper
的形式提供了訊息過期的包裝器,如下列範例所示
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
reaper 是一個 Runnable
。在先前的範例中,訊息群組儲存區的過期方法每十秒鐘呼叫一次。逾時時間本身為 30 秒。
務必了解 MessageGroupStoreReaper 的 'timeout' 屬性是一個近似值,並且會受到任務排程器速率的影響,因為此屬性僅在下次排程執行 MessageGroupStoreReaper 任務時檢查。例如,如果逾時時間設定為十分鐘,但 MessageGroupStoreReaper 任務排程為每小時執行一次,並且上次執行 MessageGroupStoreReaper 任務發生在逾時時間前一分鐘,則 MessageGroup 在接下來的 59 分鐘內不會過期。因此,我們建議將速率設定為至少等於逾時時間的值或更短。 |
除了 reaper 之外,當應用程式透過 AbstractCorrelatingMessageHandler
中的生命週期回呼關閉時,也會調用過期回呼。
AbstractCorrelatingMessageHandler
註冊了自己的過期回呼,這與聚合器 XML 設定中的布林值旗標 send-partial-result-on-expiry
相關聯。如果旗標設定為 true
,則當調用過期回呼時,尚未釋放的群組中的任何未標記訊息都可以傳送到輸出通道。
由於 MessageGroupStoreReaper 是從排程任務呼叫的,並且可能會導致產生訊息 (取決於 sendPartialResultOnExpiry 選項) 到下游整合流程,因此建議提供具有 MessagePublishingErrorHandler 的自訂 TaskScheduler ,以透過 errorChannel 處理例外狀況,因為常規聚合器釋放功能可能會預期此情況。相同的邏輯適用於也依賴 TaskScheduler 的群組逾時功能。請參閱 錯誤處理 以取得更多資訊。 |
當不同的關聯端點使用共用的 某些 關於 |
Flux 聚合器
在 5.2 版本中,引入了 FluxAggregatorMessageHandler
組件。 它基於 Project Reactor 的 Flux.groupBy()
和 Flux.window()
運算子。 傳入的消息會被發送到由這個組件的建構子中 Flux.create()
初始化的 FluxSink
。 如果未提供 outputChannel
,或者它不是 ReactiveStreamsSubscribableChannel
的實例,則對主 Flux
的訂閱會從 Lifecycle.start()
實作中完成。 否則,它會延遲到由 ReactiveStreamsSubscribableChannel
實作完成的訂閱。 消息透過 Flux.groupBy()
使用 CorrelationStrategy
進行分組,以取得群組鍵。 預設情況下,會參考消息的 IntegrationMessageHeaderAccessor.CORRELATION_ID
標頭。
預設情況下,每個關閉的視窗都會作為消息酬載中的 Flux
發布以供產生。 這個消息包含視窗中第一個消息的所有標頭。 輸出消息酬載中的這個 Flux
必須被訂閱並在下游處理。 這種邏輯可以透過 FluxAggregatorMessageHandler
的 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
配置選項進行自訂(或取代)。 例如,如果我們希望最終消息中包含酬載的 List
,我們可以像這樣配置 Flux.collectList()
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
FluxAggregatorMessageHandler
中有幾個選項可以選擇適當的視窗策略
-
setBoundaryTrigger(Predicate<Message<?>>)
- 傳遞到Flux.windowUntil()
運算子。 有關更多資訊,請參閱其 JavaDocs。 優先於所有其他視窗選項。 -
setWindowSize(int)
和setWindowSizeFunction(Function<Message<?>, Integer>)
- 傳遞到Flux.window(int)
或windowTimeout(int, Duration)
。 預設情況下,視窗大小是從群組中的第一個消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
標頭計算得出的。 -
setWindowTimespan(Duration)
- 傳遞到Flux.window(Duration)
或windowTimeout(int, Duration)
,具體取決於視窗大小配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一個將轉換應用於分組的 fluxes 的函數,用於任何未涵蓋在公開選項中的自訂視窗操作。
由於這個組件是一個 MessageHandler
實作,它可以簡單地作為 @Bean
定義與 @ServiceActivator
消息註解一起使用。 使用 Java DSL,它可以從 .handle()
EIP 方法中使用。 下面的範例示範了我們如何在運行時註冊 IntegrationFlow
,以及如何將 FluxAggregatorMessageHandler
與上游的 splitter 相關聯
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);
消息群組的條件
從 5.5 版本開始,AbstractCorrelatingMessageHandler
(包括其 Java 和 XML DSL)公開了 groupConditionSupplier
選項,即 BiFunction<Message<?>, String, String>
實作。 這個函數用於添加到群組的每個消息,並且結果條件語句會儲存在群組中以供未來考慮。 ReleaseStrategy
可以參考這個條件,而不是迭代群組中的所有消息。 有關更多資訊,請參閱 GroupConditionProvider
JavaDocs 和 Message Group Condition。
另請參閱 File Aggregator。