Kafka Streams 應用程式中基於事件類型的路由
常規的基於訊息通道的 Binder 中可用的路由函數在 Kafka Streams Binder 中不受支援。然而,Kafka Streams Binder 仍然透過入站記錄上的事件類型記錄標頭提供路由功能。
若要啟用基於事件類型的路由,應用程式必須提供以下屬性。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes
.
這可以是逗號分隔值。
例如,假設我們有這個函數
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
我們也假設我們只想在此函數中執行業務邏輯,如果傳入的記錄具有事件類型 foo
或 bar
。這可以使用繫結上的 eventTypes
屬性表示如下。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
現在,當應用程式執行時,Binder 會檢查每個傳入記錄的標頭 event_type
,並查看其值是否設定為 foo
或 bar
。如果找不到其中任何一個,則會跳過函數執行。
預設情況下,Binder 預期記錄標頭金鑰為 event_type
,但可以針對每個繫結進行變更。例如,如果我們想要將此繫結上的標頭金鑰變更為 my_event
而不是預設值,則可以如下變更。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event
.
當在 Kafka Streams Binder 中使用事件路由功能時,它會使用位元組陣列 Serde
來反序列化所有傳入記錄。如果記錄標頭與事件類型相符,則它只會使用實際的 Serde
,使用已組態或推斷的 Serde
進行適當的反序列化。如果您在繫結上設定了反序列化例外處理常式,這會引入問題,因為預期的反序列化僅在堆疊下方發生,導致意外錯誤。為了解決此問題,您可以設定繫結上的以下屬性,以強制 Binder 使用已組態或推斷的 Serde
,而不是位元組陣列 Serde
。
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
這樣一來,應用程式在使用事件路由功能時可以立即偵測到反序列化問題,並可以採取適當的處理決策。