在 Kafka Streams binder 中繫結視覺化和控制

從 3.1.2 版本開始,Kafka Streams binder 支援繫結視覺化和控制。唯一支援的生命週期階段是 STOPPEDSTARTED。生命週期階段 PAUSEDRESUMED 在 Kafka Streams binder 中不可用。

為了啟用繫結視覺化和控制,應用程式需要包含以下兩個相依性。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

如果您偏好使用 webflux,那麼您可以包含 spring-boot-starter-webflux 而不是標準的 web 相依性。

此外,您還需要設定以下屬性

management.endpoints.web.exposure.include=bindings

為了進一步說明此功能,讓我們使用以下應用程式作為指南

@SpringBootApplication
public class KafkaStreamsApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaStreamsApplication.class, args);
	}

	@Bean
	public Consumer<KStream<String, String>> consumer() {
		return s -> s.foreach((key, value) -> System.out.println(value));
	}

	@Bean
	public Function<KStream<String, String>, KStream<String, String>> function() {
		return ks -> ks;
	}

}

正如我們所見,該應用程式有兩個 Kafka Streams 函數 - 一個是消費者,另一個是函數。消費者繫結預設命名為 consumer-in-0。同樣地,對於該函數,輸入繫結為 function-in-0,輸出繫結為 function-out-0

應用程式啟動後,我們可以透過以下繫結端點找到有關繫結的詳細資訊。

 curl https://127.0.0.1:8080/actuator/bindings | jq .
[
  {
    "bindingName": "consumer-in-0",
    "name": "consumer-in-0",
    "group": "consumer-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-in-0",
    "name": "function-in-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-out-0",
    "name": "function-out-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": false,
    "extendedInfo": {}
  }
]

以上可以找到有關所有三個繫結的詳細資訊。

現在讓我們停止 consumer-in-0 繫結。

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST https://127.0.0.1:8080/actuator/bindings/consumer-in-0

此時,將不會透過此繫結接收任何記錄。

再次啟動繫結。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://127.0.0.1:8080/actuator/bindings/consumer-in-0

當單一函數上存在多個繫結時,在這些繫結中的任何一個上調用這些操作都將起作用。這是因為單一函數上的所有繫結都由相同的 StreamsBuilderFactoryBean 支援。因此,對於上面的函數,function-in-0function-out-0 都可以運作。