交易
本節說明 Spring for Apache Kafka 如何支援交易。
概觀
0.11.0.0 用戶端程式庫新增了對交易的支援。Spring for Apache Kafka 以以下方式新增支援
-
KafkaTransactionManager
:與一般 Spring 交易支援搭配使用 (@Transactional
、TransactionTemplate
等) -
交易式
KafkaMessageListenerContainer
-
使用
KafkaTemplate
的本地交易 -
與其他交易管理員的交易同步
透過提供具有 transactionIdPrefix
的 DefaultKafkaProducerFactory
來啟用交易。在這種情況下,工廠不是管理單一共享的 Producer
,而是維護交易式 Producer 的快取。當使用者在 Producer 上呼叫 close()
時,它會傳回快取以供重複使用,而不是實際關閉。每個 Producer 的 transactional.id
屬性為 transactionIdPrefix
+ n
,其中 n
從 0
開始,並針對每個新的 Producer 遞增。在先前版本的 Spring for Apache Kafka 中,由基於記錄的監聽器之監聽器容器啟動的交易,其 transactional.id
的產生方式不同,以支援 fencing zombies,這已不再必要,因為從 3.0 開始,EOSMode.V2
是唯一選項。對於在多個執行個體上執行的應用程式,每個執行個體的 transactionIdPrefix
必須是唯一的。
另請參閱精確一次語意。
另請參閱transactionIdPrefix
。
使用 Spring Boot 時,只需設定 spring.kafka.producer.transaction-id-prefix
屬性 - Spring Boot 將自動配置 KafkaTransactionManager
bean 並將其連接到監聽器容器。
從 2.5.8 版開始,您現在可以在 Producer 工廠上配置 maxAge 屬性。當使用可能在 Broker 的 transactional.id.expiration.ms 期間閒置的交易式 Producer 時,這非常有用。使用目前的 kafka-clients ,這可能會導致 ProducerFencedException ,而不會重新平衡。透過將 maxAge 設定為小於 transactional.id.expiration.ms ,如果 Producer 超過其最大存留期,工廠將會重新整理 Producer。 |
使用 KafkaTransactionManager
KafkaTransactionManager
是 Spring Framework 的 PlatformTransactionManager
的實作。它在其建構子中提供 Producer 工廠的參考。如果您提供自訂 Producer 工廠,它必須支援交易。請參閱 ProducerFactory.transactionCapable()
。
您可以將 KafkaTransactionManager
與一般 Spring 交易支援 (@Transactional
、TransactionTemplate
和其他) 搭配使用。如果交易處於活動狀態,則在交易範圍內執行的任何 KafkaTemplate
作業都會使用交易的 Producer
。管理員會根據成功或失敗來提交或回滾交易。您必須將 KafkaTemplate
配置為使用與交易管理員相同的 ProducerFactory
。
交易同步
本節指的是僅限 Producer 的交易 (非監聽器容器啟動的交易);如需容器啟動交易時鏈結交易的相關資訊,請參閱使用 Consumer 啟動的交易。
如果您想要將記錄傳送到 Kafka 並執行一些資料庫更新,您可以使用一般 Spring 交易管理,例如,DataSourceTransactionManager
。
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
@Transactional
註解的攔截器會啟動交易,而 KafkaTemplate
會將交易與該交易管理員同步;每個傳送都會參與該交易。當方法結束時,資料庫交易將會提交,然後是 Kafka 交易。如果您希望以相反的順序 (先 Kafka) 執行提交,請使用巢狀 @Transactional
方法,外部方法配置為使用 DataSourceTransactionManager
,而內部方法配置為使用 KafkaTransactionManager
。
如需在 Kafka 優先或 DB 優先配置中同步 JDBC 和 Kafka 交易的應用程式範例,請參閱Kafka 交易與其他交易管理員的範例。
從 2.5.17、2.6.12、2.7.9 和 2.8.0 版開始,如果同步交易上的提交失敗 (在主要交易提交之後),例外狀況將會拋出給呼叫者。先前,這會被靜默忽略 (以偵錯層級記錄)。應用程式應採取補救措施 (如有必要),以補償已提交的主要交易。 |
使用 Consumer 啟動的交易
ChainedKafkaTransactionManager
現在已棄用,自 2.7 版起;如需更多資訊,請參閱其父類別 ChainedTransactionManager
的 JavaDocs。相反地,在容器中使用 KafkaTransactionManager
來啟動 Kafka 交易,並使用 @Transactional
註解監聽器方法以啟動其他交易。
如需鏈結 JDBC 和 Kafka 交易的範例應用程式,請參閱Kafka 交易與其他交易管理員的範例。
KafkaTemplate
本地交易
您可以使用 KafkaTemplate
在本地交易中執行一系列操作。以下範例示範如何執行此操作
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回呼中的引數是範本本身 (this
)。如果回呼正常結束,則會提交交易。如果擲回例外狀況,則會回滾交易。
如果正在進行 KafkaTransactionManager (或同步) 交易,則不會使用它。而是使用新的「巢狀」交易。 |
TransactionIdPrefix
使用 EOSMode.V2
(又名 BETA
),這是唯一支援的模式,即使是 Consumer 啟動的交易,也不再需要使用相同的 transactional.id
;實際上,它在每個執行個體上都必須是唯一的,與 Producer 啟動的交易相同。此屬性在每個應用程式執行個體上必須具有不同的值。
TransactionIdSuffix Fixed
自 3.2 起,引入了新的 TransactionIdSuffixStrategy
介面來管理 transactional.id
後綴。當設定 maxCache
大於零時,預設實作是 DefaultTransactionIdSuffixStrategy
,可以在特定範圍內重複使用 transactional.id
,否則後綴將透過遞增計數器即時產生。當請求交易 Producer 且 transactional.id
全部在使用中時,會擲回 NoProducerAvailableException
。然後,使用者可以使用配置為重試該例外狀況的 RetryTemplate
,並具有適當配置的退避策略。
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
當將 maxCache
設定為 5 時,transactional.id
為 my.txid.
+`{0-4}`。
當將 KafkaTransactionManager 與 ConcurrentMessageListenerContainer 搭配使用並啟用 maxCache 時,必須將 maxCache 設定為大於或等於 concurrency 的值。如果 MessageListenerContainer 無法取得 transactional.id 後綴,它將擲回 NoProducerAvailableException 。當在 ConcurrentMessageListenerContainer 中使用巢狀交易時,必須調整 maxCache 設定以處理增加的巢狀交易數量。 |
KafkaTemplate
交易式和非交易式發布
通常,當 KafkaTemplate
是交易式的 (使用支援交易的 Producer 工廠配置) 時,需要交易。交易可以由 TransactionTemplate
、@Transactional
方法、呼叫 executeInTransaction
或由監聽器容器 (在配置了 KafkaTransactionManager
時) 啟動。在交易範圍之外嘗試使用範本會導致範本擲回 IllegalStateException
。從 2.4.3 版開始,您可以將範本的 allowNonTransactional
屬性設定為 true
。在這種情況下,範本將允許在沒有交易的情況下執行操作,方法是呼叫 ProducerFactory
的 createNonTransactionalProducer()
方法;Producer 將像正常情況一樣快取或線程繫結以供重複使用。請參閱使用 DefaultKafkaProducerFactory
。
批次監聽器的交易
當在使用交易時監聽器失敗時,會叫用 AfterRollbackProcessor
以在回滾發生後採取某些動作。當將預設 AfterRollbackProcessor
與記錄監聽器搭配使用時,會執行尋找,以便重新傳遞失敗的記錄。但是,對於批次監聽器,將會重新傳遞整個批次,因為框架不知道批次中的哪個記錄失敗。如需更多資訊,請參閱回滾後處理器。
當使用批次監聽器時,2.4.2 版引入了一種處理處理批次時失敗的替代機制:BatchToRecordAdapter
。當將 batchListener
設定為 true 的容器工廠配置為 BatchToRecordAdapter
時,會一次使用一個記錄叫用監聽器。這可以在批次中啟用錯誤處理,同時仍然可以停止處理整個批次,具體取決於例外狀況類型。提供了一個預設的 BatchToRecordAdapter
,可以使用標準 ConsumerRecordRecoverer
(例如 DeadLetterPublishingRecoverer
) 進行配置。以下測試案例配置片段說明了如何使用此功能
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}