使用 Spring Cloud Sleuth 追蹤

當 Spring Cloud Sleuth 位於基於 Spring Cloud Stream Kafka Streams binder 的應用程式的 classpath 中時,其消費者和生產者都會自動檢測追蹤資訊。但是,為了追蹤任何應用程式特定的操作,需要由使用者程式碼顯式地檢測這些操作。這可以通過在應用程式中注入來自 Spring Cloud Sleuth 的 KafkaStreamsTracing bean,然後通過注入的 bean 調用各種 Kafka Streams 操作來完成。以下是一些使用範例。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
                LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
                return new KeyValue<>(value.getRegion(),
                        value.getClicks());
            }))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum, Materialized.as(CLICK_UPDATES))
            .toStream());
}

在上面的範例中,有兩個地方新增了顯式的追蹤檢測。首先,我們正在記錄來自傳入的 KStream 的鍵/值資訊。當記錄此資訊時,關聯的 span 和追蹤 ID 也會被記錄下來,以便監控系統可以追蹤它們並與相同的 span id 關聯。其次,當我們調用 map 操作時,我們不是直接在 KStream 類別上調用它,而是將其包裝在 transform 操作中,然後從 KafkaStreamsTracing 調用 map。在這種情況下,記錄的訊息也將包含 span ID 和追蹤 ID。

這是另一個範例,我們在其中使用低階轉換器 API 來存取各種 Kafka Streams 標頭。當 spring-cloud-sleuth 在 classpath 上時,所有追蹤標頭也可以像這樣存取。

@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
    return input -> input.transform(kafkaStreamsTracing.transformer(
            "transformer-1",
            () -> new Transformer<String, String, KeyValue<String, String>>() {
                ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    LOG.info("Headers: " + this.context.headers());
                    LOG.info("K/V:" + key + "/" + value);
                    // More transformations, business logic execution, etc. go here.
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                }
            }));
}