手動選擇性地啟動 Kafka Streams 處理器

雖然上述方法將透過 StreamsBuilderFactoryManager 無條件地將自動啟動 false 應用於應用程式中的所有 Kafka Streams 處理器,但通常只希望個別選擇的 Kafka Streams 處理器不要自動啟動。例如,假設您的應用程式中有三個不同的函數 (處理器),並且對於其中一個處理器,您不希望在應用程式啟動時啟動它。以下是這種情況的範例。

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}

在上述情況中,如果您將 spring.kafka.streams.auto-startup 設定為 false,則在應用程式啟動期間,所有處理器都不會自動啟動。在這種情況下,您必須如上所述,透過在底層 StreamsBuilderFactoryManager 上呼叫 start() 以程式方式啟動它們。但是,如果我們有僅選擇性停用一個處理器的用例,則必須在該處理器的個別綁定上設定 auto-startup。假設我們不希望我們的 process3 函數自動啟動。這是一個具有兩個輸入綁定的 BiFunction - process3-in-0process3-in-1。為了避免此處理器的自動啟動,您可以選擇任何這些輸入綁定,並在其上設定 auto-startup。您選擇哪個綁定並不重要;如果您願意,您可以將它們兩者上的 auto-startup 設定為 false,但一個就足夠了。因為它們共用相同的 factory bean,所以您不必在兩個綁定上都將 autoStartup 設定為 false,但為了清晰起見,這樣做可能更有意義。

以下是您可以使用的 Spring Cloud Stream 屬性,以停用此處理器的自動啟動。

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false

spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

然後,您可以手動啟動處理器,可以使用 REST 端點或使用如下所示的 BindingsEndpoint API。為此,您需要確保類路徑上具有 Spring Boot actuator 依賴項。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://127.0.0.1:8080/actuator/bindings/process3-in-0

@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

請參閱參考文件中的此章節,以取得有關此機制的更多詳細資訊。

當透過停用 auto-startup 來控制綁定 (如本節所述) 時,請注意,這僅適用於消費者綁定。換句話說,如果您使用生產者綁定 process3-out-0,則在停用處理器的自動啟動方面沒有任何效果,儘管此生產者綁定與消費者綁定使用相同的 StreamsBuilderFactoryBean