等冪接收器企業整合模式
從 4.1 版開始,Spring Integration 提供了 等冪接收器 企業整合模式的實作。這是一個功能性模式,整個等冪邏輯應在應用程式中實作。但是,為了簡化決策過程,提供了 IdempotentReceiverInterceptor
元件。這是一個 AOP Advice
,應用於 MessageHandler.handleMessage()
方法,並且可以根據其設定 篩選
請求訊息或將其標記為 重複
。
先前,您可能已透過在 <filter/>
中使用自訂 MessageSelector
來實作此模式(例如,請參閱篩選器)。但是,由於此模式實際上定義了端點的行為,而不是端點本身,因此等冪接收器實作不提供端點元件。相反地,它應用於應用程式中宣告的端點。
IdempotentReceiverInterceptor
的邏輯基於提供的 MessageSelector
,並且如果訊息未被該選擇器接受,則會使用設定為 true
的 duplicateMessage
標頭來豐富它。目標 MessageHandler
(或下游流程)可以查詢此標頭以實作正確的等冪邏輯。如果 IdempotentReceiverInterceptor
配置了 discardChannel
或 throwExceptionOnRejection = true
,則重複訊息不會傳送到目標 MessageHandler.handleMessage()
。而是會捨棄它。如果您想捨棄(對重複訊息不做任何處理),則應將 discardChannel
配置為 NullChannel
,例如預設的 nullChannel
Bean。
為了在訊息之間維護狀態並提供比較訊息以實現等冪性的能力,我們提供了 MetadataStoreSelector
。它接受 MessageProcessor
實作(根據 Message
建立查找鍵)和可選的 ConcurrentMetadataStore
(中繼資料儲存區)。有關更多資訊,請參閱 MetadataStoreSelector
Javadoc。您也可以透過使用額外的 MessageProcessor
自訂 ConcurrentMetadataStore
的 value
。預設情況下,MetadataStoreSelector
使用 timestamp
訊息標頭。
通常,如果金鑰沒有現有值,則選擇器會選擇訊息以接受。在某些情況下,比較金鑰的目前值和新值以確定是否應接受訊息很有用。從 5.3 版開始,提供了 compareValues
屬性,該屬性引用 BiPredicate<String, String>
;第一個參數是舊值;傳回 true
以接受訊息並將舊值替換為 MetadataStore
中的新值。這對於減少金鑰數量可能很有用;例如,在處理檔案中的行時,您可以將檔案名稱儲存在金鑰中,並將目前行號儲存在值中。然後,在重新啟動後,您可以跳過已處理的行。有關範例,請參閱 等冪下游處理分割檔案。
為了方便起見,MetadataStoreSelector
選項可以直接在 <idempotent-receiver>
元件上配置。以下列表顯示了所有可能的屬性
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | IdempotentReceiverInterceptor Bean 的 ID。可選。 |
2 | 要將此攔截器應用於的消費者端點名稱(或模式)。用逗號 (, ) 分隔名稱(模式),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff" 。然後,與這些模式相符的端點 Bean 名稱將用於檢索目標端點的 MessageHandler Bean(使用其 .handler 後綴),並且 IdempotentReceiverInterceptor 將應用於這些 Bean。必填。 |
3 | MessageSelector Bean 參考。與 metadata-store 和 key-strategy (key-expression) 互斥。當未提供 selector 時,需要 key-strategy 或 key-strategy-expression 之一。 |
4 | 識別當 IdempotentReceiverInterceptor 不接受訊息時要將訊息傳送到的通道。省略時,重複訊息將轉發到具有 duplicateMessage 標頭的處理器。可選。 |
5 | ConcurrentMetadataStore 參考。由底層 MetadataStoreSelector 使用。與 selector 互斥。可選。預設的 MetadataStoreSelector 使用內部 SimpleMetadataStore ,該儲存區不會在應用程式執行之間維護狀態。 |
6 | MessageProcessor 參考。由底層 MetadataStoreSelector 使用。從請求訊息評估 idempotentKey 。與 selector 和 key-expression 互斥。當未提供 selector 時,需要 key-strategy 或 key-strategy-expression 之一。 |
7 | 用於填入 ExpressionEvaluatingMessageProcessor 的 SpEL 運算式。由底層 MetadataStoreSelector 使用。透過使用請求訊息作為評估內容根物件來評估 idempotentKey 。與 selector 和 key-strategy 互斥。當未提供 selector 時,需要 key-strategy 或 key-strategy-expression 之一。 |
8 | MessageProcessor 參考。由底層 MetadataStoreSelector 使用。從請求訊息評估 idempotentKey 的 value 。與 selector 和 value-expression 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息標頭作為 Metadata 'value'。 |
9 | 用於填入 ExpressionEvaluatingMessageProcessor 的 SpEL 運算式。由底層 MetadataStoreSelector 使用。透過使用請求訊息作為評估內容根物件來評估 idempotentKey 的 value 。與 selector 和 value-strategy 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息標頭作為中繼資料 'value'。 |
10 | 對 BiPredicate<String, String> Bean 的參考,允許您透過比較金鑰的舊值和新值來選擇性地選擇訊息;預設為 null 。 |
11 | 如果 IdempotentReceiverInterceptor 拒絕訊息,是否擲回例外。預設值為 false 。無論是否提供 discard-channel ,都會應用它。 |
對於 Java 設定,Spring Integration 提供了方法層級的 @IdempotentReceiver
註解。它用於標記具有訊息傳遞註解 (@ServiceActivator
、@Router
等)的 method
,以指定將哪些 `IdempotentReceiverInterceptor 物件應用於此端點。以下範例顯示如何使用 @IdempotentReceiver
註解
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
當您使用 Java DSL 時,您可以將攔截器新增至端點的 Advice 鏈,如下列範例所示
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
IdempotentReceiverInterceptor 僅針對 MessageHandler.handleMessage(Message<?>) 方法設計。從 4.3.1 版開始,它實作了 HandleMessageAdvice ,並以 AbstractHandleMessageAdvice 作為基底類別,以實現更好的分離。有關更多資訊,請參閱 處理訊息 Advice。 |