事務性繫結器
透過設定 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
為非空值,例如 tx-
,來啟用事務。當在處理器應用程式中使用時,消費者會啟動事務;在消費者線程上發送的任何紀錄都會參與同一個事務。當監聽器正常退出時,監聽器容器會將偏移量發送到事務並提交它。通用的生產者工廠用於所有使用 spring.cloud.stream.kafka.binder.transaction.producer.*
屬性配置的生產者繫結;個別繫結 Kafka 生產者屬性將被忽略。
正常的繫結器重試 (和死信) 在事務中不受支援,因為重試將在原始事務中運行,這可能會回滾,並且任何發布的紀錄也將被回滾。當啟用重試時 (通用屬性 maxAttempts 大於零),重試屬性會被用來配置 DefaultAfterRollbackProcessor ,以在容器層級啟用重試。同樣地,不是在事務中發布死信紀錄,此功能被移動到監聽器容器,再次透過 DefaultAfterRollbackProcessor ,它在主事務回滾後運行。 |
如果您希望在來源應用程式中使用事務,或從某些任意線程進行僅生產者事務 (例如 @Scheduled
方法),您必須取得對事務性生產者工廠的參考,並使用它定義一個 KafkaTransactionManager
Bean。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
請注意,我們使用 BinderFactory
取得對繫結器的參考;當只配置一個繫結器時,在第一個參數中使用 null
。如果配置了多個繫結器,請使用繫結器名稱來取得參考。一旦我們有了對繫結器的參考,我們可以取得對 ProducerFactory
的參考並創建一個事務管理器。
然後您可以使用正常的 Spring 事務支援,例如 TransactionTemplate
或 @Transactional
,例如
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望將僅生產者事務與來自其他事務管理器的事務同步,請使用 ChainedTransactionManager
。
如果您部署應用程式的多個實例,則每個實例都需要唯一的 transactionIdPrefix 。 |