狀態儲存

當使用高階 DSL 並進行適當的呼叫時,狀態儲存區會由 Kafka Streams 自動建立,這些呼叫會觸發狀態儲存區。

如果您想要將傳入的 KTable 綁定實體化為具名的狀態儲存區,則可以使用以下策略。

假設您有以下函數。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然後透過設定以下屬性,傳入的 KTable 資料將會被實體化到具名的狀態儲存區中。

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

您可以在應用程式中將自訂狀態儲存區定義為 bean,這些 bean 將會被偵測到,並由 binder 新增至 Kafka Streams builder。特別是當使用處理器 API 時,您需要手動註冊狀態儲存區。為了做到這一點,您可以在應用程式中建立 StateStore 作為 bean。以下是如何定義這些 bean 的範例。

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

這些狀態儲存區隨後可以直接由應用程式存取。

在啟動期間,上述 bean 將會由 binder 處理,並傳遞至 Streams builder 物件。

存取狀態儲存區

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

當涉及到註冊全域狀態儲存區時,這將無法運作。為了註冊全域狀態儲存區,請參閱以下關於自訂 StreamsBuilderFactoryBean 的章節。