監控

監控監聽器效能

從 2.3 版開始,如果偵測到類別路徑上有 Micrometer,且應用程式內容中存在單一 MeterRegistry,則監聽器容器將自動為監聽器建立和更新 Micrometer Timer。可以透過將 ContainerPropertymicrometerEnabled 設定為 false 來停用計時器。

維護兩個計時器 - 一個用於成功呼叫監聽器,另一個用於失敗。

計時器命名為 spring.kafka.listener,並具有以下標籤

  • name : (容器 Bean 名稱)

  • result : successfailure

  • exception : noneListenerExecutionFailedException

您可以使用 ContainerPropertiesmicrometerTags 屬性新增其他標籤。

從 2.9.8、3.0.6 版開始,您可以在 ContainerPropertiesmicrometerTagsProvider 中提供函數;此函數接收 ConsumerRecord<?, ?> 並傳回可基於該記錄的標籤,並與 micrometerTags 中的任何靜態標籤合併。

使用並行容器時,會為每個執行緒建立計時器,且 name 標籤的後綴為 -n,其中 n 為 0concurrency-1

監控 KafkaTemplate 效能

從 2.5 版開始,如果偵測到類別路徑上有 Micrometer,且應用程式內容中存在單一 MeterRegistry,則範本將自動為傳送操作建立和更新 Micrometer Timer。可以透過將範本的 micrometerEnabled 屬性設定為 false 來停用計時器。

維護兩個計時器 - 一個用於成功呼叫監聽器,另一個用於失敗。

計時器命名為 spring.kafka.template,並具有以下標籤

  • name : (範本 Bean 名稱)

  • result : successfailure

  • exception : none 或失敗的例外類別名稱

您可以使用範本的 micrometerTags 屬性新增其他標籤。

從 2.9.8、3.0.6 版開始,您可以提供 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) 屬性;此函數接收 ProducerRecord<?, ?> 並傳回可基於該記錄的標籤,並與 micrometerTags 中的任何靜態標籤合併。

Micrometer 原生指標

從 2.5 版開始,框架提供Factory 監聽器,以便在建立和關閉生產者和消費者時管理 Micrometer KafkaClientMetrics 實例。

若要啟用此功能,只需將監聽器新增至您的生產者和消費者 Factory

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

傳遞至監聽器的消費者/生產者 id 會新增至儀表的標籤,標籤名稱為 spring.id

取得 Kafka 指標範例之一
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

StreamsBuilderFactoryBean 也提供類似的監聽器 - 請參閱KafkaStreams Micrometer 支援

Micrometer Observation

自 3.0 版起,KafkaTemplate 和監聽器容器現在支援使用 Micrometer 進行觀察。

KafkaTemplateContainerProperties 上的 observationEnabled 設定為 true 以啟用觀察;這將停用Micrometer 計時器,因為計時器現在將透過每次觀察進行管理。

Micrometer Observation 不支援批次監聽器;這將啟用 Micrometer 計時器

有關更多資訊,請參閱Micrometer Tracing

若要將標籤新增至計時器/追蹤,請分別為範本或監聽器容器組態自訂 KafkaTemplateObservationConventionKafkaListenerObservationConvention

預設實作會為範本觀察新增 bean.name 標籤,並為容器新增 listener.id 標籤。

您可以子類別化 DefaultKafkaTemplateObservationConventionDefaultKafkaListenerObservationConvention,或提供全新的實作。

請參閱Micrometer Observation 文件,以取得記錄的預設觀察詳細資訊。

從 3.0.6 版開始,您可以根據消費者或生產者記錄中的資訊,將動態標籤新增至計時器和追蹤。若要這麼做,請分別將自訂 KafkaListenerObservationConvention 和/或 KafkaTemplateObservationConvention 新增至監聽器容器屬性或 KafkaTemplate。這兩個觀察內容中的 record 屬性分別包含 ConsumerRecordProducerRecord

發送者和接收者內容 remoteServiceName 屬性設定為 Kafka clusterId 屬性;這是由 KafkaAdmin 擷取的。如果由於某些原因 (例如缺乏管理員權限) 而無法擷取叢集 ID,則從 3.1 版開始,您可以在 KafkaAdmin 上設定手動 clusterId,並將其注入 KafkaTemplate 和監聽器容器中。當它為 null (預設值) 時,管理員將調用 describeCluster 管理員操作以從代理程式擷取它。