Kafka Streams 應用程式中基於事件類型的路由

常規的基於訊息通道的 Binder 中可用的路由函數在 Kafka Streams Binder 中不受支援。然而,Kafka Streams Binder 仍然透過入站記錄上的事件類型記錄標頭提供路由功能。

若要啟用基於事件類型的路由,應用程式必須提供以下屬性。

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.

這可以是逗號分隔值。

例如,假設我們有這個函數

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

我們也假設我們只想在此函數中執行業務邏輯,如果傳入的記錄具有事件類型 foobar。這可以使用繫結上的 eventTypes 屬性表示如下。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

現在,當應用程式執行時,Binder 會檢查每個傳入記錄的標頭 event_type,並查看其值是否設定為 foobar。如果找不到其中任何一個,則會跳過函數執行。

預設情況下,Binder 預期記錄標頭金鑰為 event_type,但可以針對每個繫結進行變更。例如,如果我們想要將此繫結上的標頭金鑰變更為 my_event 而不是預設值,則可以如下變更。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.

當在 Kafka Streams Binder 中使用事件路由功能時,它會使用位元組陣列 Serde 來反序列化所有傳入記錄。如果記錄標頭與事件類型相符,則它只會使用實際的 Serde,使用已組態或推斷的 Serde 進行適當的反序列化。如果您在繫結上設定了反序列化例外處理常式,這會引入問題,因為預期的反序列化僅在堆疊下方發生,導致意外錯誤。為了解決此問題,您可以設定繫結上的以下屬性,以強制 Binder 使用已組態或推斷的 Serde,而不是位元組陣列 Serde

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents

這樣一來,應用程式在使用事件路由功能時可以立即偵測到反序列化問題,並可以採取適當的處理決策。