互動式查詢
Kafka Streams binder API 公開了一個名為 InteractiveQueryService
的類別,用於以互動方式查詢狀態儲存。您可以在應用程式中將其作為 Spring bean 存取。從應用程式存取此 bean 的簡單方法是 autowire
此 bean。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦您取得此 bean 的存取權,您就可以查詢您感興趣的特定狀態儲存。請參閱下方。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在啟動期間,上述擷取儲存的方法呼叫可能會失敗。例如,它可能仍在初始化狀態儲存的中間。在這種情況下,重試此操作將會很有用。Kafka Streams binder 提供了一個簡單的重試機制來容納這種情況。
以下是您可以用來控制此重試的兩個屬性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 預設值為
1
。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 預設值為
1000
毫秒。
如果有多個 kafka streams 應用程式實例正在執行,那麼在您可以互動式查詢它們之前,您需要識別哪個應用程式實例託管您正在查詢的特定金鑰。InteractiveQueryService
API 提供了用於識別主機資訊的方法。
為了使此功能正常運作,您必須如下設定屬性 application.server
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些程式碼片段
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
有關這些主機尋找方法的更多資訊,請參閱這些方法上的 Javadoc。對於這些方法,在啟動期間,如果底層的 KafkaStreams 物件尚未準備就緒,它們可能會拋出例外。上述重試屬性也適用於這些方法。
透過 InteractiveQueryService 提供的其他 API 方法
使用以下 API 方法來擷取與給定儲存和金鑰組合關聯的 KeyQueryMetadata
物件。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法來擷取與給定儲存和金鑰組合關聯的 KakfaStreams
物件。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
自訂儲存查詢參數
有時,您可能需要微調儲存查詢參數,然後再透過 InteractiveQueryService
查詢儲存。為此,從 binder 的 4.0.1
版本開始,您可以為 StoreQueryParametersCustomizer
提供一個 bean,它是一個函數介面,具有一個 customize
方法,該方法將 StoreQueryParameter
作為引數。以下是其方法簽章。
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
使用這種方法,應用程式可以進一步自訂 StoreQueryParameters
,例如啟用陳舊的儲存。
當此 bean 存在於此應用程式中時,InteractiveQueryService
將在查詢狀態儲存之前呼叫其 customize
方法。
請記住,應用程式中必須有一個唯一的 StoreQueryParametersCustomizer bean 可用。 |