發送訊息
本節涵蓋如何發送訊息。
使用 KafkaTemplate
本節涵蓋如何使用 KafkaTemplate
發送訊息。
總覽
KafkaTemplate
包裝了生產者並提供便捷的方法來將資料發送到 Kafka 主題。以下列表顯示了 KafkaTemplate
中的相關方法
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
interface OperationsCallback<K, V, T> {
T doInOperations(KafkaOperations<K, V> operations);
}
請參閱 Javadoc 以取得更多詳細資訊。
在 3.0 版本中,先前返回 ListenableFuture 的方法已變更為返回 CompletableFuture 。為了方便遷移,2.9 版本新增了一個方法 usingCompletableFuture() ,該方法提供了相同的方法,但返回類型為 CompletableFuture ;此方法已不再可用。 |
sendDefault
API 要求已為範本提供預設主題。
API 接受 timestamp
作為參數,並將此時間戳記儲存在記錄中。使用者提供的时间戳記的儲存方式取決於 Kafka 主題上設定的時間戳記類型。如果主題設定為使用 CREATE_TIME
,則會記錄使用者指定的時間戳記(如果未指定,則會產生)。如果主題設定為使用 LOG_APPEND_TIME
,則會忽略使用者指定的時間戳記,並且 Broker 會加入本機 Broker 時間。
若要使用範本,您可以設定生產者 factory 並在範本的建構子中提供它。以下範例顯示如何執行此操作
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
從 2.5 版本開始,您現在可以覆寫 factory 的 ProducerConfig
屬性,以從同一個 factory 建立具有不同生產者組態的範本。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
請注意,類型為 ProducerFactory<?, ?>
的 Bean(例如 Spring Boot 自動設定的 Bean)可以使用不同的縮小泛型類型來參考。
您也可以使用標準 <bean/>
定義來設定範本。
然後,若要使用範本,您可以調用其方法之一。
當您使用帶有 Message<?>
參數的方法時,主題、分割區、金鑰和時間戳記資訊會在訊息標頭中提供,其中包括以下項目
-
KafkaHeaders.TOPIC
-
KafkaHeaders.PARTITION
-
KafkaHeaders.KEY
-
KafkaHeaders.TIMESTAMP
訊息 Payload 是資料。
您可以選擇性地使用 ProducerListener
設定 KafkaTemplate
,以取得發送結果(成功或失敗)的非同步回呼,而不是等待 Future
完成。以下列表顯示了 ProducerListener
介面的定義
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
依預設,範本配置了 LoggingProducerListener
,它會記錄錯誤,並且在發送成功時不執行任何操作。
為了方便起見,如果您只想實作其中一種方法,則提供預設方法實作。
請注意,發送方法會返回 CompletableFuture<SendResult>
。您可以向監聽器註冊回呼,以非同步方式接收發送結果。以下範例顯示如何執行此操作
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
SendResult
具有兩個屬性:ProducerRecord
和 RecordMetadata
。請參閱 Kafka API 文件以取得有關這些物件的資訊。
Throwable
可以轉換為 KafkaProducerException
;其 producerRecord
屬性包含失敗的記錄。
如果您希望阻止發送執行緒以等待結果,您可以調用 future 的 get()
方法;建議使用帶有逾時的方法。如果您已設定 linger.ms
,您可能希望在等待之前調用 flush()
,或者為了方便起見,範本具有帶有 autoFlush
參數的建構子,該參數會導致範本在每次發送時執行 flush()
。只有在您設定了 linger.ms
生產者屬性並想要立即發送部分批次時,才需要刷新。
範例
本節顯示將訊息發送到 Kafka 的範例
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
請注意,ExecutionException
的原因是帶有 producerRecord
屬性的 KafkaProducerException
。
使用 RoutingKafkaTemplate
從 2.5 版本開始,您可以使用 RoutingKafkaTemplate
在執行時根據目的地 topic
名稱選擇生產者。
路由範本不支援交易、execute 、flush 或 metrics 操作,因為這些操作的主題是未知的。 |
範本需要 java.util.regex.Pattern
到 ProducerFactory<Object, Object>
實例的 Map。此 Map 應為有序的(例如 LinkedHashMap
),因為它會依序遍歷;您應該在開頭新增更具體的模式。
以下簡單的 Spring Boot 應用程式提供了一個範例,說明如何使用相同的範本發送到不同的主題,每個主題都使用不同的值序列化程式。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
此範例的對應 @KafkaListener
顯示在 註解屬性 中。
如需另一種技術來實現類似的結果,但具有將不同類型發送到同一主題的額外功能,請參閱 委派序列化程式和反序列化程式。
使用 DefaultKafkaProducerFactory
如 使用 KafkaTemplate
中所示,ProducerFactory
用於建立生產者。
當不使用 交易 時,依預設,DefaultKafkaProducerFactory
會建立所有用戶端使用的單例生產者,如 KafkaProducer
JavaDocs 中建議的那樣。但是,如果您在範本上調用 flush()
,這可能會導致使用相同生產者的其他執行緒延遲。從 2.3 版本開始,DefaultKafkaProducerFactory
具有新的屬性 producerPerThread
。當設定為 true
時,factory 將為每個執行緒建立(並快取)一個單獨的生產者,以避免此問題。
當 producerPerThread 為 true 時,使用者程式碼 必須 在不再需要生產者時在 factory 上調用 closeThreadBoundProducer() 。這將實際關閉生產者並將其從 ThreadLocal 中移除。調用 reset() 或 destroy() 不會清理這些生產者。 |
另請參閱 KafkaTemplate
交易和非交易發布。
在建立 DefaultKafkaProducerFactory
時,可以透過調用僅接受屬性 Map 的建構子(請參閱 使用 KafkaTemplate
中的範例)從組態中選取金鑰和/或值 Serializer
類別,或者可以將 Serializer
實例傳遞給 DefaultKafkaProducerFactory
建構子(在這種情況下,所有 Producer
都共用相同的實例)。或者,您可以提供 Supplier<Serializer>
s(從 2.3 版本開始),這些供應商將用於為每個 Producer
取得單獨的 Serializer
實例
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
從 2.5.10 版本開始,您現在可以在建立 factory 後更新生產者屬性。例如,如果您必須在憑證變更後更新 SSL 金鑰/信任儲存位置,這可能會很有用。變更不會影響現有的生產者實例;調用 reset()
以關閉任何現有的生產者,以便使用新屬性建立新的生產者。注意:您無法將交易生產者 factory 變更為非交易生產者 factory,反之亦然。
現在提供兩個新方法
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
從 2.8 版本開始,如果您以物件形式提供序列化程式(在建構子中或透過 Setter),factory 將調用 configure()
方法以使用組態屬性來設定它們。
使用 ReplyingKafkaTemplate
2.1.3 版本引入了 KafkaTemplate
的子類別,以提供請求/回覆語意。該類別名為 ReplyingKafkaTemplate
,並且有兩個額外的方法;以下顯示方法簽名
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(另請參閱 使用 Message<?>
s 進行請求/回覆)。
結果是一個 CompletableFuture
,它會以非同步方式填入結果(或例外狀況,表示逾時)。結果還具有 sendFuture
屬性,這是調用 KafkaTemplate.send()
的結果。您可以使用此 future 來判斷發送操作的結果。
在 3.0 版本中,這些方法(及其 sendFuture 屬性)返回的 future 已從 ListenableFuture s 變更為 CompletableFuture s。 |
如果使用第一個方法,或者 replyTimeout
引數為 null
,則會使用範本的 defaultReplyTimeout
屬性(預設為 5 秒)。
從 2.8.8 版本開始,範本有一個新方法 waitForAssignment
。如果回覆容器配置了 auto.offset.reset=latest
,這非常有用,可以避免在容器初始化之前發送請求和回覆。
當使用手動分割區指派(無群組管理)時,等待時間必須大於容器的 pollTimeout 屬性,因為通知將在第一次輪詢完成後才會發送。 |
以下 Spring Boot 應用程式顯示了如何使用此功能的範例
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
請注意,我們可以利用 Boot 的自動配置容器 factory 來建立回覆容器。
如果回覆使用了非簡單的反序列化程式,請考慮使用 ErrorHandlingDeserializer
,它會委派給您配置的反序列化程式。如此配置後,RequestReplyFuture
將會異常完成,您可以捕獲 ExecutionException
,並在其 cause
屬性中包含 DeserializationException
。
從 2.6.7 版本開始,除了偵測 DeserializationException
s 之外,範本還將調用 replyErrorChecker
函式(如果已提供)。如果它返回例外狀況,future 將會異常完成。
這是一個範例
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
範本設定了一個標頭(預設名稱為 KafkaHeaders.CORRELATION_ID
),伺服器端必須將其回傳。
在此案例中,以下 @KafkaListener
應用程式會回應
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
@KafkaListener
基礎架構會回傳關聯 ID 並判斷回覆主題。
請參閱 使用 @SendTo
轉發監聽器結果 以取得有關發送回覆的更多資訊。範本使用預設標頭 KafKaHeaders.REPLY_TOPIC
來指示回覆的目的地主題。
從 2.2 版本開始,範本會嘗試從配置的回覆容器中偵測回覆主題或分割區。如果容器配置為監聽單一主題或單一 TopicPartitionOffset
,則會使用它來設定回覆標頭。如果以其他方式配置容器,則使用者必須設定回覆標頭。在這種情況下,初始化期間會寫入 INFO
日誌訊息。以下範例使用 KafkaHeaders.REPLY_TOPIC
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
當您使用單一回覆 TopicPartitionOffset
進行配置時,您可以將相同的回覆主題用於多個範本,只要每個實例監聽不同的分割區即可。當使用單一回覆主題進行配置時,每個實例都必須使用不同的 group.id
。在這種情況下,所有實例都會接收到每個回覆,但只有發送請求的實例才會找到關聯 ID。這對於自動縮放可能很有用,但會產生額外的網路流量開銷,以及丟棄每個不需要的回覆的小成本。當您使用此設定時,我們建議您將範本的 sharedReplyTopic
設定為 true
,這會將非預期回覆的記錄層級從預設 ERROR 降低為 DEBUG。
以下是配置回覆容器以使用相同共用回覆主題的範例
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
如果您有多個用戶端實例,並且您未按照前一段討論的方式配置它們,則每個實例都需要專用的回覆主題。另一種方法是設定 KafkaHeaders.REPLY_PARTITION 並為每個實例使用專用的分割區。Header 包含一個四位元組整數(大端)。伺服器必須使用此標頭將回覆路由到正確的分割區(@KafkaListener 會執行此操作)。但是,在這種情況下,回覆容器不得使用 Kafka 的群組管理功能,並且必須配置為監聽固定分割區(透過在其 ContainerProperties 建構子中使用 TopicPartitionOffset )。 |
DefaultKafkaHeaderMapper 需要 Jackson 位於類別路徑上(適用於 @KafkaListener )。如果它不可用,則訊息轉換器沒有標頭對應器,因此您必須使用 SimpleKafkaHeaderMapper 配置 MessagingMessageConverter ,如先前所示。 |
依預設,使用 3 個標頭
-
KafkaHeaders.CORRELATION_ID
- 用於將回覆與請求關聯 -
KafkaHeaders.REPLY_TOPIC
- 用於告知伺服器回覆的目的地 -
KafkaHeaders.REPLY_PARTITION
- (選用)用於告知伺服器回覆的目的地分割區
這些標頭名稱由 @KafkaListener
基礎架構用於路由回覆。
從 2.3 版本開始,您可以自訂標頭名稱 - 範本具有 3 個屬性 correlationHeaderName
、replyTopicHeaderName
和 replyPartitionHeaderName
。如果您的伺服器不是 Spring 應用程式(或未使用 @KafkaListener
),這會很有用。
反之,如果請求應用程式不是 Spring 應用程式,並且將關聯資訊放在不同的標頭中,則從 3.0 版本開始,您可以在監聽器容器 factory 上配置自訂 correlationHeaderName ,並且該標頭將會被回傳。先前,監聽器必須回傳自訂關聯標頭。 |
使用 Message<?>
s 進行請求/回覆
2.7 版本新增了 ReplyingKafkaTemplate
的方法,以發送和接收 spring-messaging
的 Message<?>
抽象
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
這些方法將使用範本的預設 replyTimeout
,還有可以接受方法調用逾時的重載版本。
在 3.0 版本中,這些方法(及其 sendFuture 屬性)返回的 future 已從 ListenableFuture s 變更為 CompletableFuture s。 |
如果消費者的 Deserializer
或範本的 MessageConverter
可以在沒有任何額外資訊的情況下轉換 Payload,則使用第一個方法,無論是透過組態還是回覆訊息中的類型 Metadata。
如果您需要為返回類型提供類型資訊,以協助訊息轉換器,則使用第二個方法。這也允許相同的範本接收不同的類型,即使回覆中沒有類型 Metadata,例如當伺服器端不是 Spring 應用程式時。以下是後者的範例
-
Java
-
Kotlin
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
@Bean
fun template(
pf: ProducerFactory<String?, String>?,
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
-
Java
-
Kotlin
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
回覆類型 Message<?>
當 @KafkaListener
返回 Message<?>
時,在 2.5 之前的版本中,必須填入回覆主題和關聯 ID 標頭。在此範例中,我們使用來自請求的回覆主題標頭
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
這也顯示了如何在回覆記錄上設定金鑰。
從 2.5 版本開始,框架將偵測這些標頭是否遺失,並使用主題(從 @SendTo
值或傳入的 KafkaHeaders.REPLY_TOPIC
標頭(如果存在)判斷的主題)填入它們。它也會回傳傳入的 KafkaHeaders.CORRELATION_ID
和 KafkaHeaders.REPLY_PARTITION
(如果存在)。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
彙總多個回覆
使用 ReplyingKafkaTemplate
中的範本嚴格用於單一請求/回覆情境。對於單一訊息的多個接收者返回回覆的情況,您可以使用 AggregatingReplyingKafkaTemplate
。這是 Scatter-Gather Enterprise Integration Pattern 的用戶端實作。
與 ReplyingKafkaTemplate
類似,AggregatingReplyingKafkaTemplate
建構子接受生產者 factory 和監聽器容器以接收回覆;它具有第三個參數 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy
,每次收到回覆時都會諮詢它;當述詞返回 true
時,ConsumerRecord
s 的集合會用於完成 sendAndReceive
方法返回的 Future
。
還有一個額外的屬性 returnPartialOnTimeout
(預設為 false)。當此屬性設定為 true
時,部分結果會正常完成 future(只要已收到至少一筆回覆記錄),而不是使用 KafkaReplyTimeoutException
完成 future。
從 2.3.5 版本開始,在逾時後也會調用述詞(如果 returnPartialOnTimeout
為 true
)。第一個引數是目前的記錄清單;如果此調用是由於逾時,則第二個引數為 true
。述詞可以修改記錄清單。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
請注意,返回類型是 ConsumerRecord
,其值是 ConsumerRecord
s 的集合。「外部」ConsumerRecord
不是「真實」記錄,它是範本合成的,作為接收到的請求的實際回覆記錄的持有者。當發生正常釋放(釋放策略返回 true)時,主題會設定為 aggregatedResults
;如果 returnPartialOnTimeout
為 true,並且發生逾時(且已收到至少一筆回覆記錄),則主題會設定為 partialResultsAfterTimeout
。範本為這些「主題」名稱提供常數靜態變數
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
Collection
中的真實 ConsumerRecord
s 包含接收回覆的實際主題。
回覆的監聽器容器 必須 配置為 AckMode.MANUAL 或 AckMode.MANUAL_IMMEDIATE ;消費者屬性 enable.auto.commit 必須為 false (自 2.3 版本以來的預設值)。為了避免任何訊息遺失的可能性,範本僅在沒有未完成的請求時才提交偏移量,即當最後一個未完成的請求由釋放策略釋放時。重新平衡後,可能會重複傳遞回覆;對於任何正在進行中的請求,這些回覆將被忽略;當為已釋放的回覆收到重複的回覆時,您可能會看到錯誤日誌訊息。 |
如果您將 ErrorHandlingDeserializer 與此彙總範本一起使用,則框架不會自動偵測 DeserializationException s。相反,記錄(帶有 null 值)將完整返回,反序列化例外狀況位於標頭中。建議應用程式調用公用程式方法 ReplyingKafkaTemplate.checkDeserialization() 方法來判斷是否發生反序列化例外狀況。請參閱其 JavaDocs 以取得更多資訊。replyErrorChecker 也不會為此彙總範本調用;您應該對回覆的每個元素執行檢查。 |