錯誤處理
Apache Kafka Streams 提供了原生處理來自反序列化錯誤的例外狀況的功能。有關此支援的詳細資訊,請參閱此處。Apache Kafka Streams 開箱即用提供了兩種反序列化例外狀況處理器 - LogAndContinueExceptionHandler
和 LogAndFailExceptionHandler
。顧名思義,前者將記錄錯誤並繼續處理下一個記錄,而後者將記錄錯誤並失敗。LogAndFailExceptionHandler
是預設的反序列化例外狀況處理器。
在 Binder 中處理反序列化例外狀況
Kafka Streams Binder 允許使用以下屬性指定上述反序列化例外狀況處理器。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述兩種反序列化例外狀況處理器之外,Binder 還提供了第三種處理器,用於將錯誤記錄(毒丸)發送到 DLQ(死信佇列)主題。以下是如何啟用此 DLQ 例外狀況處理器。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
當設定上述屬性時,所有反序列化錯誤中的記錄都會自動發送到 DLQ 主題。
您可以設定發布 DLQ 訊息的主題名稱如下。
您可以為 DlqDestinationResolver
提供實作,這是一個函數介面。DlqDestinationResolver
接受 ConsumerRecord
和例外狀況作為輸入,然後允許指定主題名稱作為輸出。透過存取 Kafka ConsumerRecord
,可以在 BiFunction
的實作中檢視標頭記錄。
以下是提供 DlqDestinationResolver
實作的範例。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在為 DlqDestinationResolver
提供實作時,需要記住一件重要的事情是,Binder 中的 provisioner 不會自動為應用程式建立主題。這是因為 Binder 無法推斷實作可能發送到的所有 DLQ 主題的名稱。因此,如果您使用此策略提供 DLQ 名稱,則應用程式有責任確保事先建立這些主題。
如果 DlqDestinationResolver
作為 bean 存在於應用程式中,則它具有更高的優先順序。如果您不想遵循此方法,而是想使用組態提供靜態 DLQ 名稱,則可以設定以下屬性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果設定了此屬性,則錯誤記錄將發送到主題 custom-dlq
。如果應用程式未使用上述任何一種策略,則它將建立一個 DLQ 主題,名稱為 error.<input-topic-name>.<application-id>
。例如,如果您的繫結目的地主題是 inputTopic
,且應用程式 ID 是 process-applicationId
,則預設 DLQ 主題是 error.inputTopic.process-applicationId
。如果您的意圖是啟用 DLQ,則始終建議為每個輸入繫結顯式建立 DLQ 主題。
每個輸入消費者繫結的 DLQ
屬性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
適用於整個應用程式。這表示如果同一個應用程式中有多個函數,則此屬性將應用於所有函數。但是,如果您在單個處理器中有多個處理器或多個輸入繫結,則可以使用 Binder 為每個輸入消費者繫結提供的更細緻的 DLQ 控制。
如果您有以下處理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
並且您只想在第一個輸入繫結上啟用 DLQ,並在第二個繫結上啟用 skipAndContinue,那麼您可以像下面這樣在消費者上執行此操作。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以這種方式設定反序列化例外狀況處理器比在 Binder 層級設定具有更高的優先順序。
DLQ 分割
預設情況下,記錄會使用與原始記錄相同的分割區發布到死信主題。這表示死信主題必須至少具有與原始記錄一樣多的分割區。
若要變更此行為,請將 DlqPartitionFunction
實作新增為應用程式內容的 @Bean
。只能存在一個這樣的 bean。該函數會提供消費者群組(在大多數情況下與應用程式 ID 相同)、失敗的 ConsumerRecord
和例外狀況。例如,如果您始終想要路由到分割區 0,您可以使用
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果您將消費者繫結的 dlqPartitions 屬性設定為 1(且 Binder 的 minPartitionCount 等於 1 ),則無需提供 DlqPartitionFunction ;架構將始終使用分割區 0。如果您將消費者繫結的 dlqPartitions 屬性設定為大於 1 的值(或 Binder 的 minPartitionCount 大於 1 ),即使分割區計數與原始主題的分割區計數相同,您也必須提供 DlqPartitionFunction bean。 |
在使用 Kafka Streams Binder 中的例外狀況處理功能時,需要記住一些事項。
-
屬性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler
適用於整個應用程式。這表示如果同一個應用程式中有多個函數,則此屬性將應用於所有函數。 -
反序列化的例外狀況處理與原生反序列化和架構提供的訊息轉換一致地運作。
在 Binder 中處理生產例外狀況
與上述針對反序列化例外狀況處理器的支援不同,Binder 沒有為處理生產例外狀況提供此類一流的機制。但是,您仍然可以使用 StreamsBuilderFactoryBean
自訂器組態生產例外狀況處理器,您可以在下面的後續章節中找到更多詳細資訊。
執行階段錯誤處理
當涉及到處理來自應用程式碼的錯誤時,即來自業務邏輯執行的錯誤,通常由應用程式來處理。因為 Kafka Streams Binder 無法干預應用程式碼。但是,為了讓應用程式更輕鬆,Binder 提供了一個方便的 RecordRecoverableProcessor
,使用它,您可以指示您想要如何處理應用程式層級的錯誤。
考慮以下程式碼。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.map(...);
}
如果您的 map
呼叫中的業務程式碼拋出例外狀況,則您有責任處理該錯誤。這就是 RecordRecoverableProcessor
變得方便的地方。預設情況下,RecordRecoverableProcessor
將僅記錄錯誤並讓應用程式繼續執行。假設您想要將失敗的記錄發布到 DLT,而不是在應用程式中處理它。在這種情況下,您必須使用名為 DltAwareProcessor
的 RecordRecoverableProcessor
自訂實作。以下是如何執行此操作。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
return input -> input
.process(() -> new DltAwareProcessor<>(record -> {
throw new RuntimeException("error");
}, "hello-dlt-1", dltPublishingContext));
}
原始 map
呼叫中的業務邏輯程式碼現在已移至 KStream#process
方法呼叫的一部分,該方法呼叫採用 ProcessorSupplier
。然後,我們傳入自訂的 DltAwareProcessor
,它可以發布到 DLT。上述 DltAwareProcessor
的建構子接受三個參數 - 一個 Function
,它接受輸入記錄,然後將業務邏輯運算作為 Function
主體的一部分,DLT 主題,以及最後一個 DltPublishingContext
。當 Function
的 lambda 運算式拋出例外狀況時,DltAwareProcessor
會將輸入記錄發送到 DLT。DltPublishingContext
為 DltAwareProcessor
提供必要的發布基礎架構 bean。DltPublishingContext
由 Binder 自動組態,以便您可以將其直接注入到應用程式中。
如果您不希望 Binder 將失敗的記錄發布到 DLT,則必須直接使用 RecordRecoverableProcessor
而不是 DltAwareProcessor
。您可以提供自己的恢復器作為 BiConsumer
,它接受輸入 Record
和例外狀況作為引數。假設一種情況,您不想將記錄發送到 DLT,而只是記錄訊息並繼續執行。以下是如何完成此操作的範例。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new RecordRecoverableProcessor<>(record -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// Handle the record
}));
}
在這種情況下,當記錄失敗時,RecordRecoverableProcessor
會使用使用者提供的恢復器,這是一個 BiConsumer
,它接受失敗的記錄和拋出的例外狀況作為引數。
在 DltAwareProcessor 中處理記錄鍵
當使用 DltAwareProcessor
將失敗的記錄發送到 DLT 時,如果您想要將記錄鍵發送到 DLT 主題,則需要在 DLT 繫結上設定適當的序列化器。這是因為 DltAwareProcessor
使用 StreamBridge
,後者使用常規 Kafka Binder(基於訊息通道),預設情況下,後者對鍵使用 ByteArraySerializer
。在記錄值的情況下,Spring Cloud Stream 會將酬載轉換為正確的 byte[]
;但是,鍵的情況並非如此,因為它只會傳遞在標頭中接收到的內容作為鍵。如果您提供非位元組陣列鍵,則可能會導致類別轉換例外狀況,為了避免這種情況,您需要在 DLT 繫結上設定序列化器,如下所示。
假設 DLT 目的地是 hello-dlt-1
並且記錄鍵是 String 資料類型。
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer