序列化、反序列化和訊息轉換
概觀
Apache Kafka 為序列化和反序列化記錄值及其鍵提供了高階 API。它透過 org.apache.kafka.common.serialization.Serializer<T>
和 org.apache.kafka.common.serialization.Deserializer<T>
抽象概念以及一些內建實作來呈現。同時,我們可以透過使用 Producer
或 Consumer
組態屬性來指定序列化器和反序列化器類別。以下範例示範如何執行此操作
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
對於更複雜或特殊的情況,KafkaConsumer
(以及因此 KafkaProducer
)提供了多載建構子,以接受 keys
和 values
的 Serializer
和 Deserializer
實例。
當您使用此 API 時,DefaultKafkaProducerFactory
和 DefaultKafkaConsumerFactory
也提供屬性(透過建構子或 setter 方法),以將自訂 Serializer
和 Deserializer
實例注入到目標 Producer
或 Consumer
中。此外,您可以透過建構子傳入 Supplier<Serializer>
或 Supplier<Deserializer>
實例 - 這些 Supplier
會在每次建立 Producer
或 Consumer
時呼叫。
字串序列化
自 2.5 版起,Spring for Apache Kafka 提供了 ToStringSerializer
和 ParseStringDeserializer
類別,它們使用實體的字串表示形式。它們依賴 toString
方法和某些 Function<String>
或 BiFunction<String, Headers>
來解析字串並填入實例的屬性。通常,這會調用類別上的某些靜態方法,例如 parse
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
預設情況下,ToStringSerializer
設定為在記錄 Headers
中傳達有關序列化實體的類型資訊。您可以透過將 addTypeInfo
屬性設定為 false
來停用此功能。接收端的 ParseStringDeserializer
可以使用此資訊。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS
(預設值為true
):您可以將其設定為false
以在ToStringSerializer
上停用此功能(設定addTypeInfo
屬性)。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
您可以設定用於將 String
轉換為/從 byte[]
的 Charset
,預設值為 UTF-8
。
您可以使用 ConsumerConfig
屬性來設定具有解析器方法名稱的反序列化器
-
ParseStringDeserializer.KEY_PARSER
-
ParseStringDeserializer.VALUE_PARSER
這些屬性必須包含類別的完整限定名稱,後跟方法名稱,並以句點 .
分隔。該方法必須是靜態的,並且具有 (String, Headers)
或 (String)
的簽章。
還提供了 ToFromStringSerde
,用於 Kafka Streams。
JSON
Spring for Apache Kafka 還提供了基於 Jackson JSON 物件對應器的 JsonSerializer
和 JsonDeserializer
實作。JsonSerializer
允許將任何 Java 物件寫入為 JSON byte[]
。JsonDeserializer
需要額外的 Class<?> targetType
引數,以允許將已消費的 byte[]
反序列化為正確的目標物件。以下範例示範如何建立 JsonDeserializer
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以使用 ObjectMapper
自訂 JsonSerializer
和 JsonDeserializer
。您還可以擴充它們以在 configure(Map<String, ?> configs, boolean isKey)
方法中實作一些特定的組態邏輯。
從 2.3 版開始,所有 JSON 感知元件預設都使用 JacksonUtils.enhancedObjectMapper()
實例進行組態,該實例隨附停用的 MapperFeature.DEFAULT_VIEW_INCLUSION
和 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES
功能。此外,此類實例還提供用於自訂資料類型的知名模組,例如 Java time 和 Kotlin 支援。有關更多資訊,請參閱 JacksonUtils.enhancedObjectMapper()
JavaDocs。此方法還註冊了 org.springframework.kafka.support.JacksonMimeTypeModule
,用於將 org.springframework.util.MimeType
物件序列化為純字串,以實現跨平台網路相容性。JacksonMimeTypeModule
可以註冊為應用程式內容中的 bean,並且將自動組態到 Spring Boot ObjectMapper
實例中。
同樣從 2.3 版開始,JsonDeserializer
提供了基於 TypeReference
的建構子,以更好地處理目標泛型容器類型。
從 2.1 版開始,您可以在記錄 Headers
中傳達類型資訊,從而允許處理多種類型。此外,您可以透過使用以下 Kafka 屬性來組態序列化器和反序列化器。如果您已為 KafkaConsumer
和 KafkaProducer
提供 Serializer
和 Deserializer
實例,則它們無效。
組態屬性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS
(預設值為true
):您可以將其設定為false
以在JsonSerializer
上停用此功能(設定addTypeInfo
屬性)。 -
JsonSerializer.TYPE_MAPPINGS
(預設值為empty
):請參閱 對應類型。 -
JsonDeserializer.USE_TYPE_INFO_HEADERS
(預設值為true
):您可以將其設定為false
以忽略序列化器設定的標頭。 -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS
(預設值為true
):您可以將其設定為false
以保留序列化器設定的標頭。 -
JsonDeserializer.KEY_DEFAULT_TYPE
:如果沒有標頭資訊,則用於反序列化鍵的回退類型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果沒有標頭資訊,則用於反序列化值的回退類型。 -
JsonDeserializer.TRUSTED_PACKAGES
(預設值為java.util
、java.lang
):反序列化允許的套件模式逗號分隔清單。*
表示反序列化所有內容。 -
JsonDeserializer.TYPE_MAPPINGS
(預設值為empty
):請參閱 對應類型。 -
JsonDeserializer.KEY_TYPE_METHOD
(預設值為empty
):請參閱 使用方法判斷類型。 -
JsonDeserializer.VALUE_TYPE_METHOD
(預設值為empty
):請參閱 使用方法判斷類型。
從 2.2 版開始,類型資訊標頭(如果由序列化器新增)將由反序列化器移除。您可以透過將 removeTypeHeaders
屬性設定為 false
來恢復到先前的行為,可以直接在反序列化器上設定,也可以使用先前描述的組態屬性設定。
從 2.8 版開始,如果您以程式設計方式建構序列化器或反序列化器,如 程式化建構 中所示,只要您沒有明確設定任何屬性(使用 set*() 方法或使用流暢 API),工廠就會套用上述屬性。先前,當以程式設計方式建立時,永遠不會套用組態屬性;如果您直接在物件上明確設定屬性,則仍然是這種情況。 |
對應類型
從 2.2 版開始,當使用 JSON 時,您現在可以使用前面清單中的屬性來提供類型對應。先前,您必須自訂序列化器和反序列化器中的類型對應器。對應由 token:className
配對的逗號分隔清單組成。在出站時,酬載的類別名稱會對應到對應的權杖。在入站時,類型標頭中的權杖會對應到對應的類別名稱。
以下範例建立一組對應
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
對應的物件必須相容。 |
如果您使用 Spring Boot,您可以在 application.properties
(或 yaml)檔案中提供這些屬性。以下範例示範如何執行此操作
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
您可以使用屬性執行簡單的組態。對於更進階的組態(例如在序列化器和反序列化器中使用自訂
也提供了 Setter,作為使用這些建構子的替代方案。 |
當使用 Spring Boot 並覆寫 ConsumerFactory 和 ProducerFactory (如上所示)時,必須將萬用字元泛型類型與 bean 方法傳回類型一起使用。如果改為提供具體的泛型類型,則 Spring Boot 將忽略這些 bean,並且仍然使用預設的 bean。 |
從 2.2 版開始,您可以明確地組態反序列化器以使用提供的目標類型,並透過使用其中一個具有布林值 useHeadersIfPresent
引數(預設值為 true
)的多載建構子來忽略標頭中的類型資訊。以下範例示範如何執行此操作
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法判斷類型
從 2.5 版開始,您現在可以透過屬性組態反序列化器,以調用方法來判斷目標類型。如果存在,這將覆寫上述討論的任何其他技術。如果資料是由未使用 Spring 序列化器的應用程式發布的,並且您需要根據資料或其他標頭反序列化為不同的類型,則這會很有用。將這些屬性設定為方法名稱 - 完整限定的類別名稱,後跟方法名稱,並以句點 .
分隔。該方法必須宣告為 public static
,具有三個簽章之一 (String topic, byte[] data, Headers headers)
、(byte[] data, Headers headers)
或 (byte[] data)
,並傳回 Jackson JavaType
。
-
JsonDeserializer.KEY_TYPE_METHOD
:spring.json.key.type.method
-
JsonDeserializer.VALUE_TYPE_METHOD
:spring.json.value.type.method
您可以使用任意標頭或檢查資料來判斷類型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
對於更複雜的資料檢查,請考慮使用 JsonPath
或類似工具,但是,用於判斷類型的測試越簡單,流程效率就越高。
以下是以程式設計方式建立反序列化器的範例(當在建構子中為消費者工廠提供反序列化器時)
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程式化建構
自 2.3 版起,當以程式設計方式建構序列化器/反序列化器以用於生產者/消費者工廠時,您可以使用流暢 API,這簡化了組態。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
若要以程式設計方式提供類型對應,類似於 使用方法判斷類型,請使用 typeFunction
屬性。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要您不使用流暢 API 來組態屬性,或使用 set*()
方法設定它們,工廠就會使用組態屬性組態序列化器/反序列化器;請參閱 組態屬性。
委派序列化器和反序列化器
使用標頭
2.3 版引入了 DelegatingSerializer
和 DelegatingDeserializer
,它們允許生產和消費具有不同鍵和/或值類型的記錄。生產者必須將標頭 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
設定為選擇器值,該值用於選擇要用於值的序列化器,並將 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR
用於鍵;如果找不到相符項,則會擲回 IllegalStateException
。
對於傳入記錄,反序列化器使用相同的標頭來選擇要使用的反序列化器;如果找不到相符項或標頭不存在,則會傳回原始 byte[]
。
您可以透過建構子組態選擇器到 Serializer
/ Deserializer
的對應,也可以透過 Kafka 生產者/消費者屬性以及鍵 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG
和 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG
來組態它。對於序列化器,生產者屬性可以是 Map<String, Object>
,其中鍵是選擇器,值是 Serializer
實例、序列化器 Class
或類別名稱。該屬性也可以是逗號分隔的對應項目字串,如下所示。
對於反序列化器,消費者屬性可以是 Map<String, Object>
,其中鍵是選擇器,值是 Deserializer
實例、反序列化器 Class
或類別名稱。該屬性也可以是逗號分隔的對應項目字串,如下所示。
若要使用屬性進行組態,請使用以下語法
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
然後,生產者會將 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR
標頭設定為 thing1
或 thing2
。
此技術支援將不同類型傳送到相同主題(或不同主題)。
從 2.5.1 版開始,如果類型(鍵或值)是 Serdes (Long 、Integer 等)支援的標準類型之一,則無需設定選擇器標頭。相反,序列化器會將標頭設定為類型的類別名稱。無需為這些類型組態序列化器或反序列化器,它們將(一次)動態建立。 |
對於將不同類型傳送到不同主題的另一種技術,請參閱 使用 RoutingKafkaTemplate
。
依類型
2.8 版引入了 DelegatingByTypeSerializer
。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
從 2.8.3 版開始,您可以組態序列化器以檢查對應鍵是否可從目標物件指派,當委派序列化器可以序列化子類別時很有用。在這種情況下,如果存在不明確的相符項,則應提供已排序的 Map
,例如 LinkedHashMap
。
依主題
從 2.8 版開始,DelegatingByTopicSerializer
和 DelegatingByTopicDeserializer
允許根據主題名稱選擇序列化器/反序列化器。Regex Pattern
用於查閱要使用的實例。可以使用建構子或透過屬性(pattern:serializer
的逗號分隔清單)來組態對應。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
當將其用於鍵時,請使用 KEY_SERIALIZATION_TOPIC_CONFIG
。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
new IntegerSerializer(),
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
當沒有模式相符項時,您可以使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT
和 DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT
來指定要使用的預設序列化器/反序列化器。
另一個屬性 DelegatingByTopicSerialization.CASE_SENSITIVE
(預設值為 true
),當設定為 false
時,會使主題查閱不區分大小寫。
重試反序列化器
RetryingDeserializer
使用委派 Deserializer
和 RetryTemplate
,以便在委派可能發生暫時性錯誤(例如網路問題)時,在反序列化期間重試反序列化。
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
從 3.1.2
版開始,可以選擇性地在 RetryingDeserializer
上設定 RecoveryCallback
。
有關使用重試策略、退避策略等組態 RetryTemplate
的資訊,請參閱 spring-retry 專案。
Spring Messaging 訊息轉換
雖然從低階 Kafka Consumer
和 Producer
的角度來看,Serializer
和 Deserializer
API 非常簡單且靈活,但當使用 @KafkaListener
或 Spring Integration 的 Apache Kafka 支援時,您可能需要在 Spring Messaging 層級獲得更大的靈活性。為了讓您輕鬆地轉換為和從 org.springframework.messaging.Message
轉換,Spring for Apache Kafka 提供了 MessageConverter
抽象概念,以及 MessagingMessageConverter
實作及其 JsonMessageConverter
(和子類別)自訂。您可以將 MessageConverter
直接注入到 KafkaTemplate
實例中,並透過使用 @KafkaListener.containerFactory()
屬性的 AbstractKafkaListenerContainerFactory
bean 定義來注入。以下範例示範如何執行此操作
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
當使用 Spring Boot 時,只需將轉換器定義為 @Bean
,Spring Boot 自動組態就會將其連線到自動組態的範本和容器工廠。
當您使用 @KafkaListener
時,會將參數類型提供給訊息轉換器,以協助轉換。
只有在方法層級宣告 |
在消費者端,您可以組態 在生產者端,當您使用 Spring Integration 或
同樣,使用 為了方便起見,從 2.3 版開始,框架還提供了 |
從 2.7.1 版開始,訊息酬載轉換可以委派給 spring-messaging
SmartMessageConverter
;這使得可以根據 MessageHeaders.CONTENT_TYPE
標頭進行轉換。
呼叫 KafkaMessageConverter.fromMessage() 方法以進行出站轉換為 ProducerRecord ,其中訊息酬載位於 ProducerRecord.value() 屬性中。呼叫 KafkaMessageConverter.toMessage() 方法以進行從 ConsumerRecord 進行的入站轉換,其中酬載為 ConsumerRecord.value() 屬性。呼叫 SmartMessageConverter.toMessage() 方法以從傳遞給 fromMessage() 的 Message 建立新的出站 Message<?> (通常由 KafkaTemplate.send(Message<?> msg) 建立)。同樣,在 KafkaMessageConverter.toMessage() 方法中,在轉換器從 ConsumerRecord 建立新的 Message<?> 之後,會呼叫 SmartMessageConverter.fromMessage() 方法,然後使用新轉換的酬載建立最終的入站訊息。在任何一種情況下,如果 SmartMessageConverter 傳回 null ,則會使用原始訊息。 |
當在 KafkaTemplate
和監聽器容器工廠中使用預設轉換器時,您可以透過在範本上呼叫 setMessagingConverter()
並透過 @KafkaListener
方法上的 contentTypeConverter
屬性來組態 SmartMessageConverter
。
範例
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data Projection 介面
從 2.1.1 版開始,您可以將 JSON 轉換為 Spring Data Projection 介面,而不是具體類型。這允許對資料進行非常有選擇性且低耦合的繫結,包括從 JSON 文件內的多個位置查閱值。例如,可以將以下介面定義為訊息酬載類型
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
預設情況下,存取器方法將用於查閱屬性名稱作為接收到的 JSON 文件中的欄位。@JsonPath
運算式允許自訂值查閱,甚至定義多個 JSON Path 運算式,以從多個位置查閱值,直到運算式傳回實際值。
若要啟用此功能,請使用組態了適當委派轉換器的 ProjectingMessageConverter
(用於出站轉換和轉換非 projection 介面)。您還必須將 spring-data:spring-data-commons
和 com.jayway.jsonpath:json-path
新增到類別路徑中。
當用作 @KafkaListener
方法的參數時,介面類型會自動作為正常類型傳遞給轉換器。
使用 ErrorHandlingDeserializer
當反序列化器無法反序列化訊息時,Spring 無法處理此問題,因為此問題發生在 poll()
傳回之前。為了解決這個問題,引入了 ErrorHandlingDeserializer
。此反序列化器委派給真正的反序列化器(鍵或值)。如果委派的反序列化器無法反序列化記錄內容,則 ErrorHandlingDeserializer
會傳回 null
值,並在包含原因和原始位元組的標頭中傳回 DeserializationException
。當您使用記錄層級的 MessageListener
時,如果 ConsumerRecord
包含鍵或值的 DeserializationException
標頭,則容器的 ErrorHandler
會被呼叫,並傳入失敗的 ConsumerRecord
。該記錄不會傳遞給監聽器。
或者,您可以設定 ErrorHandlingDeserializer
以透過提供 failedDeserializationFunction
(一個 Function<FailedDeserializationInfo, T>
)來建立自訂值。此函數會被呼叫以建立 T
的實例,該實例會以通常的方式傳遞給監聽器。類型為 FailedDeserializationInfo
的物件(包含所有上下文資訊)會提供給此函數。您可以在標頭中找到 DeserializationException
(作為序列化的 Java 物件)。有關 ErrorHandlingDeserializer
的更多資訊,請參閱 Javadoc。
您可以使用接受鍵和值 Deserializer
物件的 DefaultKafkaConsumerFactory
建構子,並連結您已使用適當委派設定的 ErrorHandlingDeserializer
實例。或者,您可以使用消費者組態屬性(ErrorHandlingDeserializer
使用這些屬性)來實例化委派。屬性名稱為 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS
和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS
。屬性值可以是類別或類別名稱。以下範例示範如何設定這些屬性
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下範例使用 failedDeserializationFunction
。
public class BadThing extends Thing {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {
@Override
public Thing apply(FailedDeserializationInfo info) {
return new BadThing(info);
}
}
前面的範例使用以下組態
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
如果消費者設定了 ErrorHandlingDeserializer ,則務必將 KafkaTemplate 及其生產者設定為可以使用正常物件以及來自反序列化異常的原始 byte[] 值的序列化器。範本的泛型值類型應為 Object 。一種技術是使用 DelegatingByTypeSerializer ;以下範例示範了用法 |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
當將 ErrorHandlingDeserializer
與批次監聽器搭配使用時,您必須檢查訊息標頭中的反序列化異常。當與 DefaultBatchErrorHandler
一起使用時,您可以使用該標頭來判斷哪個記錄發生異常失敗,並透過 BatchListenerFailedException
與錯誤處理器溝通。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
try {
DeserializationException deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
}
catch (Exception ex) {
logger.error(ex, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
SerializationUtils.byteArrayToDeserializationException()
可用於將標頭轉換為 DeserializationException
。
當使用 List<ConsumerRecord<?, ?>
時,改為使用 SerializationUtils.getExceptionFromHeader()
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果您也使用 DeadLetterPublishingRecoverer ,則為 DeserializationException 發佈的記錄將具有 byte[] 類型的 record.value() ;這不應被序列化。考慮使用 DelegatingByTypeSerializer ,將其設定為對 byte[] 使用 ByteArraySerializer ,對所有其他類型使用正常序列化器(Json、Avro 等)。 |
從 3.1 版開始,您可以將 Validator
新增至 ErrorHandlingDeserializer
。如果委派的 Deserializer
成功反序列化物件,但該物件驗證失敗,則會拋出類似於發生反序列化異常的異常。這允許將原始原始資料傳遞給錯誤處理器。當您自行建立反序列化器時,只需呼叫 setValidator
;如果您使用屬性設定序列化器,請將消費者組態屬性 ErrorHandlingDeserializer.VALIDATOR_CLASS
設定為您的 Validator
的類別或完整類別名稱。當使用 Spring Boot 時,此屬性名稱為 spring.kafka.consumer.properties.spring.deserializer.validator.class
。
批次處理的有效負載轉換
當您使用批次監聽器容器工廠時,您也可以在 BatchMessagingMessageConverter
中使用 JsonMessageConverter
來轉換批次訊息。有關更多資訊,請參閱序列化、反序列化和訊息轉換和Spring Messaging 訊息轉換。
預設情況下,轉換的類型是從監聽器引數推斷出來的。如果您使用 DefaultJackson2TypeMapper
設定 JsonMessageConverter
,並將其 TypePrecedence
設定為 TYPE_ID
(而不是預設的 INFERRED
),則轉換器會改為使用標頭中的類型資訊(如果存在)。例如,這允許使用介面而不是具體類別來宣告監聽器方法。此外,類型轉換器支援映射,因此反序列化可以轉換為與來源不同的類型(只要資料相容)。當您使用類別層級 @KafkaListener
實例時,這也很有用,在這種情況下,有效負載必須已轉換才能確定要調用哪個方法。以下範例建立使用此方法的 bean
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
請注意,為了使其正常運作,轉換目標的方法簽章必須是具有單一泛型參數類型的容器物件,例如以下範例
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
請注意,您仍然可以存取批次標頭。
如果批次轉換器具有支援它的記錄轉換器,您也可以接收訊息列表,其中有效負載根據泛型類型進行轉換。以下範例示範如何執行此操作
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
...
}
ConversionService
自訂
從 2.1.1 版開始,預設 org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory
使用的 org.springframework.core.convert.ConversionService
,用於解析監聽器方法調用的參數,將會提供給所有實作以下任何介面的 bean
-
org.springframework.core.convert.converter.Converter
-
org.springframework.core.convert.converter.GenericConverter
-
org.springframework.format.Formatter
這讓您可以進一步自訂監聽器反序列化,而無需更改 ConsumerFactory
和 KafkaListenerContainerFactory
的預設組態。
透過 KafkaListenerConfigurer bean 在 KafkaListenerEndpointRegistrar 上設定自訂 MessageHandlerMethodFactory 會停用此功能。 |
為 @KafkaListener
新增自訂 HandlerMethodArgumentResolver
從 2.4.2 版開始,您可以新增自己的 HandlerMethodArgumentResolver
並解析自訂方法參數。您只需要實作 KafkaListenerConfigurer
並使用類別 KafkaListenerEndpointRegistrar
中的方法 setCustomMethodArgumentResolvers()
。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
您也可以透過將自訂 MessageHandlerMethodFactory
新增至 KafkaListenerEndpointRegistrar
bean 來完全取代框架的引數解析。如果您這樣做,並且您的應用程式需要處理 tombstone 記錄,以及 null
value()
(例如來自壓縮主題),您應該將 KafkaNullAwarePayloadArgumentResolver
新增至工廠;它必須是最後一個解析器,因為它支援所有類型,並且可以比對沒有 @Payload
註解的引數。如果您使用 DefaultMessageHandlerMethodFactory
,請將此解析器設定為最後一個自訂解析器;工廠將確保此解析器將在標準 PayloadMethodArgumentResolver
之前使用,後者不了解 KafkaNull
有效負載。