訊息標頭
0.11.0.0 版本的客戶端引入了訊息標頭的支援。從 2.0 版本開始,Spring for Apache Kafka 現在支援將這些標頭映射到 spring-messaging MessageHeaders 以及從其映射。
先前的版本將 ConsumerRecord 和 ProducerRecord 映射到 spring-messaging Message<?> ,其中 value 屬性映射到 payload 以及從 payload 映射,而其他屬性 (topic 、partition 等) 則映射到標頭。目前情況仍然如此,但現在可以映射額外的(任意)標頭。 |
Apache Kafka 標頭具有簡單的 API,如下列介面定義所示
public interface Header {
String key();
byte[] value();
}
提供了 KafkaHeaderMapper
策略,用於在 Kafka Headers
和 MessageHeaders
之間映射標頭條目。其介面定義如下:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
SimpleKafkaHeaderMapper
將原始標頭映射為 byte[]
,並提供轉換為 String
值的組態選項。
DefaultKafkaHeaderMapper
將金鑰映射到 MessageHeaders
標頭名稱,為了支援傳出訊息的豐富標頭類型,會執行 JSON 轉換。一個「special
」標頭(金鑰為 spring_json_header_types
)包含 <key>:<type>
的 JSON 映射。此標頭用於入站端,以提供每個標頭值到原始類型的適當轉換。
在入站端,所有 Kafka Header
實例都會映射到 MessageHeaders
。在出站端,預設情況下,會映射所有 MessageHeaders
,但 id
、timestamp
以及映射到 ConsumerRecord
屬性的標頭除外。
您可以透過為 mapper 提供模式,指定要為出站訊息映射哪些標頭。以下清單顯示了一些範例映射:
public DefaultKafkaHeaderMapper() { (1)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public DefaultKafkaHeaderMapper(String... patterns) { (3)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
1 | 使用預設的 Jackson ObjectMapper 並映射大多數標頭,如範例之前所討論的。 |
2 | 使用提供的 Jackson ObjectMapper 並映射大多數標頭,如範例之前所討論的。 |
3 | 使用預設的 Jackson ObjectMapper 並根據提供的模式映射標頭。 |
4 | 使用提供的 Jackson ObjectMapper 並根據提供的模式映射標頭。 |
模式相當簡單,可以包含前導萬用字元 (*
)、尾隨萬用字元或兩者 (例如,*.cat.*
)。您可以使用前導 !
來否定模式。第一個符合標頭名稱的模式(無論是肯定還是否定)獲勝。
當您提供自己的模式時,我們建議包含 !id
和 !timestamp
,因為這些標頭在入站端是唯讀的。
預設情況下,mapper 僅反序列化 java.lang 和 java.util 中的類別。您可以透過使用 addTrustedPackages 方法新增受信任的套件來信任其他(或所有)套件。如果您收到來自不受信任來源的訊息,您可能希望僅新增您信任的那些套件。若要信任所有套件,您可以使用 mapper.addTrustedPackages("*") 。 |
以原始形式映射 String 標頭值在與不了解 mapper 的 JSON 格式的系統通訊時很有用。 |
從 2.2.5 版本開始,您可以指定某些字串值標頭不應使用 JSON 映射,而是映射到/從原始 byte[]
映射。AbstractKafkaHeaderMapper
具有新的屬性;當 mapAllStringsOut
設定為 true 時,所有字串值標頭都將使用 charset
屬性(預設為 UTF-8
)轉換為 byte[]
。此外,還有一個屬性 rawMappedHeaders
,它是一個 header name : boolean
的映射;如果映射包含標頭名稱,且標頭包含 String
值,則它將使用字元集映射為原始 byte[]
。如果且僅當映射值中的布林值為 true
時,此映射也用於將原始傳入的 byte[]
標頭映射到 String
。如果布林值為 false
,或者標頭名稱不在具有 true
值的映射中,則傳入的標頭僅映射為原始未映射的標頭。
以下測試案例說明了此機制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
預設情況下,這兩個標頭 mapper 都會映射所有入站標頭。從 2.8.8 版本開始,模式也可以應用於入站映射。若要建立用於入站映射的 mapper,請使用各個 mapper 上的靜態方法之一
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
這將排除所有以 abc
開頭的標頭,並包含所有其他標頭。
預設情況下,只要 Jackson 在類別路徑上,DefaultKafkaHeaderMapper
就會在 MessagingMessageConverter
和 BatchMessagingMessageConverter
中使用。
使用批次轉換器時,轉換後的標頭在 KafkaHeaders.BATCH_CONVERTED_HEADERS
中以 List<Map<String, Object>>
的形式提供,其中清單中位置的映射對應於酬載中的資料位置。
如果沒有轉換器(因為 Jackson 不存在或明確設定為 null
),則來自 consumer 記錄的標頭會在 KafkaHeaders.NATIVE_HEADERS
標頭中以未轉換的形式提供。此標頭是一個 Headers
物件(在批次轉換器的情況下,則為 List<Headers>
),其中清單中的位置對應於酬載中的資料位置。
某些類型不適合 JSON 序列化,對於這些類型,可能首選簡單的 toString() 序列化。DefaultKafkaHeaderMapper 有一個名為 addToStringClasses() 的方法,可讓您提供應以此方式處理以進行出站映射的類別名稱。在入站映射期間,它們會映射為 String 。預設情況下,只有 org.springframework.util.MimeType 和 org.springframework.http.MediaType 會以此方式映射。 |
從 2.3 版本開始,字串值標頭的處理已簡化。預設情況下,此類標頭不再進行 JSON 編碼(即,它們沒有新增封閉的 "..." )。類型仍然會新增到 JSON_TYPES 標頭,以便接收系統可以轉換回字串(從 byte[] )。mapper 可以處理(解碼)舊版本產生的標頭(它會檢查前導 " );透過這種方式,使用 2.3 的應用程式可以取用來自舊版本的記錄。 |
為了與早期版本相容,如果使用 2.3 版本的記錄可能會被使用早期版本的應用程式取用,請將 encodeStrings 設定為 true 。當所有應用程式都使用 2.3 或更高版本時,您可以將屬性保留為預設值 false 。 |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它會將此轉換器 bean 自動組態到自動組態的 KafkaTemplate
中;否則,您應該將此轉換器新增至範本。