使用 @SendTo
轉發監聽器結果
從 2.0 版本開始,如果您也使用 @SendTo
注解來註解 @KafkaListener
,且方法調用返回結果,則結果將會轉發到 @SendTo
指定的主題。
@SendTo
值可以有多種形式
-
@SendTo("someTopic")
路由到文字主題。 -
@SendTo("#{someExpression}")
路由到在應用程式內容初始化期間評估一次的表達式所決定的主題。 -
@SendTo("!{someExpression}")
路由到在執行時期評估表達式所決定的主題。評估的#root
物件具有三個屬性-
request
:入站ConsumerRecord
(或批次監聽器的ConsumerRecords
物件)。 -
source
:從request
轉換而來的org.springframework.messaging.Message<?>
。 -
result
:方法返回結果。
-
-
@SendTo
(無屬性):這被視為!{source.headers['kafka_replyTopic']}
(自 2.1.3 版本起)。
從 2.1.11 和 2.2.1 版本開始,屬性佔位符在 @SendTo
值內解析。
表達式評估的結果必須是代表主題名稱的 String
。以下範例展示了使用 @SendTo
的各種方式
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
為了支援 @SendTo ,監聽器容器工廠必須提供 KafkaTemplate (在其 replyTemplate 屬性中),用於傳送回覆。這應該是 KafkaTemplate ,而不是用於客戶端請求/回覆處理的 ReplyingKafkaTemplate 。使用 Spring Boot 時,它會自動將範本自動配置到工廠中;當配置您自己的工廠時,必須如下列範例所示設定。 |
從 2.2 版本開始,您可以將 ReplyHeadersConfigurer
新增至監聽器容器工廠。諮詢此設定器以確定您要在回覆訊息中設定哪些標頭。以下範例示範如何新增 ReplyHeadersConfigurer
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果願意,您也可以新增更多標頭。以下範例示範如何執行此操作
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
當您使用 @SendTo
時,您必須在 ConcurrentKafkaListenerContainerFactory
的 replyTemplate
屬性中配置 KafkaTemplate
以執行傳送。Spring Boot 將自動連線其自動配置的範本(或任何單一實例存在的情況)。
除非您使用請求/回覆語意,否則僅使用簡單的 send(topic, value) 方法,因此您可能希望建立子類別以產生分割區或金鑰。以下範例示範如何執行此操作 |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
如果監聽器方法返回
|
使用請求/回覆語意時,傳送者可以請求目標分割區。
即使未返回任何結果,您也可以使用
有關更多資訊,請參閱處理例外。 |
如果監聽器方法返回 Iterable ,則預設情況下,會傳送每個元素作為值的記錄。從 2.3.5 版本開始,將 @KafkaListener 上的 splitIterables 屬性設定為 false ,整個結果將作為單個 ProducerRecord 的值傳送。這需要在回覆範本的 Producer 配置中使用合適的序列化程式。但是,如果回覆是 Iterable<Message<?>> ,則會忽略該屬性,並且每個訊息都會單獨傳送。 |