使用 Reactive Kafka Binder 的基本範例

在本節中,我們將展示一些基本程式碼片段,用於使用反應式 Binder 編寫反應式 Kafka 應用程式,並詳細說明相關細節。

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
    return s -> s.map(String::toUpperCase);
}

您可以將上述 upppercase 函數與基於訊息通道的 Kafka Binder (spring-cloud-stream-binder-kafka) 以及反應式 Kafka Binder (spring-cloud-stream-binder-kafka-reactive) 一起使用,本節將討論後者。當將此函數與常規 Kafka Binder 一起使用時,儘管您在應用程式 (即 uppercase 函數) 中使用了反應式類型,但您只能在函數執行期間獲得反應式串流。在函數的執行上下文之外,由於底層 Binder 並非基於反應式堆疊,因此沒有反應式優勢。因此,儘管這看起來像是帶來了完整的端對端反應式堆疊,但此應用程式僅是部分反應式的。

現在假設您正在將適當的反應式 Binder 用於 Kafka - spring-cloud-stream-binder-kafka-reactive 與上述函數的應用程式一起使用。此 Binder 實作將提供完整的反應式優勢,從頂端的消費一直到鏈底端的發布。這是因為底層 Binder 是建立在 Reactor Kafka 的核心 API 之上。在消費者端,它使用了 KafkaReceiver,這是 Kafka 消費者的反應式實作。同樣地,在生產者端,它使用了 KafkaSender API,這是 Kafka 生產者的反應式實作。由於反應式 Kafka Binder 的基礎是建立在適當的反應式 Kafka API 之上,因此應用程式可以充分利用使用反應式技術的好處。例如,自動背壓等反應式功能在使用此反應式 Kafka Binder 時已內建於應用程式中。

從 4.0.2 版開始,您可以透過提供一個或多個 ReceiverOptionsCustomizerSenderOptionsCustomizer Bean 分別自訂 ReceiverOptionsSenderOptions。它們是 BiFunction,用於接收綁定名稱和初始選項,並傳回自訂選項。這些介面擴展了 Ordered,因此當存在多個自訂器時,將按照要求的順序應用它們。

Binder 預設不提交偏移量。從 4.0.2 版開始,KafkaHeaders.ACKNOWLEDGMENT 標頭包含一個 ReceiverOffset 物件,您可以透過呼叫其 acknowledge()commit() 方法來使偏移量被提交。
@Bean
public Consumer<Flux<Message<String>> consume() {
    return msg -> {
        process(msg.getPayload());
        msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
    }
}

有關更多資訊,請參閱 reactor-kafka 文件和 Javadoc。

此外,從 4.0.3 版開始,Kafka 消費者屬性 reactiveAtmostOnce 可以設定為 true,並且 Binder 將在處理每次輪詢傳回的記錄之前自動提交偏移量。此外,從 4.0.3 版開始,您可以將消費者屬性 reactiveAutoCommit 設定為 true,並且 Binder 將在處理每次輪詢傳回的記錄之後自動提交偏移量。在這些情況下,確認標頭不存在。

4.0.2 也提供了 reactiveAutoCommit,但實作不正確,其行為類似於 reactiveAtMostOnce

以下是如何使用 reaciveAutoCommit 的範例。

@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
	return flux -> flux
			.doOnNext(inner -> inner
				.doOnNext(val -> {
					log.info(val.value());
				})
				.subscribe())
			.subscribe();
}

請注意,當使用自動提交時,reactor-kafka 會傳回 Flux<Flux<ConsumerRecord<?, ?>>>。鑑於 Spring 無法存取內部 Flux 的內容,應用程式必須處理原生 ConsumerRecord;沒有訊息轉換或轉換服務應用於內容。這需要使用原生解碼 (透過在組態中指定適當類型的 Deserializer) 以傳回所需類型的記錄鍵/值。