序列化、反序列化和訊息轉換

概觀

Apache Kafka 為序列化和反序列化記錄值及其鍵提供了高階 API。它透過 org.apache.kafka.common.serialization.Serializer<T>org.apache.kafka.common.serialization.Deserializer<T> 抽象概念以及一些內建實作來呈現。同時,我們可以透過使用 ProducerConsumer 組態屬性來指定序列化器和反序列化器類別。以下範例示範如何執行此操作

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

對於更複雜或特殊的情況,KafkaConsumer(以及因此 KafkaProducer)提供了多載建構子,以接受 keysvaluesSerializerDeserializer 實例。

當您使用此 API 時,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 也提供屬性(透過建構子或 setter 方法),以將自訂 SerializerDeserializer 實例注入到目標 ProducerConsumer 中。此外,您可以透過建構子傳入 Supplier<Serializer>Supplier<Deserializer> 實例 - 這些 Supplier 會在每次建立 ProducerConsumer 時呼叫。

字串序列化

自 2.5 版起,Spring for Apache Kafka 提供了 ToStringSerializerParseStringDeserializer 類別,它們使用實體的字串表示形式。它們依賴 toString 方法和某些 Function<String>BiFunction<String, Headers> 來解析字串並填入實例的屬性。通常,這會調用類別上的某些靜態方法,例如 parse

ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);

預設情況下,ToStringSerializer 設定為在記錄 Headers 中傳達有關序列化實體的類型資訊。您可以透過將 addTypeInfo 屬性設定為 false 來停用此功能。接收端的 ParseStringDeserializer 可以使用此資訊。

  • ToStringSerializer.ADD_TYPE_INFO_HEADERS(預設值為 true):您可以將其設定為 false 以在 ToStringSerializer 上停用此功能(設定 addTypeInfo 屬性)。

ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});

您可以設定用於將 String 轉換為/從 byte[]Charset,預設值為 UTF-8

您可以使用 ConsumerConfig 屬性來設定具有解析器方法名稱的反序列化器

  • ParseStringDeserializer.KEY_PARSER

  • ParseStringDeserializer.VALUE_PARSER

這些屬性必須包含類別的完整限定名稱,後跟方法名稱,並以句點 . 分隔。該方法必須是靜態的,並且具有 (String, Headers)(String) 的簽章。

還提供了 ToFromStringSerde,用於 Kafka Streams。

JSON

Spring for Apache Kafka 還提供了基於 Jackson JSON 物件對應器的 JsonSerializerJsonDeserializer 實作。JsonSerializer 允許將任何 Java 物件寫入為 JSON byte[]JsonDeserializer 需要額外的 Class<?> targetType 引數,以允許將已消費的 byte[] 反序列化為正確的目標物件。以下範例示範如何建立 JsonDeserializer

JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);

您可以使用 ObjectMapper 自訂 JsonSerializerJsonDeserializer。您還可以擴充它們以在 configure(Map<String, ?> configs, boolean isKey) 方法中實作一些特定的組態邏輯。

從 2.3 版開始,所有 JSON 感知元件預設都使用 JacksonUtils.enhancedObjectMapper() 實例進行組態,該實例隨附停用的 MapperFeature.DEFAULT_VIEW_INCLUSIONDeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 功能。此外,此類實例還提供用於自訂資料類型的知名模組,例如 Java time 和 Kotlin 支援。有關更多資訊,請參閱 JacksonUtils.enhancedObjectMapper() JavaDocs。此方法還註冊了 org.springframework.kafka.support.JacksonMimeTypeModule,用於將 org.springframework.util.MimeType 物件序列化為純字串,以實現跨平台網路相容性。JacksonMimeTypeModule 可以註冊為應用程式內容中的 bean,並且將自動組態到 Spring Boot ObjectMapper 實例中。

同樣從 2.3 版開始,JsonDeserializer 提供了基於 TypeReference 的建構子,以更好地處理目標泛型容器類型。

從 2.1 版開始,您可以在記錄 Headers 中傳達類型資訊,從而允許處理多種類型。此外,您可以透過使用以下 Kafka 屬性來組態序列化器和反序列化器。如果您已為 KafkaConsumerKafkaProducer 提供 SerializerDeserializer 實例,則它們無效。

組態屬性

  • JsonSerializer.ADD_TYPE_INFO_HEADERS(預設值為 true):您可以將其設定為 false 以在 JsonSerializer 上停用此功能(設定 addTypeInfo 屬性)。

  • JsonSerializer.TYPE_MAPPINGS(預設值為 empty):請參閱 對應類型

  • JsonDeserializer.USE_TYPE_INFO_HEADERS(預設值為 true):您可以將其設定為 false 以忽略序列化器設定的標頭。

  • JsonDeserializer.REMOVE_TYPE_INFO_HEADERS(預設值為 true):您可以將其設定為 false 以保留序列化器設定的標頭。

  • JsonDeserializer.KEY_DEFAULT_TYPE:如果沒有標頭資訊,則用於反序列化鍵的回退類型。

  • JsonDeserializer.VALUE_DEFAULT_TYPE:如果沒有標頭資訊,則用於反序列化值的回退類型。

  • JsonDeserializer.TRUSTED_PACKAGES(預設值為 java.utiljava.lang):反序列化允許的套件模式逗號分隔清單。* 表示反序列化所有內容。

  • JsonDeserializer.TYPE_MAPPINGS(預設值為 empty):請參閱 對應類型

  • JsonDeserializer.KEY_TYPE_METHOD(預設值為 empty):請參閱 使用方法判斷類型

  • JsonDeserializer.VALUE_TYPE_METHOD(預設值為 empty):請參閱 使用方法判斷類型

從 2.2 版開始,類型資訊標頭(如果由序列化器新增)將由反序列化器移除。您可以透過將 removeTypeHeaders 屬性設定為 false 來恢復到先前的行為,可以直接在反序列化器上設定,也可以使用先前描述的組態屬性設定。

從 2.8 版開始,如果您以程式設計方式建構序列化器或反序列化器,如 程式化建構 中所示,只要您沒有明確設定任何屬性(使用 set*() 方法或使用流暢 API),工廠就會套用上述屬性。先前,當以程式設計方式建立時,永遠不會套用組態屬性;如果您直接在物件上明確設定屬性,則仍然是這種情況。

對應類型

從 2.2 版開始,當使用 JSON 時,您現在可以使用前面清單中的屬性來提供類型對應。先前,您必須自訂序列化器和反序列化器中的類型對應器。對應由 token:className 配對的逗號分隔清單組成。在出站時,酬載的類別名稱會對應到對應的權杖。在入站時,類型標頭中的權杖會對應到對應的類別名稱。

以下範例建立一組對應

senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
對應的物件必須相容。

如果您使用 Spring Boot,您可以在 application.properties(或 yaml)檔案中提供這些屬性。以下範例示範如何執行此操作

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat

您可以使用屬性執行簡單的組態。對於更進階的組態(例如在序列化器和反序列化器中使用自訂 ObjectMapper),您應該使用接受預先建置的序列化器和反序列化器的生產者和消費者工廠建構子。以下 Spring Boot 範例覆寫了預設工廠

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(JsonSerializer customValueSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}

也提供了 Setter,作為使用這些建構子的替代方案。

當使用 Spring Boot 並覆寫 ConsumerFactoryProducerFactory(如上所示)時,必須將萬用字元泛型類型與 bean 方法傳回類型一起使用。如果改為提供具體的泛型類型,則 Spring Boot 將忽略這些 bean,並且仍然使用預設的 bean。

從 2.2 版開始,您可以明確地組態反序列化器以使用提供的目標類型,並透過使用其中一個具有布林值 useHeadersIfPresent 引數(預設值為 true)的多載建構子來忽略標頭中的類型資訊。以下範例示範如何執行此操作

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

使用方法判斷類型

從 2.5 版開始,您現在可以透過屬性組態反序列化器,以調用方法來判斷目標類型。如果存在,這將覆寫上述討論的任何其他技術。如果資料是由未使用 Spring 序列化器的應用程式發布的,並且您需要根據資料或其他標頭反序列化為不同的類型,則這會很有用。將這些屬性設定為方法名稱 - 完整限定的類別名稱,後跟方法名稱,並以句點 . 分隔。該方法必須宣告為 public static,具有三個簽章之一 (String topic, byte[] data, Headers headers)(byte[] data, Headers headers)(byte[] data),並傳回 Jackson JavaType

  • JsonDeserializer.KEY_TYPE_METHOD : spring.json.key.type.method

  • JsonDeserializer.VALUE_TYPE_METHOD : spring.json.value.type.method

您可以使用任意標頭或檢查資料來判斷類型。

JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}

對於更複雜的資料檢查,請考慮使用 JsonPath 或類似工具,但是,用於判斷類型的測試越簡單,流程效率就越高。

以下是以程式設計方式建立反序列化器的範例(當在建構子中為消費者工廠提供反序列化器時)

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}

程式化建構

自 2.3 版起,當以程式設計方式建構序列化器/反序列化器以用於生產者/消費者工廠時,您可以使用流暢 API,這簡化了組態。

@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}

若要以程式設計方式提供類型對應,類似於 使用方法判斷類型,請使用 typeFunction 屬性。

JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);

或者,只要您不使用流暢 API 來組態屬性,或使用 set*() 方法設定它們,工廠就會使用組態屬性組態序列化器/反序列化器;請參閱 組態屬性

委派序列化器和反序列化器

使用標頭

2.3 版引入了 DelegatingSerializerDelegatingDeserializer,它們允許生產和消費具有不同鍵和/或值類型的記錄。生產者必須將標頭 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 設定為選擇器值,該值用於選擇要用於值的序列化器,並將 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR 用於鍵;如果找不到相符項,則會擲回 IllegalStateException

對於傳入記錄,反序列化器使用相同的標頭來選擇要使用的反序列化器;如果找不到相符項或標頭不存在,則會傳回原始 byte[]

您可以透過建構子組態選擇器到 Serializer / Deserializer 的對應,也可以透過 Kafka 生產者/消費者屬性以及鍵 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIGDelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG 來組態它。對於序列化器,生產者屬性可以是 Map<String, Object>,其中鍵是選擇器,值是 Serializer 實例、序列化器 Class 或類別名稱。該屬性也可以是逗號分隔的對應項目字串,如下所示。

對於反序列化器,消費者屬性可以是 Map<String, Object>,其中鍵是選擇器,值是 Deserializer 實例、反序列化器 Class 或類別名稱。該屬性也可以是逗號分隔的對應項目字串,如下所示。

若要使用屬性進行組態,請使用以下語法

producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")

然後,生產者會將 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 標頭設定為 thing1thing2

此技術支援將不同類型傳送到相同主題(或不同主題)。

從 2.5.1 版開始,如果類型(鍵或值)是 SerdesLongInteger 等)支援的標準類型之一,則無需設定選擇器標頭。相反,序列化器會將標頭設定為類型的類別名稱。無需為這些類型組態序列化器或反序列化器,它們將(一次)動態建立。

對於將不同類型傳送到不同主題的另一種技術,請參閱 使用 RoutingKafkaTemplate

依類型

2.8 版引入了 DelegatingByTypeSerializer

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}

從 2.8.3 版開始,您可以組態序列化器以檢查對應鍵是否可從目標物件指派,當委派序列化器可以序列化子類別時很有用。在這種情況下,如果存在不明確的相符項,則應提供已排序的 Map,例如 LinkedHashMap

依主題

從 2.8 版開始,DelegatingByTopicSerializerDelegatingByTopicDeserializer 允許根據主題名稱選擇序列化器/反序列化器。Regex Pattern 用於查閱要使用的實例。可以使用建構子或透過屬性(pattern:serializer 的逗號分隔清單)來組態對應。

producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());

當將其用於鍵時,請使用 KEY_SERIALIZATION_TOPIC_CONFIG

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JsonSerializer<Object>());  // default
}

當沒有模式相符項時,您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULTDelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT 來指定要使用的預設序列化器/反序列化器。

另一個屬性 DelegatingByTopicSerialization.CASE_SENSITIVE(預設值為 true),當設定為 false 時,會使主題查閱不區分大小寫。

重試反序列化器

RetryingDeserializer 使用委派 DeserializerRetryTemplate,以便在委派可能發生暫時性錯誤(例如網路問題)時,在反序列化期間重試反序列化。

ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));

3.1.2 版開始,可以選擇性地在 RetryingDeserializer 上設定 RecoveryCallback

有關使用重試策略、退避策略等組態 RetryTemplate 的資訊,請參閱 spring-retry 專案。

Spring Messaging 訊息轉換

雖然從低階 Kafka ConsumerProducer 的角度來看,SerializerDeserializer API 非常簡單且靈活,但當使用 @KafkaListenerSpring Integration 的 Apache Kafka 支援時,您可能需要在 Spring Messaging 層級獲得更大的靈活性。為了讓您輕鬆地轉換為和從 org.springframework.messaging.Message 轉換,Spring for Apache Kafka 提供了 MessageConverter 抽象概念,以及 MessagingMessageConverter 實作及其 JsonMessageConverter(和子類別)自訂。您可以將 MessageConverter 直接注入到 KafkaTemplate 實例中,並透過使用 @KafkaListener.containerFactory() 屬性的 AbstractKafkaListenerContainerFactory bean 定義來注入。以下範例示範如何執行此操作

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}

當使用 Spring Boot 時,只需將轉換器定義為 @Bean,Spring Boot 自動組態就會將其連線到自動組態的範本和容器工廠。

當您使用 @KafkaListener 時,會將參數類型提供給訊息轉換器,以協助轉換。

只有在方法層級宣告 @KafkaListener 註解時,才能實現此類型推斷。對於類別層級的 @KafkaListener,酬載類型用於選擇要調用哪個 @KafkaHandler 方法,因此它必須在選擇方法之前已轉換。

在消費者端,您可以組態 JsonMessageConverter;它可以處理類型為 byte[]BytesStringConsumerRecord 值,因此應與 ByteArrayDeserializerBytesDeserializerStringDeserializer 結合使用。(byte[]Bytes 更有效率,因為它們避免了不必要的 byte[]String 轉換)。如果您願意,也可以組態與反序列化器對應的 JsonMessageConverter 的特定子類別。

在生產者端,當您使用 Spring Integration 或 KafkaTemplate.send(Message<?> message) 方法時(請參閱 使用 KafkaTemplate),您必須組態與已組態的 Kafka Serializer 相容的訊息轉換器。

  • 使用 StringSerializerStringJsonMessageConverter

  • 使用 BytesSerializerBytesJsonMessageConverter

  • 使用 ByteArraySerializerByteArrayJsonMessageConverter

同樣,使用 byte[]Bytes 更有效率,因為它們避免了 Stringbyte[] 的轉換。

為了方便起見,從 2.3 版開始,框架還提供了 StringOrBytesSerializer,它可以序列化所有三種值類型,因此可以與任何訊息轉換器一起使用。

從 2.7.1 版開始,訊息酬載轉換可以委派給 spring-messaging SmartMessageConverter;這使得可以根據 MessageHeaders.CONTENT_TYPE 標頭進行轉換。

呼叫 KafkaMessageConverter.fromMessage() 方法以進行出站轉換為 ProducerRecord,其中訊息酬載位於 ProducerRecord.value() 屬性中。呼叫 KafkaMessageConverter.toMessage() 方法以進行從 ConsumerRecord 進行的入站轉換,其中酬載為 ConsumerRecord.value() 屬性。呼叫 SmartMessageConverter.toMessage() 方法以從傳遞給 fromMessage()Message 建立新的出站 Message<?>(通常由 KafkaTemplate.send(Message<?> msg) 建立)。同樣,在 KafkaMessageConverter.toMessage() 方法中,在轉換器從 ConsumerRecord 建立新的 Message<?> 之後,會呼叫 SmartMessageConverter.fromMessage() 方法,然後使用新轉換的酬載建立最終的入站訊息。在任何一種情況下,如果 SmartMessageConverter 傳回 null,則會使用原始訊息。

當在 KafkaTemplate 和監聽器容器工廠中使用預設轉換器時,您可以透過在範本上呼叫 setMessagingConverter() 並透過 @KafkaListener 方法上的 contentTypeConverter 屬性來組態 SmartMessageConverter

範例

template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}

使用 Spring Data Projection 介面

從 2.1.1 版開始,您可以將 JSON 轉換為 Spring Data Projection 介面,而不是具體類型。這允許對資料進行非常有選擇性且低耦合的繫結,包括從 JSON 文件內的多個位置查閱值。例如,可以將以下介面定義為訊息酬載類型

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

預設情況下,存取器方法將用於查閱屬性名稱作為接收到的 JSON 文件中的欄位。@JsonPath 運算式允許自訂值查閱,甚至定義多個 JSON Path 運算式,以從多個位置查閱值,直到運算式傳回實際值。

若要啟用此功能,請使用組態了適當委派轉換器的 ProjectingMessageConverter(用於出站轉換和轉換非 projection 介面)。您還必須將 spring-data:spring-data-commonscom.jayway.jsonpath:json-path 新增到類別路徑中。

當用作 @KafkaListener 方法的參數時,介面類型會自動作為正常類型傳遞給轉換器。

使用 ErrorHandlingDeserializer

當反序列化器無法反序列化訊息時,Spring 無法處理此問題,因為此問題發生在 poll() 傳回之前。為了解決這個問題,引入了 ErrorHandlingDeserializer。此反序列化器委派給真正的反序列化器(鍵或值)。如果委派的反序列化器無法反序列化記錄內容,則 ErrorHandlingDeserializer 會傳回 null 值,並在包含原因和原始位元組的標頭中傳回 DeserializationException。當您使用記錄層級的 MessageListener 時,如果 ConsumerRecord 包含鍵或值的 DeserializationException 標頭,則容器的 ErrorHandler 會被呼叫,並傳入失敗的 ConsumerRecord。該記錄不會傳遞給監聽器。

或者,您可以設定 ErrorHandlingDeserializer 以透過提供 failedDeserializationFunction(一個 Function<FailedDeserializationInfo, T>)來建立自訂值。此函數會被呼叫以建立 T 的實例,該實例會以通常的方式傳遞給監聽器。類型為 FailedDeserializationInfo 的物件(包含所有上下文資訊)會提供給此函數。您可以在標頭中找到 DeserializationException(作為序列化的 Java 物件)。有關 ErrorHandlingDeserializer 的更多資訊,請參閱 Javadoc

您可以使用接受鍵和值 Deserializer 物件的 DefaultKafkaConsumerFactory 建構子,並連結您已使用適當委派設定的 ErrorHandlingDeserializer 實例。或者,您可以使用消費者組態屬性(ErrorHandlingDeserializer 使用這些屬性)來實例化委派。屬性名稱為 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASSErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。屬性值可以是類別或類別名稱。以下範例示範如何設定這些屬性

... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);

以下範例使用 failedDeserializationFunction

public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}

前面的範例使用以下組態

...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消費者設定了 ErrorHandlingDeserializer,則務必將 KafkaTemplate 及其生產者設定為可以使用正常物件以及來自反序列化異常的原始 byte[] 值的序列化器。範本的泛型值類型應為 Object。一種技術是使用 DelegatingByTypeSerializer;以下範例示範了用法
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}

當將 ErrorHandlingDeserializer 與批次監聽器搭配使用時,您必須檢查訊息標頭中的反序列化異常。當與 DefaultBatchErrorHandler 一起使用時,您可以使用該標頭來判斷哪個記錄發生異常失敗,並透過 BatchListenerFailedException 與錯誤處理器溝通。

@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            try {
                DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}

SerializationUtils.byteArrayToDeserializationException() 可用於將標頭轉換為 DeserializationException

當使用 List<ConsumerRecord<?, ?> 時,改為使用 SerializationUtils.getExceptionFromHeader()

@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
如果您也使用 DeadLetterPublishingRecoverer,則為 DeserializationException 發佈的記錄將具有 byte[] 類型的 record.value();這不應被序列化。考慮使用 DelegatingByTypeSerializer,將其設定為對 byte[] 使用 ByteArraySerializer,對所有其他類型使用正常序列化器(Json、Avro 等)。

從 3.1 版開始,您可以將 Validator 新增至 ErrorHandlingDeserializer。如果委派的 Deserializer 成功反序列化物件,但該物件驗證失敗,則會拋出類似於發生反序列化異常的異常。這允許將原始原始資料傳遞給錯誤處理器。當您自行建立反序列化器時,只需呼叫 setValidator;如果您使用屬性設定序列化器,請將消費者組態屬性 ErrorHandlingDeserializer.VALIDATOR_CLASS 設定為您的 Validator 的類別或完整類別名稱。當使用 Spring Boot 時,此屬性名稱為 spring.kafka.consumer.properties.spring.deserializer.validator.class

批次處理的有效負載轉換

當您使用批次監聽器容器工廠時,您也可以在 BatchMessagingMessageConverter 中使用 JsonMessageConverter 來轉換批次訊息。有關更多資訊,請參閱序列化、反序列化和訊息轉換Spring Messaging 訊息轉換

預設情況下,轉換的類型是從監聽器引數推斷出來的。如果您使用 DefaultJackson2TypeMapper 設定 JsonMessageConverter,並將其 TypePrecedence 設定為 TYPE_ID(而不是預設的 INFERRED),則轉換器會改為使用標頭中的類型資訊(如果存在)。例如,這允許使用介面而不是具體類別來宣告監聽器方法。此外,類型轉換器支援映射,因此反序列化可以轉換為與來源不同的類型(只要資料相容)。當您使用類別層級 @KafkaListener 實例時,這也很有用,在這種情況下,有效負載必須已轉換才能確定要調用哪個方法。以下範例建立使用此方法的 bean

@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}

請注意,為了使其正常運作,轉換目標的方法簽章必須是具有單一泛型參數類型的容器物件,例如以下範例

@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

請注意,您仍然可以存取批次標頭。

如果批次轉換器具有支援它的記錄轉換器,您也可以接收訊息列表,其中有效負載根據泛型類型進行轉換。以下範例示範如何執行此操作

@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}

ConversionService 自訂

從 2.1.1 版開始,預設 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory 使用的 org.springframework.core.convert.ConversionService,用於解析監聽器方法調用的參數,將會提供給所有實作以下任何介面的 bean

  • org.springframework.core.convert.converter.Converter

  • org.springframework.core.convert.converter.GenericConverter

  • org.springframework.format.Formatter

這讓您可以進一步自訂監聽器反序列化,而無需更改 ConsumerFactoryKafkaListenerContainerFactory 的預設組態。

透過 KafkaListenerConfigurer bean 在 KafkaListenerEndpointRegistrar 上設定自訂 MessageHandlerMethodFactory 會停用此功能。

@KafkaListener 新增自訂 HandlerMethodArgumentResolver

從 2.4.2 版開始,您可以新增自己的 HandlerMethodArgumentResolver 並解析自訂方法參數。您只需要實作 KafkaListenerConfigurer 並使用類別 KafkaListenerEndpointRegistrar 中的方法 setCustomMethodArgumentResolvers()

@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}

您也可以透過將自訂 MessageHandlerMethodFactory 新增至 KafkaListenerEndpointRegistrar bean 來完全取代框架的引數解析。如果您這樣做,並且您的應用程式需要處理 tombstone 記錄,以及 null value()(例如來自壓縮主題),您應該將 KafkaNullAwarePayloadArgumentResolver 新增至工廠;它必須是最後一個解析器,因為它支援所有類型,並且可以比對沒有 @Payload 註解的引數。如果您使用 DefaultMessageHandlerMethodFactory,請將此解析器設定為最後一個自訂解析器;工廠將確保此解析器將在標準 PayloadMethodArgumentResolver 之前使用,後者不了解 KafkaNull 有效負載。