處理例外狀況
本節說明如何處理使用 Spring for Apache Kafka 時可能發生的各種例外狀況。
監聽器錯誤處理器
從 2.0 版開始,@KafkaListener
註解新增了一個屬性:errorHandler
。
您可以使用 errorHandler
來提供 KafkaListenerErrorHandler
實作的 Bean 名稱。此函數介面有一個方法,如下列清單所示
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
您可以存取訊息轉換器產生的 spring-messaging Message<?>
物件,以及監聽器擲回的例外狀況,該例外狀況包裝在 ListenerExecutionFailedException
中。錯誤處理器可以擲回原始例外狀況或新的例外狀況,該例外狀況會擲回給容器。錯誤處理器傳回的任何內容都會被忽略。
從 2.7 版開始,您可以在 MessagingMessageConverter
和 BatchMessagingMessageConverter
上設定 rawRecordHeader
屬性,這會導致原始 ConsumerRecord
新增至轉換後的 Message<?>
的 KafkaHeaders.RAW_DATA
標頭中。如果您希望在監聽器錯誤處理器中使用 DeadLetterPublishingRecoverer
,這會很有用。它可以用於請求/回覆情境,在您希望在一些重試次數後,在死信主題中擷取失敗的紀錄後,將失敗結果傳送給傳送者。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一個子介面 (ConsumerAwareListenerErrorHandler
),可以透過以下方法存取消費者物件
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一個子介面 (ManualAckListenerErrorHandler
) 在使用手動 AckMode
時提供對 Acknowledgment
物件的存取權。
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在任何一種情況下,您都不應在消費者上執行任何搜尋,因為容器將不會意識到它們。
容器錯誤處理器
從 2.8 版開始,舊版的 ErrorHandler
和 BatchErrorHandler
介面已被新的 CommonErrorHandler
取代。這些錯誤處理器可以處理紀錄和批次監聽器的錯誤,允許單一監聽器容器 factory 為兩種監聽器類型建立容器。提供了 CommonErrorHandler
實作來取代大多數舊版框架錯誤處理器實作。
請參閱 將自訂舊版錯誤處理器實作移轉至 CommonErrorHandler
,以取得將自訂錯誤處理器移轉至 CommonErrorHandler
的資訊。
當使用交易時,預設情況下不會設定錯誤處理器,因此例外狀況將會回滾交易。交易容器的錯誤處理由 AfterRollbackProcessor
處理。如果您在使用交易時提供自訂錯誤處理器,如果您希望交易回滾,則必須擲回例外狀況。
此介面有一個預設方法 isAckAfterHandle()
,容器會呼叫該方法以判斷如果錯誤處理器在沒有擲回例外狀況的情況下傳回,是否應提交偏移量;預設情況下它會傳回 true。
通常,框架提供的錯誤處理器會在錯誤未被「處理」(例如,在執行搜尋操作之後)時擲回例外狀況。預設情況下,此類例外狀況會由容器記錄在 ERROR
層級。所有框架錯誤處理器都擴充了 KafkaExceptionLogLevelAware
,這可讓您控制記錄這些例外狀況的層級。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定要用於容器 factory 中所有監聽器的全域錯誤處理器。以下範例示範如何執行此操作
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
預設情況下,如果已註解的監聽器方法擲回例外狀況,則會將其擲回給容器,並根據容器組態處理訊息。
容器會在呼叫錯誤處理器之前提交任何擱置中的偏移量提交。
如果您使用 Spring Boot,您只需要將錯誤處理器新增為 @Bean
,而 Boot 會將其新增至自動設定的 factory。
退避處理器
例如 DefaultErrorHandler 等錯誤處理器使用 BackOff
來判斷在重試傳遞之前要等待多久。從 2.9 版開始,您可以設定自訂 BackOffHandler
。預設處理器只會暫停執行緒,直到退避時間經過(或容器停止)。框架也提供了 ContainerPausingBackOffHandler
,它會暫停監聽器容器,直到退避時間經過,然後恢復容器。當延遲時間長於 max.poll.interval.ms
消費者屬性時,這會很有用。請注意,實際退避時間的解析度會受到 pollTimeout
容器屬性的影響。
DefaultErrorHandler
這個新的錯誤處理器取代了 SeekToCurrentErrorHandler
和 RecoveringBatchErrorHandler
,它們一直是多個版本的預設錯誤處理器。一個差異是批次監聽器的回退行為(當擲回 BatchListenerFailedException
以外的例外狀況時)相當於 重試完整批次。
從 2.9 版開始,可以將 DefaultErrorHandler 設定為提供與搜尋未處理紀錄偏移量相同的語意,但實際上不進行搜尋。相反地,紀錄會由監聽器容器保留,並在錯誤處理器結束後(以及在執行單一暫停的 poll() 以保持消費者運作之後)重新提交給監聽器;如果正在使用 非阻塞重試 或 ContainerPausingBackOffHandler ,則暫停可能會延長到多次輪詢)。錯誤處理器會將結果傳回容器,以指示目前的失敗紀錄是否可以重新提交,或者是否已復原,然後它將不會再次傳送給監聽器。若要啟用此模式,請將屬性 seekAfterError 設定為 false 。 |
錯誤處理器可以復原(略過)持續失敗的紀錄。預設情況下,在失敗十次後,失敗的紀錄會被記錄下來(在 ERROR
層級)。您可以使用自訂復原器 (BiConsumer
) 和控制傳遞嘗試次數和每次嘗試之間延遲時間的 BackOff
來設定處理器。將 FixedBackOff
與 FixedBackOff.UNLIMITED_ATTEMPTS
搭配使用會導致(實際上)無限重試。以下範例設定在嘗試三次後復原
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
若要使用此處理器的自訂執行個體設定監聽器容器,請將其新增至容器 factory。
例如,使用 @KafkaListener
容器 factory,您可以如下新增 DefaultErrorHandler
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
對於紀錄監聽器,這將重試傳遞最多 2 次(3 次傳遞嘗試),退避時間為 1 秒,而不是預設組態 (FixedBackOff(0L, 9)
)。在重試耗盡後,失敗只會被記錄下來。
例如,如果 poll
傳回六個紀錄(每個分割區 0、1、2 各兩個),且監聽器在第四個紀錄上擲回例外狀況,則容器會透過提交偏移量來確認前三個訊息。DefaultErrorHandler
會搜尋分割區 1 的偏移量 1 和分割區 2 的偏移量 0。下一個 poll()
會傳回三個未處理的紀錄。
如果 AckMode
是 BATCH
,則容器會在呼叫錯誤處理器之前提交前兩個分割區的偏移量。
對於批次監聽器,監聽器必須擲回 BatchListenerFailedException
,指出批次中哪些紀錄失敗。
事件順序如下
-
提交索引之前的紀錄的偏移量。
-
如果重試未耗盡,則執行搜尋,以便重新傳遞所有剩餘的紀錄(包括失敗的紀錄)。
-
如果重試耗盡,則嘗試復原失敗的紀錄(預設僅記錄日誌),並執行搜尋,以便重新傳遞剩餘的紀錄(不包括失敗的紀錄)。已復原紀錄的偏移量已提交。
-
如果重試耗盡且復原失敗,則執行搜尋,就像重試未耗盡一樣。
從 2.9 版開始,可以將 DefaultErrorHandler 設定為提供與搜尋未處理紀錄偏移量相同的語意,但實際上不進行搜尋。相反地,錯誤處理器會建立新的 ConsumerRecords<?, ?> ,其中僅包含未處理的紀錄,然後將其提交給監聽器(在執行單一暫停的 poll() 以保持消費者運作之後)。若要啟用此模式,請將屬性 seekAfterError 設定為 false 。 |
預設復原器會在重試耗盡後記錄失敗的紀錄。您可以使用自訂復原器,或框架提供的復原器,例如 DeadLetterPublishingRecoverer
。
當使用 POJO 批次監聽器(例如 List<Thing>
)且您沒有完整的消費者紀錄可新增至例外狀況時,您可以只新增失敗紀錄的索引
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
當容器設定為 AckMode.MANUAL_IMMEDIATE
時,可以將錯誤處理器設定為提交已復原紀錄的偏移量;將 commitRecovered
屬性設定為 true
。
另請參閱 發佈死信紀錄。
當使用交易時,DefaultAfterRollbackProcessor
提供類似的功能。請參閱 回滾後處理器。
DefaultErrorHandler
認為某些例外狀況是致命的,並且會略過此類例外狀況的重試;復原器會在第一次失敗時叫用。預設情況下,被視為致命的例外狀況為
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
因為這些例外狀況不太可能在重試的傳遞中解決。
您可以將更多例外狀況類型新增至不可重試的類別,或完全取代已分類例外狀況的對應。請參閱 DefaultErrorHandler.addNotRetryableException()
和 DefaultErrorHandler.setClassifications()
的 Javadocs 以取得更多資訊,以及 spring-retry
BinaryExceptionClassifier
的 Javadocs。
以下範例將 IllegalArgumentException
新增至不可重試的例外狀況
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
可以使用一或多個 RetryListener
設定錯誤處理器,以接收重試和復原進度的通知。從 2.8.10 版開始,新增了批次監聽器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
請參閱 JavaDocs 以取得更多資訊。
如果復原器失敗(擲回例外狀況),失敗的紀錄將會包含在搜尋中。如果復原器失敗,預設情況下 BackOff 將會重設,並且重新傳遞將再次經歷退避,然後再次嘗試復原。若要在復原失敗後略過重試,請將錯誤處理器的 resetStateOnRecoveryFailure 設定為 false 。 |
您可以為錯誤處理器提供 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
,以根據失敗的紀錄和/或例外狀況判斷要使用的 BackOff
handler.setBackOffFunction((record, ex) -> { ... });
如果函數傳回 null
,則將使用處理器的預設 BackOff
。
將 resetStateOnExceptionChange
設定為 true
,如果例外狀況類型在失敗之間變更,重試序列將會重新啟動(包括選取新的 BackOff
,如果已設定)。當 false
(2.9 版之前的預設值)時,不會考慮例外狀況類型。
從 2.9 版開始,這現在預設為 true
。
另請參閱 傳遞嘗試次數標頭。
批次錯誤處理器的轉換錯誤
從 2.8 版開始,當使用具有 ByteArrayDeserializer
、BytesDeserializer
或 StringDeserializer
的 MessageConverter
以及 DefaultErrorHandler
時,批次監聽器現在可以正確處理轉換錯誤。當發生轉換錯誤時,酬載會設定為 null,並且反序列化例外狀況會新增至紀錄標頭,類似於 ErrorHandlingDeserializer
。監聽器中提供 ConversionException
的清單,因此監聽器可以擲回 BatchListenerFailedException
,指出發生轉換例外狀況的第一個索引。
範例
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重試完整批次
這現在是 DefaultErrorHandler
對於批次監聽器的回退行為,其中監聽器擲回 BatchListenerFailedException
以外的例外狀況。
無法保證當重新傳遞批次時,批次具有相同數量的紀錄和/或重新傳遞的紀錄順序相同。因此,無法輕易地維護批次的重試狀態。FallbackBatchErrorHandler
採用以下方法。如果批次監聽器擲回不是 BatchListenerFailedException
的例外狀況,則會從記憶體中的紀錄批次執行重試。為了避免在延長的重試序列期間重新平衡,錯誤處理器會暫停消費者,在每次重試之前輪詢它,然後休眠退避時間,然後再次呼叫監聽器。如果/當重試耗盡時,會針對批次中的每個紀錄呼叫 ConsumerRecordRecoverer
。如果復原器擲回例外狀況,或執行緒在其休眠期間被中斷,則紀錄批次將在下一次輪詢時重新傳遞。在結束之前,無論結果如何,都會恢復消費者。
此機制無法與交易一起使用。 |
在等待 BackOff
間隔時,錯誤處理器將會迴圈短暫休眠,直到達到所需的延遲,同時檢查容器是否已停止,允許休眠在 stop()
後不久結束,而不是造成延遲。
容器停止錯誤處理器
如果監聽器擲回例外狀況,CommonContainerStoppingErrorHandler
會停止容器。對於紀錄監聽器,當 AckMode
為 RECORD
時,會提交已處理紀錄的偏移量。對於紀錄監聽器,當 AckMode
為任何手動值時,會提交已確認紀錄的偏移量。對於紀錄監聽器,當 AckMode
為 BATCH
時,或對於批次監聽器,當容器重新啟動時,將會重新播放整個批次。
在容器停止後,會擲回包裝 ListenerExecutionFailedException
的例外狀況。這是為了導致交易回滾(如果已啟用交易)。
委派錯誤處理器
CommonDelegatingErrorHandler
可以根據例外狀況類型委派給不同的錯誤處理器。例如,您可能希望針對大多數例外狀況叫用 DefaultErrorHandler
,或針對其他例外狀況叫用 CommonContainerStoppingErrorHandler
。
所有委派都必須共用相同的相容屬性 (ackAfterHandle
, seekAfterError
…)。
針對紀錄和批次監聽器使用不同的通用錯誤處理器
如果您希望針對紀錄和批次監聽器使用不同的錯誤處理策略,則提供了 CommonMixedErrorHandler
,允許針對每種監聽器類型組態特定的錯誤處理器。
通用錯誤處理器摘要
-
DefaultErrorHandler
-
CommonContainerStoppingErrorHandler
-
CommonDelegatingErrorHandler
-
CommonLoggingErrorHandler
-
CommonMixedErrorHandler
舊版錯誤處理器及其替代方案
舊版錯誤處理器 | 替代方案 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
無替代方案,使用具有無限 |
|
|
|
無替代方案,使用 |
將自訂舊版錯誤處理器實作移轉至 CommonErrorHandler
請參閱 CommonErrorHandler
中的 JavaDocs。
若要取代 ErrorHandler
或 ConsumerAwareErrorHandler
實作,您應該實作 handleOne()
並將 seeksAfterHandle()
保留為傳回 false
(預設值)。您也應該實作 handleOtherException()
以處理在紀錄處理範圍之外發生的例外狀況(例如,消費者錯誤)。
若要取代 RemainingRecordsErrorHandler
實作,您應該實作 handleRemaining()
並覆寫 seeksAfterHandle()
以傳回 true
(錯誤處理器必須執行必要的搜尋)。您也應該實作 handleOtherException()
- 以處理在紀錄處理範圍之外發生的例外狀況(例如,消費者錯誤)。
若要取代任何 BatchErrorHandler
實作,您應該實作 handleBatch()
。您也應該實作 handleOtherException()
- 以處理在紀錄處理範圍之外發生的例外狀況(例如,消費者錯誤)。
回滾後處理器
當使用交易時,如果監聽器擲回例外狀況(且錯誤處理器(如果存在)擲回例外狀況),則交易會回滾。預設情況下,任何未處理的紀錄(包括失敗的紀錄)都會在下一次輪詢時重新擷取。這是透過在 DefaultAfterRollbackProcessor
中執行 seek
操作來實現的。使用批次監聽器時,會重新處理整個紀錄批次(容器不知道批次中哪個紀錄失敗)。若要修改此行為,您可以使用自訂 AfterRollbackProcessor
設定監聽器容器。例如,對於基於紀錄的監聽器,您可能想要追蹤失敗的紀錄,並在嘗試一定次數後放棄,或許可以將其發佈到死信主題。
從 2.2 版開始,DefaultAfterRollbackProcessor
現在可以復原(略過)持續失敗的紀錄。預設情況下,在失敗十次後,失敗的紀錄會被記錄下來(在 ERROR
層級)。您可以使用自訂復原器 (BiConsumer
) 和最大失敗次數來設定處理器。將 maxFailures
屬性設定為負數會導致無限重試。以下範例設定在嘗試三次後復原
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
當您不使用交易時,您可以透過設定 DefaultErrorHandler
來實現類似的功能。請參閱 容器錯誤處理器。
從 3.2 版開始,復原現在可以復原(略過)整個持續失敗的紀錄批次。設定 ContainerProperties.setBatchRecoverAfterRollback(true)
以啟用此功能。
預設行為,復原對於批次監聽器是不可能的,因為框架不知道批次中哪個紀錄持續失敗。在這種情況下,應用程式監聽器必須處理持續失敗的紀錄。 |
另請參閱 發佈死信紀錄。
從 2.2.5 版開始,可以在新的交易中叫用 DefaultAfterRollbackProcessor
(在失敗的交易回滾後啟動)。然後,如果您使用 DeadLetterPublishingRecoverer
來發佈失敗的紀錄,處理器會將已復原紀錄在原始主題/分割區中的偏移量傳送至交易。若要啟用此功能,請在 DefaultAfterRollbackProcessor
上設定 commitRecovered
和 kafkaTemplate
屬性。
如果復原器失敗(擲回例外狀況),失敗的紀錄將會包含在搜尋中。從 2.5.5 版開始,如果復原器失敗,預設情況下 BackOff 將會重設,並且重新傳遞將再次經歷退避,然後再次嘗試復原。在較早的版本中,BackOff 不會重設,並且會在下一次失敗時重新嘗試復原。若要還原為先前的行為,請將處理器的 resetStateOnRecoveryFailure 屬性設定為 false 。 |
從 2.6 版開始,您現在可以為處理器提供 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>
,以根據失敗的紀錄和/或例外狀況判斷要使用的 BackOff
handler.setBackOffFunction((record, ex) -> { ... });
如果函數傳回 null
,則將使用處理器的預設 BackOff
。
從 2.6.3 版開始,將 resetStateOnExceptionChange
設定為 true
,如果例外狀況類型在失敗之間變更,重試序列將會重新啟動(包括選取新的 BackOff
,如果已設定)。預設情況下,不會考慮例外狀況類型。
從 2.3.1 版開始,與 DefaultErrorHandler
類似,DefaultAfterRollbackProcessor
認為某些例外狀況是致命的,並且會略過此類例外狀況的重試;復原器會在第一次失敗時叫用。預設情況下,被視為致命的例外狀況為
-
DeserializationException
-
MessageConversionException
-
ConversionException
-
MethodArgumentResolutionException
-
NoSuchMethodException
-
ClassCastException
因為這些例外狀況不太可能在重試的傳遞中解決。
您可以將更多例外狀況類型新增至不可重試的類別,或完全取代已分類例外狀況的對應。請參閱 DefaultAfterRollbackProcessor.setClassifications()
的 Javadocs 以取得更多資訊,以及 spring-retry
BinaryExceptionClassifier
的 Javadocs。
以下範例將 IllegalArgumentException
新增至不可重試的例外狀況
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
另請參閱 傳遞嘗試次數標頭。
使用目前的 kafka-clients ,容器無法偵測到 ProducerFencedException 是由重新平衡引起的,還是由生產者的 transactional.id 因逾時或到期而被撤銷引起的。因為在大多數情況下,它是由重新平衡引起的,所以容器不會呼叫 AfterRollbackProcessor (因為搜尋分割區是不合適的,因為我們不再被指派它們)。如果您確保逾時時間足夠大,可以處理每個交易並定期執行「空」交易(例如透過 ListenerContainerIdleEvent ),則可以避免因逾時和到期而造成 fencing。或者,您可以將 stopContainerWhenFenced 容器屬性設定為 true ,容器將會停止,避免紀錄遺失。您可以取用 ConsumerStoppedEvent 並檢查 Reason 屬性是否為 FENCED 以偵測此情況。由於事件也具有對容器的參考,因此您可以使用此事件重新啟動容器。 |
從 2.7 版開始,在等待 BackOff
間隔時,錯誤處理器將會迴圈短暫休眠,直到達到所需的延遲,同時檢查容器是否已停止,允許休眠在 stop()
後不久結束,而不是造成延遲。
從 2.7 版開始,可以使用一或多個 RetryListener
設定處理器,以接收重試和復原進度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
請參閱 JavaDocs 以取得更多資訊。
傳遞嘗試次數標頭
以下僅適用於紀錄監聽器,不適用於批次監聽器。
從 2.5 版開始,當使用實作 DeliveryAttemptAware
的 ErrorHandler
或 AfterRollbackProcessor
時,可以啟用將 KafkaHeaders.DELIVERY_ATTEMPT
標頭 (kafka_deliveryAttempt
) 新增至紀錄。此標頭的值是以 1 開始遞增的整數。當接收原始 ConsumerRecord<?, ?>
時,整數位於 byte[4]
中。
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
當搭配 DefaultKafkaHeaderMapper
或 SimpleKafkaHeaderMapper
使用 @KafkaListener
時,可以透過新增 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery
作為監聽器方法的參數來取得它。
若要啟用此標頭的填入,請將容器屬性 deliveryAttemptHeader
設定為 true
。預設情況下停用它,以避免為每個紀錄查閱狀態和新增標頭的(少量)額外負荷。
DefaultErrorHandler
和 DefaultAfterRollbackProcessor
支援此功能。
監聽器資訊標頭
在某些情況下,能夠知道監聽器在哪個容器中執行很有用。
從 2.8.4 版本開始,您現在可以在監聽器容器上設定 listenerInfo
屬性,或在 @KafkaListener
註解上設定 info
屬性。然後,容器會將此資訊新增到所有傳入訊息的 KafkaListener.LISTENER_INFO
標頭中;接著可以在記錄攔截器、過濾器等,或是在監聽器本身中使用它。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
當在 RecordInterceptor
或 RecordFilterStrategy
實作中使用時,標頭會以位元組陣列的形式存在於消費者記錄中,並使用 KafkaListenerAnnotationBeanPostProcessor
的 charSet
屬性進行轉換。
當從消費者記錄建立 MessageHeaders
時,標頭映射器也會轉換為 String
,並且永遠不會將此標頭映射到外送記錄。
對於 POJO 批次監聽器,從 2.8.6 版本開始,標頭會複製到批次的每個成員中,並且在轉換後也可以作為單一 String
參數使用。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批次監聽器具有過濾器,且過濾器導致空批次,您將需要在 @Header 參數中新增 required = false ,因為資訊不適用於空批次。 |
如果您收到 List<Message<Thing>>
,則資訊位於每個 Message<?>
的 KafkaHeaders.LISTENER_INFO
標頭中。
有關使用批次的更多資訊,請參閱批次監聽器。
發布死信記錄
當記錄達到最大失敗次數時,您可以透過記錄復原器配置 DefaultErrorHandler
和 DefaultAfterRollbackProcessor
。框架提供了 DeadLetterPublishingRecoverer
,它將失敗的訊息發布到另一個主題。復原器需要一個 KafkaTemplate<Object, Object>
,用於發送記錄。您也可以選擇性地使用 BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
來配置它,該函數會被呼叫以解析目標主題和分割區。
預設情況下,死信記錄會被發送到名為 <originalTopic>.DLT 的主題(原始主題名稱後綴為 .DLT ),並發送到與原始記錄相同的分割區。因此,當您使用預設解析器時,死信主題必須至少具有與原始主題一樣多的分割區。 |
如果返回的 TopicPartition
具有負分割區,則分割區不會在 ProducerRecord
中設定,因此分割區由 Kafka 選擇。從 2.2.4 版本開始,任何 ListenerExecutionFailedException
(例如,在 @KafkaListener
方法中偵測到異常時拋出)都會增強 groupId
屬性。這允許目標解析器除了使用 ConsumerRecord
中的資訊外,還可以使用此屬性來選擇死信主題。
以下範例示範如何連接自訂目標解析器
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
發送到死信主題的記錄會使用以下標頭進行增強
-
KafkaHeaders.DLT_EXCEPTION_FQCN
:異常類別名稱(通常是ListenerExecutionFailedException
,但也可能是其他)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN
:異常原因類別名稱(如果存在,自 2.8 版本起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE
:異常堆疊追蹤。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE
:異常訊息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN
:異常類別名稱(僅限金鑰反序列化錯誤)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE
:異常堆疊追蹤(僅限金鑰反序列化錯誤)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE
:異常訊息(僅限金鑰反序列化錯誤)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC
:原始主題。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION
:原始分割區。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET
:原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP
:原始時間戳記。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE
:原始時間戳記類型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP
:未能處理記錄的原始消費者群組(自 2.8 版本起)。
金鑰異常僅由 DeserializationException
引起,因此沒有 DLT_KEY_EXCEPTION_CAUSE_FQCN
。
有兩種機制可以新增更多標頭。
-
子類別化復原器並覆寫
createProducerRecord()
- 呼叫super.createProducerRecord()
並新增更多標頭。 -
提供一個
BiFunction
以接收消費者記錄和異常,並返回一個Headers
物件;來自那裡的標頭將被複製到最終的生產者記錄;另請參閱 管理死信記錄標頭。使用setHeadersFunction()
來設定BiFunction
。
第二種方法更容易實作,但第一種方法具有更多可用的資訊,包括已組裝的標準標頭。
從 2.3 版本開始,當與 ErrorHandlingDeserializer
結合使用時,發布者將在死信生產者記錄中將記錄 value()
還原為原始的未反序列化值。先前,value()
為 null,使用者程式碼必須從訊息標頭解碼 DeserializationException
。此外,您可以為發布者提供多個 KafkaTemplate
;例如,如果您想發布來自 DeserializationException
的 byte[]
,以及使用與成功反序列化記錄不同的序列化器的值,則可能需要這樣做。以下範例示範如何使用使用 String
和 byte[]
序列化器的 KafkaTemplate
配置發布者
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
發布者使用映射鍵來尋找適用於即將發布的 value()
的範本。建議使用 LinkedHashMap
,以便按順序檢查鍵。
當發布 null
值,並且有多個範本時,復原器將尋找 Void
類別的範本;如果沒有範本,將使用來自 values().iterator()
的第一個範本。
自 2.7 版本起,您可以使用 setFailIfSendResultIsError
方法,以便在訊息發布失敗時拋出異常。您也可以使用 setWaitForSendResultTimeout
設定驗證發送者成功的逾時時間。
如果復原器失敗(拋出異常),則失敗的記錄將包含在搜尋中。從 2.5.5 版本開始,如果復原器失敗,預設情況下 BackOff 將會重置,並且重新傳遞將再次經歷退避,然後再嘗試復原。在較早的版本中,BackOff 不會重置,並且在下次失敗時會重新嘗試復原。若要恢復到先前的行為,請將錯誤處理器的 resetStateOnRecoveryFailure 屬性設定為 false 。 |
從 2.6.3 版開始,將 resetStateOnExceptionChange
設定為 true
,如果例外狀況類型在失敗之間變更,重試序列將會重新啟動(包括選取新的 BackOff
,如果已設定)。預設情況下,不會考慮例外狀況類型。
從 2.3 版本開始,復原器也可以與 Kafka Streams 一起使用 - 有關更多資訊,請參閱 從反序列化異常中恢復。
ErrorHandlingDeserializer
在標頭 ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER
和 ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER
中新增反序列化異常(使用 Java 序列化)。預設情況下,這些標頭不會保留在發布到死信主題的訊息中。從 2.7 版本開始,如果金鑰和值都反序列化失敗,則兩者的原始值都會填入發送到 DLT 的記錄中。
如果傳入的記錄彼此依賴,但可能亂序到達,則將失敗的記錄重新發布到原始主題的尾部(在一定次數內),而不是直接將其發送到死信主題可能很有用。有關範例,請參閱 此 Stack Overflow 問題。
以下錯誤處理器配置將完全做到這一點
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
從 2.7 版本開始,復原器會檢查目標解析器選擇的分割區是否實際存在。如果分割區不存在,則 ProducerRecord
中的分割區會設定為 null
,允許 KafkaProducer
選擇分割區。您可以透過將 verifyPartition
屬性設定為 false
來停用此檢查。
從 3.1 版本開始,將 logRecoveryRecord
屬性設定為 true
將記錄復原記錄和異常。
管理死信記錄標頭
-
appendOriginalHeaders
(預設值為true
) -
stripPreviousExceptionHeaders
(自 2.8 版本起,預設值為true
)
Apache Kafka 支援多個具有相同名稱的標頭;若要取得「最新」值,您可以使用 headers.lastHeader(headerName)
;若要取得多個標頭的迭代器,請使用 headers.headers(headerName).iterator()
。
當重複重新發布失敗的記錄時,這些標頭可能會增長(並最終導致發布因 RecordTooLargeException
而失敗);對於異常標頭,尤其是堆疊追蹤標頭而言,更是如此。
這兩個屬性的原因是,雖然您可能只想保留最後的異常資訊,但您可能想保留記錄通過每個失敗主題的歷史記錄。
appendOriginalHeaders
適用於所有名為 ORIGINAL
的標頭,而 stripPreviousExceptionHeaders
適用於所有名為 EXCEPTION
的標頭。
從 2.8.4 版本開始,您現在可以控制哪些標準標頭將新增到輸出記錄中。請參閱 enum HeadersToAdd
以取得預設新增的(目前)10 個標準標頭的通用名稱(這些不是實際的標頭名稱,只是一個抽象概念;實際的標頭名稱由 getHeaderNames()
方法設定,子類別可以覆寫該方法。
若要排除標頭,請使用 excludeHeaders()
方法;例如,若要抑制在標頭中新增異常堆疊追蹤,請使用
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以透過新增 ExceptionHeadersCreator
完全自訂異常標頭的添加;這也會停用所有標準異常標頭。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同樣從 2.8.4 版本開始,您現在可以透過 addHeadersFunction
方法提供多個標頭函數。即使另一個函數已經註冊,這也允許套用其他函數,例如,當使用 非阻塞重試 時。
ExponentialBackOffWithMaxRetries
實作
Spring Framework 提供了許多 BackOff
實作。預設情況下,ExponentialBackOff
將無限期地重試;若要在重試一定次數後放棄,需要計算 maxElapsedTime
。自 2.7.3 版本起,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries
,它是一個子類別,接收 maxRetries
屬性並自動計算 maxElapsedTime
,這更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
這將在 1、2、4、8、10、10
秒後重試,然後呼叫復原器。