等冪接收器企業整合模式

從 4.1 版開始,Spring Integration 提供了 等冪接收器 企業整合模式的實作。這是一個功能性模式,整個等冪邏輯應在應用程式中實作。但是,為了簡化決策過程,提供了 IdempotentReceiverInterceptor 元件。這是一個 AOP Advice,應用於 MessageHandler.handleMessage() 方法,並且可以根據其設定 篩選 請求訊息或將其標記為 重複

先前,您可能已透過在 <filter/> 中使用自訂 MessageSelector 來實作此模式(例如,請參閱篩選器)。但是,由於此模式實際上定義了端點的行為,而不是端點本身,因此等冪接收器實作不提供端點元件。相反地,它應用於應用程式中宣告的端點。

IdempotentReceiverInterceptor 的邏輯基於提供的 MessageSelector,並且如果訊息未被該選擇器接受,則會使用設定為 trueduplicateMessage 標頭來豐富它。目標 MessageHandler(或下游流程)可以查詢此標頭以實作正確的等冪邏輯。如果 IdempotentReceiverInterceptor 配置了 discardChannelthrowExceptionOnRejection = true,則重複訊息不會傳送到目標 MessageHandler.handleMessage()。而是會捨棄它。如果您想捨棄(對重複訊息不做任何處理),則應將 discardChannel 配置為 NullChannel,例如預設的 nullChannel Bean。

為了在訊息之間維護狀態並提供比較訊息以實現等冪性的能力,我們提供了 MetadataStoreSelector。它接受 MessageProcessor 實作(根據 Message 建立查找鍵)和可選的 ConcurrentMetadataStore中繼資料儲存區)。有關更多資訊,請參閱 MetadataStoreSelector Javadoc。您也可以透過使用額外的 MessageProcessor 自訂 ConcurrentMetadataStorevalue。預設情況下,MetadataStoreSelector 使用 timestamp 訊息標頭。

通常,如果金鑰沒有現有值,則選擇器會選擇訊息以接受。在某些情況下,比較金鑰的目前值和新值以確定是否應接受訊息很有用。從 5.3 版開始,提供了 compareValues 屬性,該屬性引用 BiPredicate<String, String>;第一個參數是舊值;傳回 true 以接受訊息並將舊值替換為 MetadataStore 中的新值。這對於減少金鑰數量可能很有用;例如,在處理檔案中的行時,您可以將檔案名稱儲存在金鑰中,並將目前行號儲存在值中。然後,在重新啟動後,您可以跳過已處理的行。有關範例,請參閱 等冪下游處理分割檔案

為了方便起見,MetadataStoreSelector 選項可以直接在 <idempotent-receiver> 元件上配置。以下列表顯示了所有可能的屬性

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1 IdempotentReceiverInterceptor Bean 的 ID。可選。
2 要將此攔截器應用於的消費者端點名稱(或模式)。用逗號 (,) 分隔名稱(模式),例如 endpoint="aaa, bbb*, ccc, *ddd, eee*fff"。然後,與這些模式相符的端點 Bean 名稱將用於檢索目標端點的 MessageHandler Bean(使用其 .handler 後綴),並且 IdempotentReceiverInterceptor 將應用於這些 Bean。必填。
3 MessageSelector Bean 參考。與 metadata-storekey-strategy (key-expression) 互斥。當未提供 selector 時,需要 key-strategykey-strategy-expression 之一。
4 識別當 IdempotentReceiverInterceptor 不接受訊息時要將訊息傳送到的通道。省略時,重複訊息將轉發到具有 duplicateMessage 標頭的處理器。可選。
5 ConcurrentMetadataStore 參考。由底層 MetadataStoreSelector 使用。與 selector 互斥。可選。預設的 MetadataStoreSelector 使用內部 SimpleMetadataStore,該儲存區不會在應用程式執行之間維護狀態。
6 MessageProcessor 參考。由底層 MetadataStoreSelector 使用。從請求訊息評估 idempotentKey。與 selectorkey-expression 互斥。當未提供 selector 時,需要 key-strategykey-strategy-expression 之一。
7 用於填入 ExpressionEvaluatingMessageProcessor 的 SpEL 運算式。由底層 MetadataStoreSelector 使用。透過使用請求訊息作為評估內容根物件來評估 idempotentKey。與 selectorkey-strategy 互斥。當未提供 selector 時,需要 key-strategykey-strategy-expression 之一。
8 MessageProcessor 參考。由底層 MetadataStoreSelector 使用。從請求訊息評估 idempotentKeyvalue。與 selectorvalue-expression 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息標頭作為 Metadata 'value'。
9 用於填入 ExpressionEvaluatingMessageProcessor 的 SpEL 運算式。由底層 MetadataStoreSelector 使用。透過使用請求訊息作為評估內容根物件來評估 idempotentKeyvalue。與 selectorvalue-strategy 互斥。預設情況下,'MetadataStoreSelector' 使用 'timestamp' 訊息標頭作為中繼資料 'value'。
10 BiPredicate<String, String> Bean 的參考,允許您透過比較金鑰的舊值和新值來選擇性地選擇訊息;預設為 null
11 如果 IdempotentReceiverInterceptor 拒絕訊息,是否擲回例外。預設值為 false。無論是否提供 discard-channel,都會應用它。

對於 Java 設定,Spring Integration 提供了方法層級的 @IdempotentReceiver 註解。它用於標記具有訊息傳遞註解 (@ServiceActivator@Router 等)的 method,以指定將哪些 `IdempotentReceiverInterceptor 物件應用於此端點。以下範例顯示如何使用 @IdempotentReceiver 註解

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

當您使用 Java DSL 時,您可以將攔截器新增至端點的 Advice 鏈,如下列範例所示

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor 僅針對 MessageHandler.handleMessage(Message<?>) 方法設計。從 4.3.1 版開始,它實作了 HandleMessageAdvice,並以 AbstractHandleMessageAdvice 作為基底類別,以實現更好的分離。有關更多資訊,請參閱 處理訊息 Advice