混合高階 DSL 和低階處理器 API
Kafka Streams 提供了兩種 API 變體。它具有較高階的 DSL 類 API,您可以在其中鏈接各種操作,這些操作對於許多函數式程式設計師來說可能很熟悉。 Kafka Streams 也允許存取低階處理器 API。儘管處理器 API 非常強大,並且能夠在更低的層級控制事物,但本質上是命令式的。 Spring Cloud Stream 的 Kafka Streams binder 允許您使用高階 DSL 或混合使用 DSL 和處理器 API。混合使用這兩種變體為您提供了許多選項,可以控制應用程式中的各種用例。應用程式可以使用 transform
或 process
方法 API 呼叫來存取處理器 API。
以下是如何在 Spring Cloud Stream 應用程式中使用 process
API 結合 DSL 和處理器 API 的範例。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
以下是使用 transform
API 的範例。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
process
API 方法呼叫是一個終端操作,而 transform
API 則是非終端的,並為您提供一個潛在轉換後的 KStream
,您可以使用它繼續使用 DSL 或處理器 API 進行進一步處理。