手動選擇性地啟動 Kafka Streams 處理器
雖然上述方法將透過 StreamsBuilderFactoryManager
無條件地將自動啟動 false
應用於應用程式中的所有 Kafka Streams 處理器,但通常只希望個別選擇的 Kafka Streams 處理器不要自動啟動。例如,假設您的應用程式中有三個不同的函數 (處理器),並且對於其中一個處理器,您不希望在應用程式啟動時啟動它。以下是這種情況的範例。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上述情況中,如果您將 spring.kafka.streams.auto-startup
設定為 false
,則在應用程式啟動期間,所有處理器都不會自動啟動。在這種情況下,您必須如上所述,透過在底層 StreamsBuilderFactoryManager
上呼叫 start()
以程式方式啟動它們。但是,如果我們有僅選擇性停用一個處理器的用例,則必須在該處理器的個別綁定上設定 auto-startup
。假設我們不希望我們的 process3
函數自動啟動。這是一個具有兩個輸入綁定的 BiFunction
- process3-in-0
和 process3-in-1
。為了避免此處理器的自動啟動,您可以選擇任何這些輸入綁定,並在其上設定 auto-startup
。您選擇哪個綁定並不重要;如果您願意,您可以將它們兩者上的 auto-startup
設定為 false
,但一個就足夠了。因為它們共用相同的 factory bean,所以您不必在兩個綁定上都將 autoStartup 設定為 false,但為了清晰起見,這樣做可能更有意義。
以下是您可以使用的 Spring Cloud Stream 屬性,以停用此處理器的自動啟動。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然後,您可以手動啟動處理器,可以使用 REST 端點或使用如下所示的 BindingsEndpoint
API。為此,您需要確保類路徑上具有 Spring Boot actuator 依賴項。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://127.0.0.1:8080/actuator/bindings/process3-in-0
或
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
請參閱參考文件中的此章節,以取得有關此機制的更多詳細資訊。
當透過停用 auto-startup 來控制綁定 (如本節所述) 時,請注意,這僅適用於消費者綁定。換句話說,如果您使用生產者綁定 process3-out-0 ,則在停用處理器的自動啟動方面沒有任何效果,儘管此生產者綁定與消費者綁定使用相同的 StreamsBuilderFactoryBean 。 |