互動式查詢

Kafka Streams binder API 公開了一個名為 InteractiveQueryService 的類別,用於以互動方式查詢狀態儲存。您可以在應用程式中將其作為 Spring bean 存取。從應用程式存取此 bean 的簡單方法是 autowire 此 bean。

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦您取得此 bean 的存取權,您就可以查詢您感興趣的特定狀態儲存。請參閱下方。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在啟動期間,上述擷取儲存的方法呼叫可能會失敗。例如,它可能仍在初始化狀態儲存的中間。在這種情況下,重試此操作將會很有用。Kafka Streams binder 提供了一個簡單的重試機制來容納這種情況。

以下是您可以用來控制此重試的兩個屬性。

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 預設值為 1

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 預設值為 1000 毫秒。

如果有多個 kafka streams 應用程式實例正在執行,那麼在您可以互動式查詢它們之前,您需要識別哪個應用程式實例託管您正在查詢的特定金鑰。InteractiveQueryService API 提供了用於識別主機資訊的方法。

為了使此功能正常運作,您必須如下設定屬性 application.server

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些程式碼片段

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

有關這些主機尋找方法的更多資訊,請參閱這些方法上的 Javadoc。對於這些方法,在啟動期間,如果底層的 KafkaStreams 物件尚未準備就緒,它們可能會拋出例外。上述重試屬性也適用於這些方法。

透過 InteractiveQueryService 提供的其他 API 方法

使用以下 API 方法來擷取與給定儲存和金鑰組合關聯的 KeyQueryMetadata 物件。

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

使用以下 API 方法來擷取與給定儲存和金鑰組合關聯的 KakfaStreams 物件。

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

自訂儲存查詢參數

有時,您可能需要微調儲存查詢參數,然後再透過 InteractiveQueryService 查詢儲存。為此,從 binder 的 4.0.1 版本開始,您可以為 StoreQueryParametersCustomizer 提供一個 bean,它是一個函數介面,具有一個 customize 方法,該方法將 StoreQueryParameter 作為引數。以下是其方法簽章。

StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);

使用這種方法,應用程式可以進一步自訂 StoreQueryParameters,例如啟用陳舊的儲存。

當此 bean 存在於此應用程式中時,InteractiveQueryService 將在查詢狀態儲存之前呼叫其 customize 方法。

請記住,應用程式中必須有一個唯一的 StoreQueryParametersCustomizer bean 可用。