交易

本節說明 Spring for Apache Kafka 如何支援交易。

概觀

0.11.0.0 用戶端程式庫新增了對交易的支援。Spring for Apache Kafka 以以下方式新增支援

  • KafkaTransactionManager:與一般 Spring 交易支援搭配使用 (@TransactionalTransactionTemplate 等)

  • 交易式 KafkaMessageListenerContainer

  • 使用 KafkaTemplate 的本地交易

  • 與其他交易管理員的交易同步

透過提供具有 transactionIdPrefixDefaultKafkaProducerFactory 來啟用交易。在這種情況下,工廠不是管理單一共享的 Producer,而是維護交易式 Producer 的快取。當使用者在 Producer 上呼叫 close() 時,它會傳回快取以供重複使用,而不是實際關閉。每個 Producer 的 transactional.id 屬性為 transactionIdPrefix + n,其中 n0 開始,並針對每個新的 Producer 遞增。在先前版本的 Spring for Apache Kafka 中,由基於記錄的監聽器之監聽器容器啟動的交易,其 transactional.id 的產生方式不同,以支援 fencing zombies,這已不再必要,因為從 3.0 開始,EOSMode.V2 是唯一選項。對於在多個執行個體上執行的應用程式,每個執行個體的 transactionIdPrefix 必須是唯一的。

另請參閱精確一次語意

另請參閱transactionIdPrefix

使用 Spring Boot 時,只需設定 spring.kafka.producer.transaction-id-prefix 屬性 - Spring Boot 將自動配置 KafkaTransactionManager bean 並將其連接到監聽器容器。

從 2.5.8 版開始,您現在可以在 Producer 工廠上配置 maxAge 屬性。當使用可能在 Broker 的 transactional.id.expiration.ms 期間閒置的交易式 Producer 時,這非常有用。使用目前的 kafka-clients,這可能會導致 ProducerFencedException,而不會重新平衡。透過將 maxAge 設定為小於 transactional.id.expiration.ms,如果 Producer 超過其最大存留期,工廠將會重新整理 Producer。

使用 KafkaTransactionManager

KafkaTransactionManager 是 Spring Framework 的 PlatformTransactionManager 的實作。它在其建構子中提供 Producer 工廠的參考。如果您提供自訂 Producer 工廠,它必須支援交易。請參閱 ProducerFactory.transactionCapable()

您可以將 KafkaTransactionManager 與一般 Spring 交易支援 (@TransactionalTransactionTemplate 和其他) 搭配使用。如果交易處於活動狀態,則在交易範圍內執行的任何 KafkaTemplate 作業都會使用交易的 Producer。管理員會根據成功或失敗來提交或回滾交易。您必須將 KafkaTemplate 配置為使用與交易管理員相同的 ProducerFactory

交易同步

本節指的是僅限 Producer 的交易 (非監聽器容器啟動的交易);如需容器啟動交易時鏈結交易的相關資訊,請參閱使用 Consumer 啟動的交易

如果您想要將記錄傳送到 Kafka 並執行一些資料庫更新,您可以使用一般 Spring 交易管理,例如,DataSourceTransactionManager

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

@Transactional 註解的攔截器會啟動交易,而 KafkaTemplate 會將交易與該交易管理員同步;每個傳送都會參與該交易。當方法結束時,資料庫交易將會提交,然後是 Kafka 交易。如果您希望以相反的順序 (先 Kafka) 執行提交,請使用巢狀 @Transactional 方法,外部方法配置為使用 DataSourceTransactionManager,而內部方法配置為使用 KafkaTransactionManager

如需在 Kafka 優先或 DB 優先配置中同步 JDBC 和 Kafka 交易的應用程式範例,請參閱Kafka 交易與其他交易管理員的範例

從 2.5.17、2.6.12、2.7.9 和 2.8.0 版開始,如果同步交易上的提交失敗 (在主要交易提交之後),例外狀況將會拋出給呼叫者。先前,這會被靜默忽略 (以偵錯層級記錄)。應用程式應採取補救措施 (如有必要),以補償已提交的主要交易。

使用 Consumer 啟動的交易

ChainedKafkaTransactionManager 現在已棄用,自 2.7 版起;如需更多資訊,請參閱其父類別 ChainedTransactionManager 的 JavaDocs。相反地,在容器中使用 KafkaTransactionManager 來啟動 Kafka 交易,並使用 @Transactional 註解監聽器方法以啟動其他交易。

如需鏈結 JDBC 和 Kafka 交易的範例應用程式,請參閱Kafka 交易與其他交易管理員的範例

非阻塞重試無法與容器交易結合。當監聽器程式碼擲回例外狀況時,容器交易提交成功,記錄會傳送到可重試的主題。

KafkaTemplate 本地交易

您可以使用 KafkaTemplate 在本地交易中執行一系列操作。以下範例示範如何執行此操作

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回呼中的引數是範本本身 (this)。如果回呼正常結束,則會提交交易。如果擲回例外狀況,則會回滾交易。

如果正在進行 KafkaTransactionManager (或同步) 交易,則不會使用它。而是使用新的「巢狀」交易。

TransactionIdPrefix

使用 EOSMode.V2 (又名 BETA),這是唯一支援的模式,即使是 Consumer 啟動的交易,也不再需要使用相同的 transactional.id;實際上,它在每個執行個體上都必須是唯一的,與 Producer 啟動的交易相同。此屬性在每個應用程式執行個體上必須具有不同的值。

TransactionIdSuffix Fixed

自 3.2 起,引入了新的 TransactionIdSuffixStrategy 介面來管理 transactional.id 後綴。當設定 maxCache 大於零時,預設實作是 DefaultTransactionIdSuffixStrategy,可以在特定範圍內重複使用 transactional.id,否則後綴將透過遞增計數器即時產生。當請求交易 Producer 且 transactional.id 全部在使用中時,會擲回 NoProducerAvailableException。然後,使用者可以使用配置為重試該例外狀況的 RetryTemplate,並具有適當配置的退避策略。

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

當將 maxCache 設定為 5 時,transactional.idmy.txid.+`{0-4}`。

當將 KafkaTransactionManagerConcurrentMessageListenerContainer 搭配使用並啟用 maxCache 時,必須將 maxCache 設定為大於或等於 concurrency 的值。如果 MessageListenerContainer 無法取得 transactional.id 後綴,它將擲回 NoProducerAvailableException。當在 ConcurrentMessageListenerContainer 中使用巢狀交易時,必須調整 maxCache 設定以處理增加的巢狀交易數量。

KafkaTemplate 交易式和非交易式發布

通常,當 KafkaTemplate 是交易式的 (使用支援交易的 Producer 工廠配置) 時,需要交易。交易可以由 TransactionTemplate@Transactional 方法、呼叫 executeInTransaction 或由監聽器容器 (在配置了 KafkaTransactionManager 時) 啟動。在交易範圍之外嘗試使用範本會導致範本擲回 IllegalStateException。從 2.4.3 版開始,您可以將範本的 allowNonTransactional 屬性設定為 true。在這種情況下,範本將允許在沒有交易的情況下執行操作,方法是呼叫 ProducerFactorycreateNonTransactionalProducer() 方法;Producer 將像正常情況一樣快取或線程繫結以供重複使用。請參閱使用 DefaultKafkaProducerFactory

批次監聽器的交易

當在使用交易時監聽器失敗時,會叫用 AfterRollbackProcessor 以在回滾發生後採取某些動作。當將預設 AfterRollbackProcessor 與記錄監聽器搭配使用時,會執行尋找,以便重新傳遞失敗的記錄。但是,對於批次監聽器,將會重新傳遞整個批次,因為框架不知道批次中的哪個記錄失敗。如需更多資訊,請參閱回滾後處理器

當使用批次監聽器時,2.4.2 版引入了一種處理處理批次時失敗的替代機制:BatchToRecordAdapter。當將 batchListener 設定為 true 的容器工廠配置為 BatchToRecordAdapter 時,會一次使用一個記錄叫用監聽器。這可以在批次中啟用錯誤處理,同時仍然可以停止處理整個批次,具體取決於例外狀況類型。提供了一個預設的 BatchToRecordAdapter,可以使用標準 ConsumerRecordRecoverer (例如 DeadLetterPublishingRecoverer) 進行配置。以下測試案例配置片段說明了如何使用此功能

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}