程式設計模型
當使用 Kafka Streams Binder 提供的程式設計模型時,高階的 Streams DSL 以及高階和低階 Processor-API 的混合都可以作為選項使用。當混合使用高階和低階 API 時,通常是透過在 KStream
上調用 transform
或 process
API 方法來實現。
函式式風格
從 Spring Cloud Stream 3.0.0
開始,Kafka Streams Binder 允許應用程式使用 Java 8 中可用的函式式程式設計風格來設計和開發。這表示應用程式可以簡潔地表示為 java.util.function.Function
或 java.util.function.Consumer
類型的 Lambda 運算式。
讓我們來看一個非常基本的範例。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
儘管簡單,這是一個完整的獨立 Spring Boot 應用程式,它利用 Kafka Streams 進行串流處理。這是一個消費者應用程式,沒有對外綁定,只有單一的對內綁定。該應用程式消費資料,並且僅在標準輸出上記錄來自 KStream
鍵和值的資訊。該應用程式包含 SpringBootApplication
註解和一個標記為 Bean
的方法。Bean 方法的類型為 java.util.function.Consumer
,它以 KStream
參數化。然後在實作中,我們回傳一個本質上是 Lambda 運算式的 Consumer 物件。在 Lambda 運算式內部,提供了處理資料的程式碼。
在這個應用程式中,有一個 KStream
類型的單一輸入綁定。Binder 為應用程式建立了這個綁定,名稱為 process-in-0
,即函式 Bean 名稱後跟一個破折號字元 (-
),然後是文字 in
,再後跟另一個破折號,然後是參數的序數位置。您可以使用這個綁定名稱來設定其他屬性,例如目的地。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic
。
如果沒有在綁定上設定目的地屬性,則會建立一個與綁定名稱相同的主題(如果應用程式具有足夠的權限),或者預期該主題已經可用。 |
一旦建置為 uber-jar(例如,kstream-consumer-app.jar
),您就可以像下面這樣執行上面的範例。
如果應用程式選擇使用 Spring 的 Component
註解來定義函式 Bean,則 Binder 也支援該模型。上面的函式 Bean 可以重寫如下。
@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {
@Override
public void accept(KStream<Object, String> input) {
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
這是另一個範例,它是一個完整的處理器,同時具有輸入和輸出綁定。這是經典的單字計數範例,其中應用程式從主題接收資料,然後在滾動時間視窗中計算每個單字的出現次數。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
同樣地,這是一個完整的 Spring Boot 應用程式。這裡與第一個應用程式的不同之處在於,Bean 方法的類型為 java.util.function.Function
。Function
的第一個參數化類型用於輸入 KStream
,第二個參數化類型用於輸出。在方法主體中,提供了一個 Function
類型的 Lambda 運算式,並將實際的業務邏輯作為實作給出。與先前討論的基於 Consumer 的應用程式類似,此處的輸入綁定預設命名為 process-in-0
。對於輸出,綁定名稱也會自動設定為 process-out-0
。
一旦建置為 uber-jar(例如,wordcount-processor.jar
),您就可以像下面這樣執行上面的範例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此應用程式將從 Kafka 主題 words
消費訊息,並且將計算結果發布到輸出主題 counts
。
Spring Cloud Stream 將確保來自輸入和輸出主題的訊息自動綁定為 KStream 物件。作為開發人員,您可以專注於程式碼的業務方面,即編寫處理器中所需的邏輯。Kafka Streams 基礎架構所需的 Kafka Streams 特定組態由框架自動處理。
我們上面看到的兩個範例都只有單一的 KStream
輸入綁定。在這兩種情況下,綁定都從單一主題接收記錄。如果您想將多個主題多工處理到單一的 KStream
綁定中,您可以提供以逗號分隔的 Kafka 主題作為以下目的地。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果您想比對符合正則表達式的主題,您也可以提供主題模式作為目的地。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多個輸入綁定
許多非平凡的 Kafka Streams 應用程式通常透過多個綁定從多個主題消費資料。例如,一個主題以 Kstream
形式消費,另一個主題以 KTable
或 GlobalKTable
形式消費。應用程式可能想要以表格類型接收資料的原因有很多。想像一個使用案例,其中底層主題是透過來自資料庫的變更資料擷取 (CDC) 機制填充的,或者應用程式可能只關心下游處理的最新更新。如果應用程式指定資料需要以 KTable
或 GlobalKTable
形式綁定,則 Kafka Streams Binder 將正確地將目的地綁定到 KTable
或 GlobalKTable
,並使其可供應用程式操作。我們將查看 Kafka Streams Binder 中如何處理多個輸入綁定的幾種不同情境。
Kafka Streams Binder 中的 BiFunction
這是一個範例,其中我們有兩個輸入和一個輸出。在這種情況下,應用程式可以利用 java.util.function.BiFunction
。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
同樣地,基本主題與之前的範例相同,但這裡我們有兩個輸入。Java 的 BiFunction
支援用於將輸入綁定到所需的目的地。Binder 為輸入產生的預設綁定名稱分別為 process-in-0
和 process-in-1
。預設輸出綁定為 process-out-0
。在此範例中,BiFunction
的第一個參數綁定為第一個輸入的 KStream
,第二個參數綁定為第二個輸入的 KTable
。
Kafka Streams Binder 中的 BiConsumer
如果只有兩個輸入,但沒有輸出,在這種情況下,我們可以使用 java.util.function.BiConsumer
,如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超過兩個輸入
如果您有超過兩個輸入怎麼辦?在某些情況下,您需要超過兩個輸入。在這種情況下,Binder 允許您鏈接部分函式。在函式式程式設計術語中,這種技術通常稱為柯里化 (currying)。透過作為 Java 8 一部分新增的函式式程式設計支援,Java 現在使您能夠編寫柯里化函式。Spring Cloud Stream Kafka Streams Binder 可以利用此功能來啟用多個輸入綁定。
讓我們看一個範例。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
讓我們看看上面提出的綁定模型的細節。在這個模型中,我們在對內有 3 個部分應用的函式。讓我們將它們稱為 f(x)
、f(y)
和 f(z)
。如果我們從真正的數學函式的意義上擴展這些函式,它將看起來像這樣:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>
。x
變數代表 KStream<Long, Order>
,y
變數代表 GlobalKTable<Long, Customer>
,z
變數代表 GlobalKTable<Long, Product>
。第一個函式 f(x)
具有應用程式的第一個輸入綁定 (KStream<Long, Order>
),其輸出是函式 f(y)。函式 f(y)
具有應用程式的第二個輸入綁定 (GlobalKTable<Long, Customer>
),其輸出是另一個函式 f(z)
。函式 f(z)
的輸入是應用程式的第三個輸入 (GlobalKTable<Long, Product>
),其輸出是 KStream<Long, EnrichedOrder>
,這是應用程式的最終輸出綁定。來自三個部分函式的輸入,分別是 KStream
、GlobalKTable
、GlobalKTable
,在方法主體中可用於實作業務邏輯作為 Lambda 運算式的一部分。
輸入綁定分別命名為 enrichOrder-in-0
、enrichOrder-in-1
和 enrichOrder-in-2
。輸出綁定命名為 enrichOrder-out-0
。
使用柯里化函式,您可以虛擬地擁有任意數量的輸入。但是,請記住,在 Java 中,超過少量輸入和部分應用於它們的函式可能會導致程式碼難以閱讀。因此,如果您的 Kafka Streams 應用程式需要超過合理數量的輸入綁定,並且您想使用此函式式模型,那麼您可能需要重新思考您的設計並適當地分解應用程式。
輸出綁定
Kafka Streams Binder 允許 KStream
或 KTable
類型作為輸出綁定。在幕後,Binder 使用 KStream
上的 to
方法將結果記錄傳送到輸出主題。如果應用程式在函式中提供 KTable
作為輸出,則 Binder 仍然使用此技術,方法是委派給 KStream
的 to
方法。
例如,以下兩個函式都將起作用
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
多個輸出綁定
Kafka Streams 允許將對外資料寫入多個主題。此功能在 Kafka Streams 中稱為分支。當使用多個輸出綁定時,您需要提供 KStream 陣列 (KStream[]
) 作為對外回傳類型。
這是一個範例
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> {
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.split()
.branch(isEnglish)
.branch(isFrench)
.branch(isSpanish)
.noDefaultBranch();
return stringKStreamMap.values().toArray(new KStream[0]);
};
}
程式設計模型保持不變,但對外參數化類型為 KStream[]
。上述函式的預設輸出綁定名稱分別為 process-out-0
、process-out-1
、process-out-2
。Binder 產生三個輸出綁定的原因是它偵測到回傳的 KStream
陣列的長度為三。請注意,在此範例中,我們提供了 noDefaultBranch()
;如果我們改為使用 defaultBranch()
,則需要額外的輸出綁定,實際上回傳長度為四的 KStream
陣列。
Kafka Streams 函式式程式設計風格摘要
總而言之,下表顯示了可以在函式式範例中使用的各種選項。
輸入數量 | 輸出數量 | 要使用的元件 |
---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1..n |
java.util.function.Function |
2 |
1..n |
java.util.function.BiFunction |
>= 3 |
0..n |
使用柯里化函式 |
-
在此表中有多個輸出的情況下,類型僅變為
KStream[]
。
Kafka Streams Binder 中的函式組合
Kafka Streams Binder 支援線性拓撲的最小形式的函式組合。使用 Java 函式式 API 支援,您可以編寫多個函式,然後使用 andThen
方法自行組合它們。例如,假設您有以下兩個函式。
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
return input -> input.peek((s, s2) -> {});
}
@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
return input -> input.peek((s, s2) -> {});
}
即使沒有 Binder 中的函式組合支援,您也可以如下組合這兩個函式。
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
foo().andThen(bar());
}
然後您可以提供 spring.cloud.function.definition=foo;bar;composed
形式的定義。使用 Binder 中的函式組合支援,您不需要編寫第三個函式來執行明確的函式組合。
您可以改為簡單地這樣做
spring.cloud.function.definition=foo|bar
您甚至可以這樣做
spring.cloud.function.definition=foo|bar;foo;bar
在此範例中,組合函式的預設綁定名稱變為 foobar-in-0
和 foobar-out-0
。
Kafka Streams Binder 中函式組合的限制
當您有 java.util.function.Function
Bean 時,它可以與另一個或多個函式組合。相同的函式 Bean 也可以與 java.util.function.Consumer
組合。在這種情況下,消費者是最後組合的元件。一個函式可以與多個函式組合,然後以 java.util.function.Consumer
Bean 結尾。
當組合 java.util.function.BiFunction
類型的 Bean 時,BiFunction
必須是定義中的第一個函式。組合實體必須是 java.util.function.Function
或 java.util.funciton.Consumer
類型。換句話說,您不能採用 BiFunction
Bean,然後與另一個 BiFunction
組合。
您不能與 BiConsumer
類型組合,也不能與 Consumer
是第一個元件的定義組合。您也不能與輸出是陣列(用於分支的 KStream[]
)的函式組合,除非這是定義中的最後一個元件。
函式定義中的第一個 Function
或 BiFunction
也可能使用柯里化形式。例如,以下是可能的。
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
return a -> b ->
a.join(b, (value1, value2) -> value1 + value2);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
函式定義可以是 curriedFoo|bar
。在幕後,Binder 將為柯里化函式建立兩個輸入綁定,並根據定義中的最終函式建立一個輸出綁定。在這種情況下,預設輸入綁定將為 curriedFoobar-in-0
和 curriedFoobar-in-1
。此範例的預設輸出綁定變為 curriedFoobar-out-0
。
關於在函式組合中使用 KTable
作為輸出的特別注意事項
假設您有以下兩個函式。
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
您可以將它們組合為 foo|bar
,但請記住,第二個函式(在本例中為 bar
)必須以 KTable
作為輸入,因為第一個函式 (foo
) 以 KTable
作為輸出。