非同步 @KafkaListener
返回類型
從 3.2 版本開始,@KafkaListener
(和 @KafkaHandler
) 方法可以使用非同步返回類型指定,允許非同步發送回覆。返回類型包括 CompletableFuture<?>
、Mono<?>
和 Kotlin suspend
函數。
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
...
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("done");
return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
...
return Mono.empty();
}
當偵測到非同步返回類型時,AckMode 將自動設定為 MANUAL 並啟用無序提交;相反地,當非同步操作完成時,非同步完成將會確認 (ack)。當非同步結果以錯誤完成時,訊息是否恢復取決於容器錯誤處理器。如果在監聽器方法內發生某些例外,阻止建立非同步結果物件,您必須捕獲該例外並返回適當的返回物件,該物件將導致訊息被確認 (ack) 或恢復。 |
如果在具有非同步返回類型(包括 Kotlin 暫停函數)的監聽器上配置了 KafkaListenerErrorHandler
,則在失敗後會調用錯誤處理程序。請參閱處理例外以獲取有關此錯誤處理程序及其用途的更多資訊。