消費紀錄
在上述 upppercase
函數中,我們以 `Flux<String>` 形式消費紀錄,然後以 `Flux<String>` 形式生產。在某些情況下,您可能需要以原始接收格式(`ReceiverRecord`)接收紀錄。以下是這樣的一個函數。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
在此函數中,請注意,我們以 `Flux<ReceiverRecord<byte[], byte[]>>` 形式消費紀錄,然後以 `Flux<String>` 形式生產。`ReceiverRecord` 是基本的接收紀錄,它是 Reactor Kafka 中專門化的 Kafka `ConsumerRecord`。當使用反應式 Kafka binder 時,上述函數將允許您存取每個傳入紀錄的 `ReceiverRecord` 類型。然而,在這種情況下,您需要為 RecordMessageConverter 提供自訂實作。預設情況下,反應式 Kafka binder 使用 MessagingMessageConverter,它會轉換來自 `ConsumerRecord` 的有效負載和標頭。因此,當您的處理程序方法接收到它時,有效負載已經從接收到的紀錄中提取出來,並傳遞到方法中,就像我們上面看的第一個函數的情況一樣。透過在應用程式中提供自訂的 RecordMessageConverter
實作,您可以覆寫預設行為。例如,如果您想要以原始 `Flux<ReceiverRecord<byte[], byte[]>>` 形式消費紀錄,那麼您可以在應用程式中提供以下 Bean 定義。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然後,您需要指示框架針對所需的綁定使用此轉換器。以下是基於我們的 lowercase
函數的範例。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
lowercase-in-0
是我們 lowercase
函數的輸入綁定名稱。對於輸出 (lowecase-out-0
),我們仍然使用常規的 MessagingMessageConverter
。
在上面的 toMessage
實作中,我們接收原始的 ConsumerRecord
(由於我們處於反應式 binder 環境中,因此為 ReceiverRecord
),然後將其包裝在 Message
內部。然後,作為 ReceiverRecord
的該訊息有效負載會提供給使用者方法。
如果 reactiveAutoCommit
為 false
(預設值),請呼叫 rec.receiverOffset().acknowledge()
(或 commit()
) 以導致偏移量被提交;如果 reactiveAutoCommit
為 true
,則 flux 改為提供 ConsumerRecord
。有關更多資訊,請參閱 reactor-kafka
文件和 javadocs。