訊息標頭

0.11.0.0 版本的客戶端引入了訊息標頭的支援。從 2.0 版本開始,Spring for Apache Kafka 現在支援將這些標頭映射到 spring-messaging MessageHeaders 以及從其映射。

先前的版本將 ConsumerRecordProducerRecord 映射到 spring-messaging Message<?>,其中 value 屬性映射到 payload 以及從 payload 映射,而其他屬性 (topicpartition 等) 則映射到標頭。目前情況仍然如此,但現在可以映射額外的(任意)標頭。

Apache Kafka 標頭具有簡單的 API,如下列介面定義所示

public interface Header {

    String key();

    byte[] value();

}

提供了 KafkaHeaderMapper 策略,用於在 Kafka HeadersMessageHeaders 之間映射標頭條目。其介面定義如下:

public interface KafkaHeaderMapper {

    void fromHeaders(MessageHeaders headers, Headers target);

    void toHeaders(Headers source, Map<String, Object> target);

}

SimpleKafkaHeaderMapper 將原始標頭映射為 byte[],並提供轉換為 String 值的組態選項。

DefaultKafkaHeaderMapper 將金鑰映射到 MessageHeaders 標頭名稱,為了支援傳出訊息的豐富標頭類型,會執行 JSON 轉換。一個「special」標頭(金鑰為 spring_json_header_types)包含 <key>:<type> 的 JSON 映射。此標頭用於入站端,以提供每個標頭值到原始類型的適當轉換。

在入站端,所有 Kafka Header 實例都會映射到 MessageHeaders。在出站端,預設情況下,會映射所有 MessageHeaders,但 idtimestamp 以及映射到 ConsumerRecord 屬性的標頭除外。

您可以透過為 mapper 提供模式,指定要為出站訊息映射哪些標頭。以下清單顯示了一些範例映射:

public DefaultKafkaHeaderMapper() { (1)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
    ...
}

public DefaultKafkaHeaderMapper(String... patterns) { (3)
    ...
}

public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
    ...
}
1 使用預設的 Jackson ObjectMapper 並映射大多數標頭,如範例之前所討論的。
2 使用提供的 Jackson ObjectMapper 並映射大多數標頭,如範例之前所討論的。
3 使用預設的 Jackson ObjectMapper 並根據提供的模式映射標頭。
4 使用提供的 Jackson ObjectMapper 並根據提供的模式映射標頭。

模式相當簡單,可以包含前導萬用字元 (*)、尾隨萬用字元或兩者 (例如,*.cat.*)。您可以使用前導 ! 來否定模式。第一個符合標頭名稱的模式(無論是肯定還是否定)獲勝。

當您提供自己的模式時,我們建議包含 !id!timestamp,因為這些標頭在入站端是唯讀的。

預設情況下,mapper 僅反序列化 java.langjava.util 中的類別。您可以透過使用 addTrustedPackages 方法新增受信任的套件來信任其他(或所有)套件。如果您收到來自不受信任來源的訊息,您可能希望僅新增您信任的那些套件。若要信任所有套件,您可以使用 mapper.addTrustedPackages("*")
以原始形式映射 String 標頭值在與不了解 mapper 的 JSON 格式的系統通訊時很有用。

從 2.2.5 版本開始,您可以指定某些字串值標頭不應使用 JSON 映射,而是映射到/從原始 byte[] 映射。AbstractKafkaHeaderMapper 具有新的屬性;當 mapAllStringsOut 設定為 true 時,所有字串值標頭都將使用 charset 屬性(預設為 UTF-8)轉換為 byte[]。此外,還有一個屬性 rawMappedHeaders,它是一個 header name : boolean 的映射;如果映射包含標頭名稱,且標頭包含 String 值,則它將使用字元集映射為原始 byte[]。如果且僅當映射值中的布林值為 true 時,此映射也用於將原始傳入的 byte[] 標頭映射到 String。如果布林值為 false,或者標頭名稱不在具有 true 值的映射中,則傳入的標頭僅映射為原始未映射的標頭。

以下測試案例說明了此機制。

@Test
public void testSpecificStringConvert() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    Map<String, Boolean> rawMappedHeaders = new HashMap<>();
    rawMappedHeaders.put("thisOnesAString", true);
    rawMappedHeaders.put("thisOnesBytes", false);
    mapper.setRawMappedHeaders(rawMappedHeaders);
    Map<String, Object> headersMap = new HashMap<>();
    headersMap.put("thisOnesAString", "thing1");
    headersMap.put("thisOnesBytes", "thing2");
    headersMap.put("alwaysRaw", "thing3".getBytes());
    MessageHeaders headers = new MessageHeaders(headersMap);
    Headers target = new RecordHeaders();
    mapper.fromHeaders(headers, target);
    assertThat(target).containsExactlyInAnyOrder(
            new RecordHeader("thisOnesAString", "thing1".getBytes()),
            new RecordHeader("thisOnesBytes", "thing2".getBytes()),
            new RecordHeader("alwaysRaw", "thing3".getBytes()));
    headersMap.clear();
    mapper.toHeaders(target, headersMap);
    assertThat(headersMap).contains(
            entry("thisOnesAString", "thing1"),
            entry("thisOnesBytes", "thing2".getBytes()),
            entry("alwaysRaw", "thing3".getBytes()));
}

預設情況下,這兩個標頭 mapper 都會映射所有入站標頭。從 2.8.8 版本開始,模式也可以應用於入站映射。若要建立用於入站映射的 mapper,請使用各個 mapper 上的靜態方法之一

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}

public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}

例如

DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");

這將排除所有以 abc 開頭的標頭,並包含所有其他標頭。

預設情況下,只要 Jackson 在類別路徑上,DefaultKafkaHeaderMapper 就會在 MessagingMessageConverterBatchMessagingMessageConverter 中使用。

使用批次轉換器時,轉換後的標頭在 KafkaHeaders.BATCH_CONVERTED_HEADERS 中以 List<Map<String, Object>> 的形式提供,其中清單中位置的映射對應於酬載中的資料位置。

如果沒有轉換器(因為 Jackson 不存在或明確設定為 null),則來自 consumer 記錄的標頭會在 KafkaHeaders.NATIVE_HEADERS 標頭中以未轉換的形式提供。此標頭是一個 Headers 物件(在批次轉換器的情況下,則為 List<Headers>),其中清單中的位置對應於酬載中的資料位置。

某些類型不適合 JSON 序列化,對於這些類型,可能首選簡單的 toString() 序列化。DefaultKafkaHeaderMapper 有一個名為 addToStringClasses() 的方法,可讓您提供應以此方式處理以進行出站映射的類別名稱。在入站映射期間,它們會映射為 String。預設情況下,只有 org.springframework.util.MimeTypeorg.springframework.http.MediaType 會以此方式映射。
從 2.3 版本開始,字串值標頭的處理已簡化。預設情況下,此類標頭不再進行 JSON 編碼(即,它們沒有新增封閉的 "...")。類型仍然會新增到 JSON_TYPES 標頭,以便接收系統可以轉換回字串(從 byte[])。mapper 可以處理(解碼)舊版本產生的標頭(它會檢查前導 ");透過這種方式,使用 2.3 的應用程式可以取用來自舊版本的記錄。
為了與早期版本相容,如果使用 2.3 版本的記錄可能會被使用早期版本的應用程式取用,請將 encodeStrings 設定為 true。當所有應用程式都使用 2.3 或更高版本時,您可以將屬性保留為預設值 false
@Bean
MessagingMessageConverter converter() {
    MessagingMessageConverter converter = new MessagingMessageConverter();
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    mapper.setEncodeStrings(true);
    converter.setHeaderMapper(mapper);
    return converter;
}

如果使用 Spring Boot,它會將此轉換器 bean 自動組態到自動組態的 KafkaTemplate 中;否則,您應該將此轉換器新增至範本。