交易

Spring Rabbit 框架在同步和非同步使用案例中,透過許多不同的語意支援自動交易管理,這些語意可以宣告方式選擇,如同 Spring 交易的現有使用者所熟悉的方式。這使得許多(如果不是大多數)常見的訊息傳遞模式易於實作。

有兩種方式可以向框架發出訊號,表示所需的交易語意。在 RabbitTemplateSimpleMessageListenerContainer 中,都有一個旗標 channelTransacted,如果為 true,則告知框架使用交易通道,並以 commit 或 rollback 結束所有操作(傳送或接收),具體取決於結果,例外狀況表示 rollback。另一個訊號是提供 Spring 的 PlatformTransactionManager 實作之一的外部交易,作為正在進行的操作的內容。如果在框架傳送或接收訊息時,已經有一個正在進行的交易,並且 channelTransacted 旗標為 true,則訊息交易的 commit 或 rollback 會延遲到目前交易結束。如果 channelTransacted 旗標為 false,則沒有交易語意適用於訊息操作(它是自動確認的)。

channelTransacted 旗標是組態時間設定。它在建立 AMQP 元件時宣告和處理一次,通常在應用程式啟動時。外部交易原則上更具動態性,因為系統會回應執行時的目前執行緒狀態。但是,實際上,它通常也是組態設定,當交易以宣告方式分層到應用程式上時。

對於 RabbitTemplate 的同步使用案例,外部交易由呼叫者提供,可以根據喜好以宣告方式或命令方式提供(通常的 Spring 交易模型)。以下範例顯示了宣告式方法(通常是首選,因為它是非侵入性的),其中範本已設定 channelTransacted=true

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在先前的範例中,接收到 String payload,轉換後,作為訊息主體在標記為 @Transactional 的方法內部傳送。如果資料庫處理因例外狀況而失敗,則傳入的訊息會傳回給 Broker,並且不會傳送傳出的訊息。這適用於交易方法鏈中的任何 RabbitTemplate 操作(除非,例如,直接操作 Channel 以提前 commit 交易)。

對於 SimpleMessageListenerContainer 的非同步使用案例,如果需要外部交易,則必須在容器設定監聽器時由容器請求。為了表示需要外部交易,使用者在設定容器時向容器提供 PlatformTransactionManager 的實作。以下範例顯示如何執行此操作

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在先前的範例中,交易管理器作為從另一個 bean 定義(未顯示)注入的相依性加入,並且 channelTransacted 旗標也設定為 true。效果是,如果監聽器因例外狀況而失敗,則交易會 rollback,並且訊息也會傳回給 Broker。重要的是,如果交易 commit 失敗(例如,由於資料庫約束錯誤或連線問題),AMQP 交易也會 rollback,並且訊息會傳回給 Broker。這有時稱為「盡力單階段提交」,並且是可靠訊息傳遞的非常強大的模式。如果在先前的範例中,channelTransacted 旗標設定為 false(預設值),則外部交易仍會為監聽器提供,但是所有訊息操作都將自動確認,因此效果是在業務操作 rollback 時也 commit 訊息操作。

條件式回滾

在 1.6.6 版本之前,當使用外部交易管理器(例如 JDBC)時,向容器的 transactionAttribute 新增回滾規則沒有任何效果。例外狀況總是會回滾交易。

此外,當在容器的建議鏈中使用交易建議時,條件式回滾不是很有用,因為所有監聽器例外狀況都包裝在 ListenerExecutionFailedException 中。

第一個問題已修正,規則現在已正確套用。此外,現在提供 ListenerFailedRuleBasedTransactionAttribute。它是 RuleBasedTransactionAttribute 的子類別,唯一的區別是它知道 ListenerExecutionFailedException,並使用此類例外狀況的原因作為規則。此交易屬性可以直接在容器中使用,也可以透過交易建議使用。

以下範例使用此規則

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

關於接收訊息回滾的注意事項

AMQP 交易僅適用於傳送至 Broker 的訊息和確認。因此,當 Spring 交易回滾且已接收訊息時,Spring AMQP 不僅必須回滾交易,還必須手動拒絕訊息(有點像 nack,但這不是規格所稱的)。對訊息拒絕採取的動作與交易無關,並且取決於 defaultRequeueRejected 屬性(預設值:true)。有關拒絕失敗訊息的更多資訊,請參閱訊息監聽器與非同步案例

有關 RabbitMQ 交易及其限制的更多資訊,請參閱RabbitMQ Broker 語意

在 RabbitMQ 2.7.0 之前,此類訊息(以及在通道關閉或中止時未確認的任何訊息)會移至 Rabbit Broker 上佇列的後端。自 2.7.0 起,拒絕的訊息會移至佇列的前端,方式與 JMS 回滾訊息類似。
先前,交易回滾時的訊息重新排隊在本機交易與提供 TransactionManager 時不一致。在前一種情況下,套用了正常的重新排隊邏輯(AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false)(請參閱訊息監聽器與非同步案例)。使用交易管理器時,訊息會在回滾時無條件地重新排隊。從 2.0 版本開始,行為變得一致,並且在這兩種情況下都套用了正常的重新排隊邏輯。若要還原為先前的行為,您可以將容器的 alwaysRequeueWithTxManagerRollback 屬性設定為 true。請參閱訊息監聽器容器設定

使用 RabbitTransactionManager

RabbitTransactionManager 是在外部交易中執行 Rabbit 作業並與之同步的替代方案。此交易管理器是 PlatformTransactionManager 介面的實作,應與單個 Rabbit ConnectionFactory 一起使用。

此策略無法提供 XA 交易 - 例如,為了在訊息傳遞和資料庫存取之間共享交易。

應用程式碼需要透過 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 擷取交易式 Rabbit 資源,而不是標準的 Connection.createChannel() 呼叫以及後續的通道建立。當使用 Spring AMQP 的 RabbitTemplate 時,它將自動偵測執行緒繫結的 Channel,並自動參與其交易。

使用 Java Configuration,您可以使用以下 bean 設定新的 RabbitTransactionManager

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果您偏好 XML 組態,您可以在您的 XML 應用程式內容檔案中宣告以下 bean

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

交易同步

將 RabbitMQ 交易與其他(例如 DBMS)交易同步,可提供「盡力一次提交」語意。RabbitMQ 交易有可能在交易同步的完成後階段提交失敗。這會由 spring-tx 基礎架構記錄為錯誤,但不會向呼叫程式碼擲回例外狀況。從 2.3.10 版本開始,您可以在交易完成後,在處理交易的相同執行緒上呼叫 ConnectionUtils.checkAfterCompletion()。如果未發生例外狀況,它只會傳回;否則它會擲回 AfterCompletionFailedException,其中將具有表示完成同步狀態的屬性。

透過呼叫 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 啟用此功能;這是一個全域旗標,適用於所有執行緒。