記錄序列化與反序列化
Kafka Streams binder 允許您以兩種方式序列化和反序列化記錄。一種是 Kafka 提供的原生序列化和反序列化功能,另一種是 Spring Cloud Stream 框架的訊息轉換功能。讓我們來看一些細節。
入站反序列化
金鑰始終使用原生 Serdes 進行反序列化。
對於值,預設情況下,入站的反序列化由 Kafka 原生執行。請注意,這與先前版本的 Kafka Streams binder 的預設行為有重大變更,在先前版本中,反序列化是由框架完成的。
Kafka Streams binder 將嘗試透過查看 java.util.function.Function|Consumer
的類型簽章來推斷符合的 Serde
類型。以下是它比對 Serdes 的順序。
-
如果應用程式提供
Serde
類型的 Bean,並且如果返回類型使用傳入金鑰或值類型的實際類型進行參數化,則它將使用該Serde
進行入站反序列化。例如,如果您在應用程式中有以下內容,binder 會偵測到KStream
的傳入值類型與在Serde
Bean 上參數化的類型相符。它將使用該類型進行入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下來,它會查看類型,並查看它們是否為 Kafka Streams 公開的類型之一。如果是,則使用它們。以下是 binder 將嘗試從 Kafka Streams 比對的 Serde 類型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serdes 都與類型不符,則它將使用 Spring Kafka 提供的 JsonSerde。在這種情況下,binder 假設類型是 JSON 友好的。如果您有多個值物件作為輸入,這會很有用,因為 binder 將在內部將它們推斷為正確的 Java 類型。不過,在回退到
JsonSerde
之前,binder 會檢查 Kafka Streams 組態中設定的預設Serde
,以查看它是否是可以與傳入 KStream 類型比對的Serde
。
如果以上策略都無效,則應用程式必須透過組態提供 Serde
。這可以透過兩種方式進行組態 - 綁定或預設。
首先,binder 將查看是否在綁定層級提供了 Serde
。例如,如果您有以下處理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
那麼,您可以使用以下方式提供綁定層級 Serde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您為每個輸入綁定提供如上的 Serde ,那麼它將具有更高的優先順序,並且 binder 將避免任何 Serde 推斷。 |
如果您希望預設的金鑰/值 Serdes 用於入站反序列化,您可以在 binder 層級執行此操作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不想要 Kafka 提供的原生解碼,您可以依賴 Spring Cloud Stream 提供的訊息轉換功能。由於原生解碼是預設值,為了讓 Spring Cloud Stream 反序列化入站值物件,您需要明確停用原生解碼。
例如,如果您有與上述相同的 BiFunction 處理器,則 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要個別停用所有輸入的原生解碼。否則,原生解碼仍將應用於您未停用的輸入。
預設情況下,Spring Cloud Stream 將使用 application/json
作為內容類型,並使用適當的 json 訊息轉換器。您可以使用以下屬性和適當的 MessageConverter
Bean 來使用自訂訊息轉換器。
spring.cloud.stream.bindings.process-in-0.contentType
出站序列化
出站序列化幾乎遵循與上述入站反序列化相同的規則。與入站反序列化一樣,Spring Cloud Stream 先前版本的一個主要變更是,出站的序列化由 Kafka 原生處理。在 3.0 版本的 binder 之前,這是由框架本身完成的。
出站的金鑰始終由 Kafka 使用 binder 推斷的符合 Serde
進行序列化。如果它無法推斷金鑰的類型,則需要使用組態指定。
值 Serdes 的推斷使用與入站反序列化相同的規則。首先,它會比對以查看出站類型是否來自應用程式中提供的 Bean。如果不是,它會檢查以查看它是否與 Kafka 公開的 Serde
相符,例如 - Integer
、Long
、Short
、Double
、Float
、byte[]
、UUID
和 String
。如果這不起作用,則它會回退到 Spring Kafka 專案提供的 JsonSerde
,但首先查看預設 Serde
組態以查看是否有相符項。請記住,所有這些都對應用程式透明地發生。如果這些都不起作用,則使用者必須透過組態提供要使用的 Serde
。
假設您使用與上述相同的 BiFunction
處理器。然後,您可以如下組態出站金鑰/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推斷失敗,並且未提供任何綁定層級 Serdes,則 binder 會回退到 JsonSerde
,但會查看預設 Serdes 以尋找相符項。
預設 serdes 的組態方式與上述反序列化下描述的方式相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的應用程式使用分支功能並具有多個輸出綁定,則這些必須針對每個綁定進行組態。再次強調,如果 binder 能夠推斷 Serde
類型,則您不需要執行此組態。
如果您不想要 Kafka 提供的原生編碼,但想要使用框架提供的訊息轉換,則您需要明確停用原生編碼,因為原生編碼是預設值。例如,如果您有與上述相同的 BiFunction 處理器,則 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
在分支的情況下,您需要個別停用所有輸出的原生編碼。否則,原生編碼仍將應用於您未停用的輸出。
當轉換由 Spring Cloud Stream 完成時,預設情況下,它將使用 application/json
作為內容類型,並使用適當的 json 訊息轉換器。您可以使用以下屬性和對應的 MessageConverter
Bean 來使用自訂訊息轉換器。
spring.cloud.stream.bindings.process-out-0.contentType
當停用原生編碼/解碼時,binder 將不會像原生 Serdes 的情況一樣進行任何推斷。應用程式需要明確提供所有組態選項。因此,一般建議在使用 Spring Cloud Stream Kafka Streams 應用程式時,保留預設的 de/serialization 選項,並堅持使用 Kafka Streams 提供的原生 de/serialization。您必須使用框架提供的訊息轉換功能的一種情況是,當您的上游生產者使用特定的序列化策略時。在這種情況下,您希望使用相符的反序列化策略,因為原生機制可能會失敗。當依賴預設的 Serde
機制時,應用程式必須確保 binder 有一種正確映射入站和出站與適當 Serde
的方法,否則事情可能會失敗。
值得一提的是,上面概述的資料 de/serialization 方法僅適用於處理器的邊緣,即 - 入站和出站。您的業務邏輯可能仍然需要呼叫明確需要 Serde
物件的 Kafka Streams API。這些仍然是應用程式的責任,並且必須由開發人員相應地處理。