使用 @SendTo 轉發監聽器結果

從 2.0 版本開始,如果您也使用 @SendTo 注解來註解 @KafkaListener,且方法調用返回結果,則結果將會轉發到 @SendTo 指定的主題。

@SendTo 值可以有多種形式

  • @SendTo("someTopic") 路由到文字主題。

  • @SendTo("#{someExpression}") 路由到在應用程式內容初始化期間評估一次的表達式所決定的主題。

  • @SendTo("!{someExpression}") 路由到在執行時期評估表達式所決定的主題。評估的 #root 物件具有三個屬性

    • request:入站 ConsumerRecord(或批次監聽器的 ConsumerRecords 物件)。

    • source:從 request 轉換而來的 org.springframework.messaging.Message<?>

    • result:方法返回結果。

  • @SendTo(無屬性):這被視為 !{source.headers['kafka_replyTopic']}(自 2.1.3 版本起)。

從 2.1.11 和 2.2.1 版本開始,屬性佔位符在 @SendTo 值內解析。

表達式評估的結果必須是代表主題名稱的 String。以下範例展示了使用 @SendTo 的各種方式

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}
為了支援 @SendTo,監聽器容器工廠必須提供 KafkaTemplate(在其 replyTemplate 屬性中),用於傳送回覆。這應該是 KafkaTemplate,而不是用於客戶端請求/回覆處理的 ReplyingKafkaTemplate。使用 Spring Boot 時,它會自動將範本自動配置到工廠中;當配置您自己的工廠時,必須如下列範例所示設定。

從 2.2 版本開始,您可以將 ReplyHeadersConfigurer 新增至監聽器容器工廠。諮詢此設定器以確定您要在回覆訊息中設定哪些標頭。以下範例示範如何新增 ReplyHeadersConfigurer

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

如果願意,您也可以新增更多標頭。以下範例示範如何執行此操作

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

當您使用 @SendTo 時,您必須在 ConcurrentKafkaListenerContainerFactoryreplyTemplate 屬性中配置 KafkaTemplate 以執行傳送。Spring Boot 將自動連線其自動配置的範本(或任何單一實例存在的情況)。

除非您使用請求/回覆語意,否則僅使用簡單的 send(topic, value) 方法,因此您可能希望建立子類別以產生分割區或金鑰。以下範例示範如何執行此操作
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory()) {

        @Override
        public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}

如果監聽器方法返回 Message<?>Collection<Message<?>>,則監聽器方法負責設定回覆的訊息標頭。例如,當處理來自 ReplyingKafkaTemplate 的請求時,您可以執行以下操作

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

使用請求/回覆語意時,傳送者可以請求目標分割區。

即使未返回任何結果,您也可以使用 @SendTo 注解 @KafkaListener 方法。這是為了允許配置 errorHandler,它可以將有關訊息傳遞失敗的資訊轉發到某些主題。以下範例示範如何執行此操作

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}

有關更多資訊,請參閱處理例外

如果監聽器方法返回 Iterable,則預設情況下,會傳送每個元素作為值的記錄。從 2.3.5 版本開始,將 @KafkaListener 上的 splitIterables 屬性設定為 false,整個結果將作為單個 ProducerRecord 的值傳送。這需要在回覆範本的 Producer 配置中使用合適的序列化程式。但是,如果回覆是 Iterable<Message<?>>,則會忽略該屬性,並且每個訊息都會單獨傳送。