手動啟動 Kafka Streams 處理器

Spring Cloud Stream Kafka Streams binder 在 Spring for Apache Kafka 的 StreamsBuilderFactoryBean 之上提供了一個名為 StreamsBuilderFactoryManager 的抽象層。此管理 API 用於控制基於 binder 的應用程式中每個處理器的多個 StreamsBuilderFactoryBean。因此,當使用 binder 時,如果您想要手動控制應用程式中各種 StreamsBuilderFactoryBean 物件的自動啟動,則需要使用 StreamsBuilderFactoryManager。您可以使用屬性 spring.kafka.streams.auto-startup 並將其設定為 false,以關閉處理器的自動啟動。然後,在應用程式中,您可以使用如下所示的方法,使用 StreamsBuilderFactoryManager 啟動處理器。

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}

當您希望您的應用程式在主執行緒中啟動,並讓 Kafka Streams 處理器單獨啟動時,此功能非常方便。例如,當您有一個大型狀態儲存需要還原時,如果處理器像預設情況一樣正常啟動,這可能會阻止您的應用程式啟動。如果您正在使用某種類型的活性探測機制 (例如在 Kubernetes 上),它可能會認為應用程式已關閉並嘗試重新啟動。為了修正此問題,您可以將 spring.kafka.streams.auto-startup 設定為 false 並遵循上述方法。

請記住,當使用 Spring Cloud Stream binder 時,您不是直接處理 Spring for Apache Kafka 的 StreamsBuilderFactoryBean,而是 StreamsBuilderFactoryManager,因為 StreamsBuilderFactoryBean 物件是由 binder 在內部管理的。