交易支援
從 5.0 版本開始,引入了新的 TransactionHandleMessageAdvice
,藉由 HandleMessageAdvice
的實作,使整個下游流程具備交易性。當在 <request-handler-advice-chain>
元素中使用常規的 TransactionInterceptor
(例如,透過設定 <tx:advice>
)時,啟動的交易僅適用於內部的 AbstractReplyProducingMessageHandler.handleRequestMessage()
,而不會傳播到下游流程。
為了簡化 XML 設定,除了 <request-handler-advice-chain>
之外,還為所有 <outbound-gateway>
和 <service-activator>
及相關元件新增了 <transactional>
元素。以下範例展示了 <transactional>
的使用方式
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
Java 設定可以使用 TransactionInterceptorBuilder
簡化,並且結果 bean 名稱可以用於 訊息傳遞註解 adviceChain
屬性中,如下例所示
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
請注意 TransactionInterceptorBuilder
建構子上的 true
參數。它會導致建立 TransactionHandleMessageAdvice
,而不是常規的 TransactionInterceptor
。
Java DSL 透過端點設定上的 .transactional()
選項支援 Advice
,如下例所示
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}