自訂 Kafka Binder 健康指示器
覆寫預設 Kafka Binder 健康指示器
當 Spring Boot Actuator 位於類別路徑上時,Kafka binder 會啟動預設的健康指示器。此健康指示器會檢查 binder 的健康狀況,以及與 Kafka broker 的任何通訊問題。如果應用程式想要停用此預設健康檢查實作並包含自訂實作,則可以為 KafkaBinderHealth
介面提供實作。KafkaBinderHealth
是一個標記介面,它擴充自 HealthIndicator
。在自訂實作中,它必須為 health()
方法提供實作。自訂實作必須以 bean 的形式存在於應用程式組態中。當 binder 發現自訂實作時,它將使用該實作來取代預設實作。以下是在應用程式中此類自訂實作 bean 的範例。
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
自訂 kafka Binder 健康指示器範例
以下是撰寫自訂 Kafka binder HealthIndicator 的虛擬碼。在此範例中,我們嘗試覆寫 binder 提供的 Kafka HealthIndicator,方法是先特別檢查叢集連線能力,然後再檢查與主題相關的問題。
首先,我們需要建立 KafkaBinderHealth
介面的自訂實作。
public class KafkaBinderHealthImplementation implements KafkaBinderHealth {
@Value("${spring.cloud.bus.destination}")
private String topic;
private final AdminClient client;
public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
// More about configuring Kafka
// https://spring-docs.dev.org.tw/spring-kafka/reference/html/#configuring-topics
this.client = AdminClient.create(admin.getConfigurationProperties());
}
@Override
public Health health() {
if (!checkBrokersConnection()) {
logger.error("Error when connect brokers");
return Health.down().withDetail("BrokersConnectionError", "Error message").build();
}
if (!checkTopicConnection()) {
logger.error("Error when trying to connect with specific topic");
return Health.down().withDetail("TopicError", "Error message with topic name").build();
}
return Health.up().build();
}
public boolean checkBrokersConnection() {
// Your implementation
}
public boolean checkTopicConnection() {
// Your implementation
}
}
然後,我們需要為自訂實作建立一個 bean。
@Configuration
public class KafkaBinderHealthIndicatorConfiguration {
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator(final KafkaAdmin admin) {
return new KafkaBinderHealthImplementation(admin);
}
}