發送訊息

本節涵蓋如何發送訊息。

使用 KafkaTemplate

本節涵蓋如何使用 KafkaTemplate 發送訊息。

總覽

KafkaTemplate 包裝了生產者並提供便捷的方法來將資料發送到 Kafka 主題。以下列表顯示了 KafkaTemplate 中的相關方法

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

請參閱 Javadoc 以取得更多詳細資訊。

在 3.0 版本中,先前返回 ListenableFuture 的方法已變更為返回 CompletableFuture。為了方便遷移,2.9 版本新增了一個方法 usingCompletableFuture(),該方法提供了相同的方法,但返回類型為 CompletableFuture;此方法已不再可用。

sendDefault API 要求已為範本提供預設主題。

API 接受 timestamp 作為參數,並將此時間戳記儲存在記錄中。使用者提供的时间戳記的儲存方式取決於 Kafka 主題上設定的時間戳記類型。如果主題設定為使用 CREATE_TIME,則會記錄使用者指定的時間戳記(如果未指定,則會產生)。如果主題設定為使用 LOG_APPEND_TIME,則會忽略使用者指定的時間戳記,並且 Broker 會加入本機 Broker 時間。

metricspartitionsFor 方法委派給底層 Producer 上的相同方法。execute 方法提供對底層 Producer 的直接存取。

若要使用範本,您可以設定生產者 factory 並在範本的建構子中提供它。以下範例顯示如何執行此操作

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

從 2.5 版本開始,您現在可以覆寫 factory 的 ProducerConfig 屬性,以從同一個 factory 建立具有不同生產者組態的範本。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

請注意,類型為 ProducerFactory<?, ?> 的 Bean(例如 Spring Boot 自動設定的 Bean)可以使用不同的縮小泛型類型來參考。

您也可以使用標準 <bean/> 定義來設定範本。

然後,若要使用範本,您可以調用其方法之一。

當您使用帶有 Message<?> 參數的方法時,主題、分割區、金鑰和時間戳記資訊會在訊息標頭中提供,其中包括以下項目

  • KafkaHeaders.TOPIC

  • KafkaHeaders.PARTITION

  • KafkaHeaders.KEY

  • KafkaHeaders.TIMESTAMP

訊息 Payload 是資料。

您可以選擇性地使用 ProducerListener 設定 KafkaTemplate,以取得發送結果(成功或失敗)的非同步回呼,而不是等待 Future 完成。以下列表顯示了 ProducerListener 介面的定義

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

依預設,範本配置了 LoggingProducerListener,它會記錄錯誤,並且在發送成功時不執行任何操作。

為了方便起見,如果您只想實作其中一種方法,則提供預設方法實作。

請注意,發送方法會返回 CompletableFuture<SendResult>。您可以向監聽器註冊回呼,以非同步方式接收發送結果。以下範例顯示如何執行此操作

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult 具有兩個屬性:ProducerRecordRecordMetadata。請參閱 Kafka API 文件以取得有關這些物件的資訊。

Throwable 可以轉換為 KafkaProducerException;其 producerRecord 屬性包含失敗的記錄。

如果您希望阻止發送執行緒以等待結果,您可以調用 future 的 get() 方法;建議使用帶有逾時的方法。如果您已設定 linger.ms,您可能希望在等待之前調用 flush(),或者為了方便起見,範本具有帶有 autoFlush 參數的建構子,該參數會導致範本在每次發送時執行 flush()。只有在您設定了 linger.ms 生產者屬性並想要立即發送部分批次時,才需要刷新。

範例

本節顯示將訊息發送到 Kafka 的範例

範例 1. 非阻塞 (Async)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
阻塞 (Sync)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

請注意,ExecutionException 的原因是帶有 producerRecord 屬性的 KafkaProducerException

使用 RoutingKafkaTemplate

從 2.5 版本開始,您可以使用 RoutingKafkaTemplate 在執行時根據目的地 topic 名稱選擇生產者。

路由範本支援交易、executeflushmetrics 操作,因為這些操作的主題是未知的。

範本需要 java.util.regex.PatternProducerFactory<Object, Object> 實例的 Map。此 Map 應為有序的(例如 LinkedHashMap),因為它會依序遍歷;您應該在開頭新增更具體的模式。

以下簡單的 Spring Boot 應用程式提供了一個範例,說明如何使用相同的範本發送到不同的主題,每個主題都使用不同的值序列化程式。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

此範例的對應 @KafkaListener 顯示在 註解屬性 中。

如需另一種技術來實現類似的結果,但具有將不同類型發送到同一主題的額外功能,請參閱 委派序列化程式和反序列化程式

使用 DefaultKafkaProducerFactory

使用 KafkaTemplate 中所示,ProducerFactory 用於建立生產者。

當不使用 交易 時,依預設,DefaultKafkaProducerFactory 會建立所有用戶端使用的單例生產者,如 KafkaProducer JavaDocs 中建議的那樣。但是,如果您在範本上調用 flush(),這可能會導致使用相同生產者的其他執行緒延遲。從 2.3 版本開始,DefaultKafkaProducerFactory 具有新的屬性 producerPerThread。當設定為 true 時,factory 將為每個執行緒建立(並快取)一個單獨的生產者,以避免此問題。

producerPerThreadtrue 時,使用者程式碼 必須 在不再需要生產者時在 factory 上調用 closeThreadBoundProducer()。這將實際關閉生產者並將其從 ThreadLocal 中移除。調用 reset()destroy() 不會清理這些生產者。

在建立 DefaultKafkaProducerFactory 時,可以透過調用僅接受屬性 Map 的建構子(請參閱 使用 KafkaTemplate 中的範例)從組態中選取金鑰和/或值 Serializer 類別,或者可以將 Serializer 實例傳遞給 DefaultKafkaProducerFactory 建構子(在這種情況下,所有 Producer 都共用相同的實例)。或者,您可以提供 Supplier<Serializer>s(從 2.3 版本開始),這些供應商將用於為每個 Producer 取得單獨的 Serializer 實例

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

從 2.5.10 版本開始,您現在可以在建立 factory 後更新生產者屬性。例如,如果您必須在憑證變更後更新 SSL 金鑰/信任儲存位置,這可能會很有用。變更不會影響現有的生產者實例;調用 reset() 以關閉任何現有的生產者,以便使用新屬性建立新的生產者。注意:您無法將交易生產者 factory 變更為非交易生產者 factory,反之亦然。

現在提供兩個新方法

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

從 2.8 版本開始,如果您以物件形式提供序列化程式(在建構子中或透過 Setter),factory 將調用 configure() 方法以使用組態屬性來設定它們。

使用 ReplyingKafkaTemplate

2.1.3 版本引入了 KafkaTemplate 的子類別,以提供請求/回覆語意。該類別名為 ReplyingKafkaTemplate,並且有兩個額外的方法;以下顯示方法簽名

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

結果是一個 CompletableFuture,它會以非同步方式填入結果(或例外狀況,表示逾時)。結果還具有 sendFuture 屬性,這是調用 KafkaTemplate.send() 的結果。您可以使用此 future 來判斷發送操作的結果。

在 3.0 版本中,這些方法(及其 sendFuture 屬性)返回的 future 已從 ListenableFutures 變更為 CompletableFutures。

如果使用第一個方法,或者 replyTimeout 引數為 null,則會使用範本的 defaultReplyTimeout 屬性(預設為 5 秒)。

從 2.8.8 版本開始,範本有一個新方法 waitForAssignment。如果回覆容器配置了 auto.offset.reset=latest,這非常有用,可以避免在容器初始化之前發送請求和回覆。

當使用手動分割區指派(無群組管理)時,等待時間必須大於容器的 pollTimeout 屬性,因為通知將在第一次輪詢完成後才會發送。

以下 Spring Boot 應用程式顯示了如何使用此功能的範例

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

請注意,我們可以利用 Boot 的自動配置容器 factory 來建立回覆容器。

如果回覆使用了非簡單的反序列化程式,請考慮使用 ErrorHandlingDeserializer,它會委派給您配置的反序列化程式。如此配置後,RequestReplyFuture 將會異常完成,您可以捕獲 ExecutionException,並在其 cause 屬性中包含 DeserializationException

從 2.6.7 版本開始,除了偵測 DeserializationExceptions 之外,範本還將調用 replyErrorChecker 函式(如果已提供)。如果它返回例外狀況,future 將會異常完成。

這是一個範例

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

範本設定了一個標頭(預設名稱為 KafkaHeaders.CORRELATION_ID),伺服器端必須將其回傳。

在此案例中,以下 @KafkaListener 應用程式會回應

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListener 基礎架構會回傳關聯 ID 並判斷回覆主題。

請參閱 使用 @SendTo 轉發監聽器結果 以取得有關發送回覆的更多資訊。範本使用預設標頭 KafKaHeaders.REPLY_TOPIC 來指示回覆的目的地主題。

從 2.2 版本開始,範本會嘗試從配置的回覆容器中偵測回覆主題或分割區。如果容器配置為監聽單一主題或單一 TopicPartitionOffset,則會使用它來設定回覆標頭。如果以其他方式配置容器,則使用者必須設定回覆標頭。在這種情況下,初始化期間會寫入 INFO 日誌訊息。以下範例使用 KafkaHeaders.REPLY_TOPIC

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

當您使用單一回覆 TopicPartitionOffset 進行配置時,您可以將相同的回覆主題用於多個範本,只要每個實例監聽不同的分割區即可。當使用單一回覆主題進行配置時,每個實例都必須使用不同的 group.id。在這種情況下,所有實例都會接收到每個回覆,但只有發送請求的實例才會找到關聯 ID。這對於自動縮放可能很有用,但會產生額外的網路流量開銷,以及丟棄每個不需要的回覆的小成本。當您使用此設定時,我們建議您將範本的 sharedReplyTopic 設定為 true,這會將非預期回覆的記錄層級從預設 ERROR 降低為 DEBUG。

以下是配置回覆容器以使用相同共用回覆主題的範例

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
如果您有多個用戶端實例,並且您未按照前一段討論的方式配置它們,則每個實例都需要專用的回覆主題。另一種方法是設定 KafkaHeaders.REPLY_PARTITION 並為每個實例使用專用的分割區。Header 包含一個四位元組整數(大端)。伺服器必須使用此標頭將回覆路由到正確的分割區(@KafkaListener 會執行此操作)。但是,在這種情況下,回覆容器不得使用 Kafka 的群組管理功能,並且必須配置為監聽固定分割區(透過在其 ContainerProperties 建構子中使用 TopicPartitionOffset)。
DefaultKafkaHeaderMapper 需要 Jackson 位於類別路徑上(適用於 @KafkaListener)。如果它不可用,則訊息轉換器沒有標頭對應器,因此您必須使用 SimpleKafkaHeaderMapper 配置 MessagingMessageConverter,如先前所示。

依預設,使用 3 個標頭

  • KafkaHeaders.CORRELATION_ID - 用於將回覆與請求關聯

  • KafkaHeaders.REPLY_TOPIC - 用於告知伺服器回覆的目的地

  • KafkaHeaders.REPLY_PARTITION - (選用)用於告知伺服器回覆的目的地分割區

這些標頭名稱由 @KafkaListener 基礎架構用於路由回覆。

從 2.3 版本開始,您可以自訂標頭名稱 - 範本具有 3 個屬性 correlationHeaderNamereplyTopicHeaderNamereplyPartitionHeaderName。如果您的伺服器不是 Spring 應用程式(或未使用 @KafkaListener),這會很有用。

反之,如果請求應用程式不是 Spring 應用程式,並且將關聯資訊放在不同的標頭中,則從 3.0 版本開始,您可以在監聽器容器 factory 上配置自訂 correlationHeaderName,並且該標頭將會被回傳。先前,監聽器必須回傳自訂關聯標頭。

使用 Message<?>s 進行請求/回覆

2.7 版本新增了 ReplyingKafkaTemplate 的方法,以發送和接收 spring-messagingMessage<?> 抽象

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

這些方法將使用範本的預設 replyTimeout,還有可以接受方法調用逾時的重載版本。

在 3.0 版本中,這些方法(及其 sendFuture 屬性)返回的 future 已從 ListenableFutures 變更為 CompletableFutures。

如果消費者的 Deserializer 或範本的 MessageConverter 可以在沒有任何額外資訊的情況下轉換 Payload,則使用第一個方法,無論是透過組態還是回覆訊息中的類型 Metadata。

如果您需要為返回類型提供類型資訊,以協助訊息轉換器,則使用第二個方法。這也允許相同的範本接收不同的類型,即使回覆中沒有類型 Metadata,例如當伺服器端不是 Spring 應用程式時。以下是後者的範例

範本 Bean
  • Java

  • Kotlin

@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
使用範本
  • Java

  • Kotlin

RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })

回覆類型 Message<?>

@KafkaListener 返回 Message<?> 時,在 2.5 之前的版本中,必須填入回覆主題和關聯 ID 標頭。在此範例中,我們使用來自請求的回覆主題標頭

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

這也顯示了如何在回覆記錄上設定金鑰。

從 2.5 版本開始,框架將偵測這些標頭是否遺失,並使用主題(從 @SendTo 值或傳入的 KafkaHeaders.REPLY_TOPIC 標頭(如果存在)判斷的主題)填入它們。它也會回傳傳入的 KafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITION(如果存在)。

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

彙總多個回覆

使用 ReplyingKafkaTemplate 中的範本嚴格用於單一請求/回覆情境。對於單一訊息的多個接收者返回回覆的情況,您可以使用 AggregatingReplyingKafkaTemplate。這是 Scatter-Gather Enterprise Integration Pattern 的用戶端實作。

ReplyingKafkaTemplate 類似,AggregatingReplyingKafkaTemplate 建構子接受生產者 factory 和監聽器容器以接收回覆;它具有第三個參數 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy,每次收到回覆時都會諮詢它;當述詞返回 true 時,ConsumerRecords 的集合會用於完成 sendAndReceive 方法返回的 Future

還有一個額外的屬性 returnPartialOnTimeout(預設為 false)。當此屬性設定為 true 時,部分結果會正常完成 future(只要已收到至少一筆回覆記錄),而不是使用 KafkaReplyTimeoutException 完成 future。

從 2.3.5 版本開始,在逾時後也會調用述詞(如果 returnPartialOnTimeouttrue)。第一個引數是目前的記錄清單;如果此調用是由於逾時,則第二個引數為 true。述詞可以修改記錄清單。

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

請注意,返回類型是 ConsumerRecord,其值是 ConsumerRecords 的集合。「外部」ConsumerRecord 不是「真實」記錄,它是範本合成的,作為接收到的請求的實際回覆記錄的持有者。當發生正常釋放(釋放策略返回 true)時,主題會設定為 aggregatedResults;如果 returnPartialOnTimeout 為 true,並且發生逾時(且已收到至少一筆回覆記錄),則主題會設定為 partialResultsAfterTimeout。範本為這些「主題」名稱提供常數靜態變數

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

Collection 中的真實 ConsumerRecords 包含接收回覆的實際主題。

回覆的監聽器容器 必須 配置為 AckMode.MANUALAckMode.MANUAL_IMMEDIATE;消費者屬性 enable.auto.commit 必須為 false(自 2.3 版本以來的預設值)。為了避免任何訊息遺失的可能性,範本僅在沒有未完成的請求時才提交偏移量,即當最後一個未完成的請求由釋放策略釋放時。重新平衡後,可能會重複傳遞回覆;對於任何正在進行中的請求,這些回覆將被忽略;當為已釋放的回覆收到重複的回覆時,您可能會看到錯誤日誌訊息。
如果您將 ErrorHandlingDeserializer 與此彙總範本一起使用,則框架不會自動偵測 DeserializationExceptions。相反,記錄(帶有 null 值)將完整返回,反序列化例外狀況位於標頭中。建議應用程式調用公用程式方法 ReplyingKafkaTemplate.checkDeserialization() 方法來判斷是否發生反序列化例外狀況。請參閱其 JavaDocs 以取得更多資訊。replyErrorChecker 也不會為此彙總範本調用;您應該對回覆的每個元素執行檢查。