交易支援

從 5.0 版本開始,引入了新的 TransactionHandleMessageAdvice,藉由 HandleMessageAdvice 的實作,使整個下游流程具備交易性。當在 <request-handler-advice-chain> 元素中使用常規的 TransactionInterceptor(例如,透過設定 <tx:advice>)時,啟動的交易僅適用於內部的 AbstractReplyProducingMessageHandler.handleRequestMessage(),而不會傳播到下游流程。

為了簡化 XML 設定,除了 <request-handler-advice-chain> 之外,還為所有 <outbound-gateway><service-activator> 及相關元件新增了 <transactional> 元素。以下範例展示了 <transactional> 的使用方式

<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
        <int-jdbc:transactional/>
</int-jdbc:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

如果您熟悉 JPA 整合元件,這樣的設定並不陌生,但現在我們可以從流程中的任何點開始交易,而不僅僅是從 <poller> 或訊息驅動的通道配接器(例如 JMS)。

Java 設定可以使用 TransactionInterceptorBuilder 簡化,並且結果 bean 名稱可以用於 訊息傳遞註解 adviceChain 屬性中,如下例所示

@Bean
public ConcurrentMetadataStore store() {
    return new SimpleMetadataStore(hazelcastInstance()
                       .getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    message -> message.getPayload().toString(),
                    message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(this.transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRES_NEW)
                .build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
         outputChannel = "output",
         adviceChain = { "idempotentReceiverInterceptor",
                 "transactionInterceptor" })
public Transformer transformer() {
    return message -> message;
}

請注意 TransactionInterceptorBuilder 建構子上的 true 參數。它會導致建立 TransactionHandleMessageAdvice,而不是常規的 TransactionInterceptor

Java DSL 透過端點設定上的 .transactional() 選項支援 Advice,如下例所示

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
        .handle(Jpa.updatingGateway(this.entityManagerFactory),
                e -> e.transactional(true))
        .channel(c -> c.queue("persistResults"));
}