Resilience: 從錯誤和 Broker 故障中恢復
Spring AMQP 提供的一些關鍵 (和最受歡迎的) 高階功能與協定錯誤或 Broker 故障時的復原和自動重新連線有關。我們已經在本指南中看過所有相關元件,但將它們全部放在這裡並個別說明功能和復原情境應該會有幫助。
主要的重新連線功能由 CachingConnectionFactory 本身啟用。使用 RabbitAdmin 自動宣告功能通常也很有益。此外,如果您關心有保證的傳遞,您可能還需要在 RabbitTemplate 和 SimpleMessageListenerContainer 中使用 channelTransacted 旗標,並在 SimpleMessageListenerContainer 中使用 AcknowledgeMode.AUTO (如果您自己進行 ACK 則為手動)。
自動宣告交換器、佇列和綁定
RabbitAdmin 元件可以在啟動時宣告交換器、佇列和綁定。它透過 ConnectionListener 延遲執行此操作。因此,如果 Broker 在啟動時不存在,則沒有關係。第一次使用 Connection (例如,透過發送訊息) 時,監聽器會觸發並套用管理功能。在監聽器中執行自動宣告的另一個好處是,如果連線因任何原因 (例如,Broker 故障、網路故障等) 而中斷,則在重新建立連線時會再次套用它們。
以這種方式宣告的佇列必須具有固定的名稱 — 無論是明確宣告還是由框架為 AnonymousQueue 實例產生。匿名佇列是非持久的、獨佔的和自動刪除的。 |
僅當 CachingConnectionFactory 快取模式為 CHANNEL (預設值) 時,才會執行自動宣告。存在此限制是因為獨佔和自動刪除佇列綁定到連線。 |
從 2.2.2 版開始,RabbitAdmin 將偵測 DeclarableCustomizer 類型的 Bean,並在實際處理宣告之前套用該函數。例如,這對於在框架內獲得一流支援之前設定新引數 (屬性) 非常有用。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
這在不提供直接存取 Declarable Bean 定義的專案中也很有用。
請參閱 RabbitMQ 自動連線/拓撲復原。
同步操作中的故障和重試選項
如果在使用 RabbitTemplate (例如) 時在同步序列中遺失與 Broker 的連線,Spring AMQP 會擲回 AmqpException (通常但不總是 AmqpIOException)。我們不會試圖隱藏存在問題的事實,因此您必須能夠捕獲並回應例外。如果您懷疑連線已遺失 (且不是您的錯),最簡單的方法是再次嘗試操作。您可以手動執行此操作,也可以考慮使用 Spring Retry 來處理重試 (命令式或宣告式)。
Spring Retry 提供了一些 AOP 攔截器,並且在指定重試參數 (嘗試次數、例外類型、退避演算法等) 方面具有很大的彈性。Spring AMQP 也提供了一些便利的工廠 Bean,用於以方便 AMQP 用例的形式建立 Spring Retry 攔截器,並提供強型別的回呼介面,您可以使用這些介面來實作自訂復原邏輯。如需更多詳細資訊,請參閱 StatefulRetryOperationsInterceptor 和 StatelessRetryOperationsInterceptor 的 Javadoc 和屬性。如果沒有交易,或者在重試回呼中啟動交易,則無狀態重試是合適的。請注意,無狀態重試比有狀態重試更容易設定和分析,但如果存在必須回滾或肯定要回滾的正在進行的交易,則通常不合適。交易中間的連線中斷應具有與回滾相同的效果。因此,對於交易在堆疊中較高位置啟動的重新連線,有狀態重試通常是最佳選擇。有狀態重試需要一種機制來唯一識別訊息。最簡單的方法是讓發送者在 MessageId 訊息屬性中放入唯一值。提供的訊息轉換器提供了一種執行此操作的選項:您可以將 createMessageIds 設定為 true。否則,您可以將 MessageKeyGenerator 實作注入到攔截器中。金鑰產生器必須為每條訊息傳回唯一金鑰。在 2.0 版之前的版本中,提供了 MissingMessageIdAdvice。它使沒有 messageId 屬性的訊息可以完全重試一次 (忽略重試設定)。由於 spring-retry 1.2 版已將其功能內建到攔截器和訊息監聽器容器中,因此不再提供此建議。
為了向後相容性,預設情況下 (在一次重試後),訊息 ID 為 Null 的訊息被認為對消費者是致命的 (消費者已停止)。若要複製 MissingMessageIdAdvice 提供的功能,您可以將監聽器容器上的 statefulRetryFatalWithNullMessageId 屬性設定為 false。使用該設定,消費者會繼續執行,並且訊息會被拒絕 (在一次重試後)。它會被丟棄或路由到死信佇列 (如果已設定)。 |
從 1.3 版開始,提供了一個建構器 API,以協助使用 Java (在 @Configuration 類別中) 組裝這些攔截器。以下範例示範了如何執行此操作
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只有重試功能的一個子集可以透過這種方式設定。更進階的功能需要將 RetryTemplate 設定為 Spring Bean。如需有關可用策略及其設定的完整資訊,請參閱 Spring Retry Javadoc。
使用批次監聽器重試
不建議使用批次監聽器設定重試,除非批次是由生產者在單一記錄中建立的。如需有關消費者和生產者建立的批次的資訊,請參閱 批次訊息。對於消費者建立的批次,框架不知道批次中哪個訊息導致故障,因此在重試耗盡後無法復原。對於生產者建立的批次,由於只有一條訊息實際失敗,因此可以復原整條訊息。應用程式可能想要告知自訂復原器批次中故障發生的位置,例如透過設定擲回例外狀況的索引屬性。
批次監聽器的重試復原器必須實作 MessageBatchRecoverer
。
訊息監聽器和非同步案例
如果 MessageListener 因業務例外狀況而失敗,則例外狀況由訊息監聽器容器處理,然後容器會返回監聽另一個訊息。如果故障是由連線中斷 (而非業務例外狀況) 引起的,則必須取消並重新啟動為監聽器收集訊息的消費者。SimpleMessageListenerContainer 可以無縫地處理此問題,並留下日誌以說明正在重新啟動監聽器。實際上,它會無限循環,嘗試重新啟動消費者。只有在消費者行為非常不佳時,它才會放棄。一個副作用是,如果容器啟動時 Broker 關閉,它會持續嘗試,直到可以建立連線為止。
與協定錯誤和連線中斷相反,業務例外狀況處理可能需要更多考慮和一些自訂設定,尤其是在使用交易或容器 ACK 時。在 2.8.x 之前的版本中,RabbitMQ 沒有死信行為的定義。因此,依預設,由於業務例外狀況而被拒絕或回滾的訊息可以無限期地重新傳遞。若要限制用戶端重新傳遞的次數,一種選擇是在監聽器的建議鏈中使用 StatefulRetryOperationsInterceptor
。攔截器可以具有實作自訂死信動作的復原回呼 — 無論哪種動作適合您的特定環境。
另一種替代方案是將容器的 defaultRequeueRejected
屬性設定為 false
。這會導致所有失敗的訊息都被丟棄。當使用 RabbitMQ 2.8.x 或更高版本時,這也有助於將訊息傳遞到死信交換器。
或者,您可以擲回 AmqpRejectAndDontRequeueException
。這樣做可以防止訊息重新排隊,無論 defaultRequeueRejected
屬性的設定為何。
從 2.1 版開始,引入了 ImmediateRequeueAmqpException
以執行完全相反的邏輯:訊息將被重新排隊,無論 defaultRequeueRejected
屬性的設定為何。
通常,會使用兩種技術的組合。您可以在建議鏈中使用 StatefulRetryOperationsInterceptor
,並使用擲回 AmqpRejectAndDontRequeueException
的 MessageRecoverer
。當所有重試都已耗盡時,會呼叫 MessageRecover
。RejectAndDontRequeueRecoverer
正是這樣做的。預設 MessageRecoverer
會消耗錯誤訊息並發出 WARN
訊息。
從 1.3 版開始,提供了一個新的 RepublishMessageRecoverer
,以允許在重試耗盡後發布失敗的訊息。
當復原器消耗最終例外狀況時,訊息會被 ACK,並且如果已設定,則不會由 Broker 發送到死信交換器。
當在消費者端使用 RepublishMessageRecoverer 時,接收到的訊息在 receivedDeliveryMode 訊息屬性中具有 deliveryMode 。在這種情況下,deliveryMode 為 Null。這表示 Broker 上的 NON_PERSISTENT 傳遞模式。從 2.0 版開始,您可以針對 deliveryMode 設定 RepublishMessageRecoverer ,以在 deliveryMode 為 Null 時設定到要重新發布的訊息中。依預設,它使用 MessageProperties 預設值 - MessageDeliveryMode.PERSISTENT 。 |
以下範例示範如何將 RepublishMessageRecoverer
設定為復原器
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
RepublishMessageRecoverer
會在訊息標頭中發布訊息,其中包含額外資訊,例如例外狀況訊息、堆疊追蹤、原始交換器和路由金鑰。可以透過建立子類別並覆寫 additionalHeaders()
來新增額外標頭。deliveryMode
(或任何其他屬性) 也可以在 additionalHeaders()
中變更,如下列範例所示
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
從 2.0.5 版開始,如果堆疊追蹤太大,可能會被截斷;這是因為所有標頭都必須適合單一框架。依預設,如果堆疊追蹤導致其他標頭的可用空間少於 20,000 位元組 ('headroom'),則會被截斷。如果您需要更多或更少的空間用於其他標頭,則可以透過設定復原器的 frameMaxHeadroom
屬性來調整此值。從 2.1.13、2.2.3 版開始,例外狀況訊息包含在此計算中,並且將使用以下演算法最大化堆疊追蹤量
-
如果僅堆疊追蹤就超出限制,則例外狀況訊息標頭將被截斷為 97 個位元組加上 …,並且堆疊追蹤也會被截斷。
-
如果堆疊追蹤很小,則訊息將被截斷 (加上 …) 以適應可用位元組 (但堆疊追蹤本身內的訊息將被截斷為 97 個位元組加上 …)。
每當發生任何類型的截斷時,都會記錄原始例外狀況以保留完整資訊。評估會在增強標頭後執行,因此可以在運算式中使用例外狀況類型等資訊。
從 2.4.8 版開始,錯誤交換器和路由金鑰可以作為 SpEL 運算式提供,其中 Message
是評估的根物件。
從 2.3.3 版開始,提供了一個新的子類別 RepublishMessageRecovererWithConfirms
;這支援發布者確認的兩種樣式,並且會在傳回之前等待確認 (如果未確認或訊息已傳回,則擲回例外狀況)。
如果確認類型為 CORRELATED
,則子類別也會偵測訊息是否已傳回並擲回 AmqpMessageReturnedException
;如果發布被否定確認,則會擲回 AmqpNackReceivedException
。
如果確認類型為 SIMPLE
,則子類別將在通道上叫用 waitForConfirmsOrDie
方法。
請參閱 發布者確認和傳回 以取得有關確認和傳回的更多資訊。
從 2.1 版開始,新增了 ImmediateRequeueMessageRecoverer
以擲回 ImmediateRequeueAmqpException
,這會通知監聽器容器重新排隊目前的失敗訊息。
Spring Retry 的例外狀況分類
Spring Retry 在判斷哪些例外狀況可以叫用重試方面具有很大的彈性。預設設定會重試所有例外狀況。鑑於使用者例外狀況包裝在 ListenerExecutionFailedException
中,我們需要確保分類檢查例外狀況原因。預設分類器僅查看最上層例外狀況。
自 Spring Retry 1.0.3 以來,BinaryExceptionClassifier
具有一個名為 traverseCauses
的屬性 (預設值:false
)。當為 true
時,它會遍歷例外狀況原因,直到找到符合項或沒有原因為止。
若要將此分類器用於重試,您可以使用使用建構函式建立的 SimpleRetryPolicy
,該建構函式接受最大嘗試次數、Exception
實例的 Map
和布林值 (traverseCauses
),並將此策略注入到 RetryTemplate
中。