非同步 @KafkaListener 返回類型

從 3.2 版本開始,@KafkaListener (和 @KafkaHandler) 方法可以使用非同步返回類型指定,允許非同步發送回覆。返回類型包括 CompletableFuture<?>Mono<?> 和 Kotlin suspend 函數。

@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
    ...
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("done");
    return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
    ...
    return Mono.empty();
}
當偵測到非同步返回類型時,AckMode 將自動設定為 MANUAL 並啟用無序提交;相反地,當非同步操作完成時,非同步完成將會確認 (ack)。當非同步結果以錯誤完成時,訊息是否恢復取決於容器錯誤處理器。如果在監聽器方法內發生某些例外,阻止建立非同步結果物件,您必須捕獲該例外並返回適當的返回物件,該物件將導致訊息被確認 (ack) 或恢復。

如果在具有非同步返回類型(包括 Kotlin 暫停函數)的監聽器上配置了 KafkaListenerErrorHandler,則在失敗後會調用錯誤處理程序。請參閱處理例外以獲取有關此錯誤處理程序及其用途的更多資訊。