交易支援

本章涵蓋 Spring Integration 對交易的支援。它涵蓋以下主題

了解訊息流程中的交易

Spring Integration 公開了幾個掛鉤,以解決您的訊息流程的交易需求。為了更好地理解這些掛鉤以及如何從中受益,我們必須首先回顧可用於啟動訊息流程的六種機制,並了解如何在這些機制的每一個中解決這些流程的交易需求。

以下六種機制啟動訊息流程(每種機制的詳細資訊在本手冊中提供)

  • 閘道代理:基本訊息傳遞閘道。

  • 訊息通道:與 MessageChannel 方法的直接互動(例如,channel.send(message))。

  • 訊息發佈者:將訊息流程作為 Spring Bean 上方法調用的副產品啟動的方式。

  • 輸入通道配接器和閘道:基於將第三方系統與 Spring Integration 訊息傳遞系統連接來啟動訊息流程的方式(例如,[JmsMessage] → Jms 輸入配接器[SI 訊息] → SI 通道)。

  • 排程器:基於預先設定的排程器分發的排程事件來啟動訊息流程的方式。

  • Poller:與排程器類似,這是基於預先設定的 poller 分發的排程或基於間隔的事件來啟動訊息流程的方式。

我們可以將這六種機制分為兩個一般類別

  • 由使用者程序啟動的訊息流程:此類別中的範例情境包括調用閘道方法或將 Message 明確地發送到 MessageChannel。換句話說,這些訊息流程取決於第三方程序(例如您編寫的一些程式碼)來啟動。

  • 由守護程序啟動的訊息流程:此類別中的範例情境包括 Poller 輪詢訊息佇列以使用輪詢訊息啟動新的訊息流程,或排程器在預定義時間排程程序,方法是建立新訊息並啟動訊息流程。

顯然,閘道代理、MessageChannel.send(…​)MessagePublisher 都屬於第一類,而輸入配接器和閘道、排程器和 poller 屬於第二類。

那麼,如何在每個類別中各種情境中解決交易需求?Spring Integration 是否需要針對特定情境提供一些明確的交易相關內容?或者您可以改用 Spring 的交易支援嗎?

Spring 本身為交易管理提供一流的支援。因此,我們在這裡的目標不是提供新的東西,而是使用 Spring 從其現有的交易支援中受益。換句話說,作為一個框架,我們必須公開 Spring 交易管理功能的掛鉤。但是,由於 Spring Integration 設定是基於 Spring 設定,因此我們不總是需要公開這些掛鉤,因為 Spring 已經公開了它們。畢竟,每個 Spring Integration 元件都是一個 Spring Bean。

考慮到這個目標,我們可以再次考慮兩種情境:由使用者程序啟動的訊息流程和由守護程序啟動的訊息流程。

由使用者程序啟動並在 Spring 應用程式內容中設定的訊息流程,受此類程序的一般交易設定的約束。因此,它們不需要由 Spring Integration 明確設定為支援交易。交易可以而且應該透過 Spring 的標準交易支援啟動。Spring Integration 訊息流程自然地遵循元件的交易語意,因為它本身是由 Spring 設定的。例如,閘道或服務啟動器方法可以使用 @Transactional 註解,或者可以在 XML 設定中定義 TransactionInterceptor,其中包含指向應為交易的特定方法的 pointcut 運算式。最重要的是,在這些情境中,您可以完全控制交易設定和邊界。

但是,當涉及到由守護程序啟動的訊息流程時,情況會有些不同。儘管由開發人員設定,但這些流程並不直接涉及人員或其他程序來啟動。這些是基於觸發器的流程,由觸發器程序(守護程序)根據程序的設定啟動。例如,我們可以讓排程器每週五晚上啟動訊息流程。我們也可以設定一個觸發器,每秒啟動一個訊息流程等等。因此,我們需要一種方法讓這些基於觸發器的程序知道我們打算使產生的訊息流程成為交易性的,以便在每次啟動新的訊息流程時都可以建立交易內容。換句話說,我們需要公開一些交易設定,但只需足夠委派給 Spring 已經提供的交易支援即可(就像我們在其他情境中所做的那樣)。

Poller 交易支援

Spring Integration 為 pollers 提供交易支援。Pollers 是一種特殊的元件類型,因為在 poller 任務中,我們可以針對本身是交易性的資源調用 receive(),從而將 receive() 調用包含在交易的邊界內,這使其可以在任務失敗時回滾。如果我們要為通道新增相同的支援,則新增的交易將影響所有下游元件,從 send() 調用開始。這為交易劃分提供了相當廣泛的範圍,而沒有任何強烈的理由,尤其是當 Spring 已經提供了幾種方法來解決任何下游元件的交易需求時。但是,receive() 方法包含在交易邊界中是 pollers 的「強烈理由」。

每次您設定 Poller 時,都可以透過使用 transactional 子元素及其屬性來提供交易設定,如下例所示

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <transactional transaction-manager="txManager"
                   isolation="DEFAULT"
                   propagation="REQUIRED"
                   read-only="true"
                   timeout="1000"/>
</poller>

先前的設定看起來與原生 Spring 交易設定類似。您仍然必須提供對交易管理員的參考,並指定交易屬性或依賴預設值(例如,如果未指定 'transaction-manager' 屬性,則預設為名為 'transactionManager' 的 bean)。在內部,該程序包裝在 Spring 的原生交易中,其中 TransactionInterceptor 負責處理交易。有關如何設定交易管理員、交易管理員的類型(例如 JTA、Datasource 等)以及與交易設定相關的其他詳細資訊,請參閱 Spring Framework 參考指南

使用先前的設定,由此 poller 啟動的所有訊息流程都是交易性的。有關 poller 交易設定的更多資訊和詳細資訊,請參閱 輪詢和交易

除了交易之外,您在執行 poller 時可能還需要解決更多跨領域問題。為了幫助解決這個問題,poller 元素接受 <advice-chain> 子元素,該元素允許您定義要應用於 Poller 的自訂 advice 實例鏈。(請參閱 可輪詢訊息來源 以取得更多詳細資訊。)在 Spring Integration 2.0 中,Poller 經歷了一次重構工作,現在使用代理機制來解決交易問題以及其他跨領域問題。從這項工作中發展出來的重大變更之一是,我們使 <transactional><advice-chain> 元素互斥。這背後的理由是,如果您需要多個 advice,其中一個是 Transaction advice,您可以將其包含在 <advice-chain> 中,就像以前一樣方便,但具有更多的控制權,因為您現在可以選擇將 advice 放置在所需的順序中。以下範例顯示如何執行此操作

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someOtherAdviceBean" />
    <beans:bean class="foo.bar.SampleAdvice"/>
  </advice-chain>
</poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

先前的範例顯示了 Spring Transaction advice (txAdvice) 的基本 XML 設定,並將其包含在 Poller 定義的 <advice-chain> 中。如果您只需要解決 poller 的交易問題,您仍然可以使用 <transactional> 元素作為便利方式。

交易邊界

另一個重要因素是訊息流程中交易的邊界。當交易開始時,交易內容會繫結到當前執行緒。因此,無論您的訊息流程中有多少端點和通道,只要您確保流程在同一個執行緒上繼續,您的交易內容都將被保留。一旦您透過引入可輪詢通道執行器通道來中斷它,或者在某些服務中手動啟動新的執行緒,交易邊界也將被中斷。本質上,交易將在那裡結束,如果執行緒之間成功交接,則流程將被視為成功,並且將發送 COMMIT 訊號,即使流程將繼續並且可能仍然在下游某處導致異常。如果此類流程是同步的,則可以將該異常拋回給訊息流程的啟動者,訊息流程的啟動者也是交易內容的啟動者,並且交易將導致 ROLLBACK。折衷方案是在任何執行緒邊界被中斷的點使用交易通道。例如,您可以使用委派給交易性 MessageStore 策略的佇列支援通道,或者您可以使用 JMS 支援的通道。

交易同步

在某些環境中,將操作與包含整個流程的交易同步化會有所幫助。例如,考慮流程開始時的 <file:inbound-channel-adapter/>,它執行許多資料庫更新。如果交易提交,我們可能希望將檔案移動到 success 目錄,而如果交易回滾,我們可能希望將其移動到 failure 目錄。

Spring Integration 2.2 引入了將這些操作與交易同步化的功能。此外,如果您沒有「真實」交易,但仍然希望在成功或失敗時執行不同的動作,則可以設定 PseudoTransactionManager。有關更多資訊,請參閱 虛擬交易

以下清單顯示了此功能的關鍵策略介面

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

Factory 負責建立 TransactionSynchronization 物件。您可以實作自己的物件,或使用框架提供的物件:DefaultTransactionSynchronizationFactory。此實作傳回一個 TransactionSynchronization,它委派給 TransactionSynchronizationProcessor 的預設實作:ExpressionEvaluatingTransactionSynchronizationProcessor。此處理器支援三個 SpEL 運算式:beforeCommitExpressionafterCommitExpressionafterRollbackExpression

對於熟悉交易的人來說,這些動作應該是不言自明的。在每種情況下,#root 變數都是原始 Message。在某些情況下,會提供其他 SpEL 變數,具體取決於 poller 輪詢的 MessageSource。例如,MongoDbMessageSource 提供 #mongoTemplate 變數,該變數參考訊息來源的 MongoTemplate。同樣,RedisStoreMessageSource 提供 #store 變數,該變數參考 poller 建立的 RedisStore

若要為特定 poller 啟用此功能,您可以透過使用 synchronization-factory 屬性,在 poller 的 <transactional/> 元素上提供對 TransactionSynchronizationFactory 的參考。

從 5.0 版開始,Spring Integration 提供了 PassThroughTransactionSynchronizationFactory,當未設定 TransactionSynchronizationFactory,但在 advice 鏈中存在 TransactionInterceptor 類型的 advice 時,預設會將其應用於輪詢端點。當使用任何現成的 TransactionSynchronizationFactory 實作時,輪詢端點會將輪詢訊息繫結到當前交易內容,如果交易 advice 之後拋出異常,則會將其作為 MessagingException 中的 failedMessage 提供。當使用不實作 TransactionInterceptor 的自訂交易 advice 時,您可以明確設定 PassThroughTransactionSynchronizationFactory 以實現此行為。在任何一種情況下,MessagingException 都會變成發送到 errorChannelErrorMessage 的 payload,而原因是由 advice 拋出的原始異常。以前,ErrorMessage 的 payload 是由 advice 拋出的原始異常,並且沒有提供對 failedMessage 資訊的參考,因此難以確定交易提交問題的原因。

為了簡化這些元件的設定,Spring Integration 為預設 factory 提供了命名空間支援。以下範例顯示如何使用命名空間來設定檔案輸入通道配接器

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

SpEL 評估的結果會作為 payload 發送到 committedChannelrolledBackChannel(在本例中,這將是 Boolean.TRUEBoolean.FALSE — java.io.File.renameTo() 方法調用的結果)。

如果您希望發送整個 payload 以進行進一步的 Spring Integration 處理,請使用 'payload' 運算式。

重要的是要理解,這會將動作與交易同步化。它不會使本質上不是交易性的資源實際上成為交易性的。相反,交易(無論是 JDBC 還是其他)在輪詢之前開始,並在流程完成時提交或回滾,然後是同步動作。

如果您提供自訂 TransactionSynchronizationFactory,則它負責建立資源同步,該同步會在交易完成時自動取消繫結繫結的資源。預設 TransactionSynchronizationFactory 透過傳回 ResourceHolderSynchronization 的子類別來執行此操作,預設 shouldUnbindAtCompletion() 傳回 true

除了 after-commitafter-rollback 運算式之外,也支援 before-commit。在這種情況下,如果評估(或下游處理)拋出異常,則交易將回滾而不是提交。

虛擬交易

閱讀 交易同步 章節後,您可能會認為在流程完成時採取這些「成功」或「失敗」動作會很有用,即使在 poller 的下游沒有「真實」交易資源(例如 JDBC)。例如,考慮一個「<file:inbound-channel-adapter/>」,後跟一個「<ftp:outbout-channel-adapter/>」。這些元件都不是交易性的,但我們可能希望根據 FTP 傳輸的成功或失敗將輸入檔案移動到不同的目錄。

為了提供此功能,框架提供了 PseudoTransactionManager,即使沒有涉及真實的交易資源,也可以啟用上述設定。如果流程正常完成,則會調用 beforeCommitafterCommit 同步。失敗時,會調用 afterRollback 同步。由於它不是真實的交易,因此不會發生實際的提交或回滾。虛擬交易是一種用於啟用同步功能的工具。

若要使用 PseudoTransactionManager,您可以將其定義為 <bean/>,就像您設定真實交易管理員的方式一樣。以下範例顯示如何執行此操作

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />

反應式交易

從 5.3 版開始,ReactiveTransactionManager 也可以與 TransactionInterceptor advice 一起用於傳回反應式類型的端點。這包括 MessageSourceReactiveMessageHandler 實作(例如 ReactiveMongoDbMessageSource),它們產生具有 FluxMono payload 的訊息。當其回覆 payload 也是一些反應式類型時,所有其他回覆產生訊息處理器實作都可以依賴 ReactiveTransactionManager