提供的 Advice 類別
除了提供應用 AOP advice 類別的一般機制外,Spring Integration 還提供以下現成的 advice 實作
-
RequestHandlerRetryAdvice
(在 重試 Advice 中描述) -
RequestHandlerCircuitBreakerAdvice
(在 斷路器 Advice 中描述) -
ExpressionEvaluatingRequestHandlerAdvice
(在 運算式 Advice 中描述) -
RateLimiterRequestHandlerAdvice
(在 速率限制器 Advice 中描述) -
CacheRequestHandlerAdvice
(在 快取 Advice 中描述) -
ReactiveRequestHandlerAdvice
(在 反應式 Advice 中描述) -
ContextHolderRequestHandlerAdvice
(在 Context Holder Advice 中描述)
重試 Advice
重試 advice (o.s.i.handler.advice.RequestHandlerRetryAdvice
) 利用 Spring Retry 專案提供的豐富重試機制。spring-retry
的核心元件是 RetryTemplate
,它允許設定複雜的重試情境,包括 RetryPolicy
和 BackoffPolicy
策略 (具有多種實作),以及 RecoveryCallback
策略,以決定在重試耗盡時要採取的動作。
- 無狀態重試
-
無狀態重試是指重試活動完全在 advice 內處理的情況。執行緒暫停 (如果設定為暫停),然後重試動作。
- 具狀態重試
-
具狀態重試是指重試狀態在 advice 內管理,但會擲回例外,且呼叫者重新提交請求的情況。具狀態重試的一個範例是,當我們希望訊息發起者 (例如 JMS) 負責重新提交,而不是在目前的執行緒上執行時。具狀態重試需要一些機制來偵測重新嘗試的提交。
如需 spring-retry
的詳細資訊,請參閱專案的 Javadoc 和 Spring Batch 的參考文件,spring-retry
起源於此。
預設的回退行為是不回退。會立即嘗試重試。使用會導致執行緒在嘗試之間暫停的回退策略可能會導致效能問題,包括過多的記憶體使用和執行緒飢餓。在高容量環境中,應謹慎使用回退策略。 |
設定重試 Advice
本節中的範例使用以下 <service-activator>
,它總是擲回例外
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 簡單無狀態重試
-
預設的
RetryTemplate
具有SimpleRetryPolicy
,它會嘗試三次。沒有BackOffPolicy
,因此這三次嘗試會背靠背地進行,嘗試之間沒有延遲。沒有RecoveryCallback
,因此結果是在最後一次重試失敗後,將例外擲回給呼叫者。在 Spring Integration 環境中,可以使用輸入端點上的error-channel
來處理此最終例外。以下範例使用RetryTemplate
並顯示其DEBUG
輸出<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- 具有復原的簡單無狀態重試
-
以下範例將
RecoveryCallback
新增至先前的範例,並使用ErrorMessageSendingRecoverer
將ErrorMessage
傳送至通道<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- 具有自訂策略和復原的無狀態重試
-
為了更複雜,我們可以為 advice 提供自訂的
RetryTemplate
。此範例繼續使用SimpleRetryPolicy
,但將嘗試次數增加到四次。它還新增了ExponentialBackoffPolicy
,其中第一次重試等待一秒,第二次等待五秒,第三次等待 25 秒 (總共四次嘗試)。以下清單顯示了範例及其DEBUG
輸出<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- 無狀態重試的命名空間支援
-
從 4.0 版開始,由於重試 advice 的命名空間支援,先前的設定可以大大簡化,如下列範例所示
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>
在先前的範例中,advice 被定義為頂層 bean,以便可以在多個
request-handler-advice-chain
實例中使用。您也可以直接在鏈中定義 advice,如下列範例所示<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>
<handler-retry-advice>
可以具有<fixed-back-off>
或<exponential-back-off>
子元素,或沒有子元素。沒有子元素的<handler-retry-advice>
不使用回退。如果沒有recovery-channel
,則在重試耗盡時會擲回例外。命名空間只能與無狀態重試一起使用。對於更複雜的環境 (自訂策略等),請使用正常的
<bean>
定義。 - 具有復原的簡單具狀態重試
-
為了使重試成為具狀態,我們需要為 advice 提供
RetryStateGenerator
實作。此類別用於將訊息識別為重新提交,以便RetryTemplate
可以判斷此訊息的目前重試狀態。框架提供了一個SpelExpressionRetryStateGenerator
,它透過使用 SpEL 運算式來判斷訊息識別碼。此範例再次使用預設策略 (三次嘗試,沒有回退)。與無狀態重試一樣,這些策略可以自訂。以下清單顯示了範例及其DEBUG
輸出<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
如果您將先前的範例與無狀態範例進行比較,您可以看到,對於具狀態重試,例外會在每次失敗時擲回給呼叫者。
- 重試的例外分類
-
Spring Retry 在判斷哪些例外可以調用重試方面具有很大的彈性。預設設定會針對所有例外進行重試,而例外分類器會查看最上層的例外。如果您將其設定為,例如,僅在
MyException
上重試,而您的應用程式擲回SomeOtherException
,其中原因是MyException
,則不會發生重試。由於 Spring Retry 1.0.3,
BinaryExceptionClassifier
具有一個名為traverseCauses
的屬性 (預設為false
)。當true
時,它會遍歷例外原因,直到找到匹配項或遍歷完所有原因。若要將此分類器用於重試,請使用使用建構函式建立的
SimpleRetryPolicy
,該建構函式接受最大嘗試次數、Exception
物件的Map
和traverseCauses
布林值。然後,您可以將此策略注入到RetryTemplate
中。
在這種情況下,需要 traverseCauses ,因為使用者例外可能會包裝在 MessagingException 中。 |
斷路器 Advice
斷路器模式的一般概念是,如果服務目前不可用,請不要浪費時間 (和資源) 嘗試使用它。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
實作了此模式。當斷路器處於關閉狀態時,端點會嘗試調用服務。如果一定數量的連續嘗試失敗,斷路器會進入開啟狀態。當它處於開啟狀態時,新的請求會「快速失敗」,並且在經過一段時間之前,不會嘗試調用服務。
當該時間到期時,斷路器會設定為半開啟狀態。當處於此狀態時,即使單次嘗試失敗,斷路器也會立即進入開啟狀態。如果嘗試成功,斷路器會進入關閉狀態,在這種情況下,除非再次發生設定數量的連續失敗,否則它不會再次進入開啟狀態。任何成功的嘗試都會將狀態重設為零失敗,以判斷斷路器何時可能再次進入開啟狀態。
通常,此 advice 可能用於外部服務,在這些服務中,可能需要一些時間才能失敗 (例如,嘗試建立網路連線時逾時)。
RequestHandlerCircuitBreakerAdvice
具有兩個屬性:threshold
和 halfOpenAfter
。threshold
屬性代表在斷路器進入開啟狀態之前需要發生的連續失敗次數。預設值為 5
。halfOpenAfter
屬性代表斷路器在上次失敗後等待多長時間,然後再嘗試另一個請求。預設值為 1000 毫秒。
以下範例設定了斷路器,並顯示其 DEBUG
和 ERROR
輸出
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
在先前的範例中,閾值設定為 2
,halfOpenAfter
設定為 12
秒。每 5 秒鐘會有一個新請求到達。前兩次嘗試調用了服務。第三次和第四次失敗,並顯示例外,指示斷路器已開啟。第五次請求嘗試是因為該請求是在上次失敗後 15 秒發出的。第六次嘗試立即失敗,因為斷路器立即進入開啟狀態。
運算式評估 Advice
最後提供的 advice 類別是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
。此 advice 比其他兩個 advice 更通用。它提供了一種機制,用於評估傳送到端點的原始輸入訊息上的運算式。在成功或失敗之後,可以使用單獨的運算式進行評估。或者,可以將包含評估結果以及輸入訊息的訊息傳送到訊息通道。
此 advice 的典型用例可能是在 <ftp:outbound-channel-adapter/>
中,例如,如果傳輸成功,則將檔案移動到一個目錄,如果失敗,則移動到另一個目錄
此 advice 具有在成功時設定運算式、在失敗時設定運算式以及每個運算式的對應通道的屬性。對於成功的情況,傳送到 successChannel
的訊息是 AdviceMessage
,其 payload 是運算式評估的結果。名為 inputMessage
的其他屬性包含傳送到處理器的原始訊息。傳送到 failureChannel
的訊息 (當處理器擲回例外時) 是 ErrorMessage
,其 payload 為 MessageHandlingExpressionEvaluatingAdviceException
。與所有 MessagingException
實例一樣,此 payload 具有 failedMessage
和 cause
屬性,以及一個名為 evaluationResult
的其他屬性,其中包含運算式評估的結果。
從 5.1.3 版開始,如果設定了通道,但未提供運算式,則預設運算式會用於評估訊息的 payload 。 |
當在 advice 的範圍內擲回例外時,預設情況下,該例外會在評估任何 failureExpression
之後擲回給呼叫者。如果您希望抑制擲回例外,請將 trapException
屬性設定為 true
。以下 advice 顯示如何使用 Java DSL 設定 advice
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
速率限制器 Advice
速率限制器 advice (RateLimiterRequestHandlerAdvice
) 允許確保端點不會因請求過載。當超出速率限制時,請求將進入封鎖狀態。
此 advice 的典型用例可能是外部服務提供者不允許每分鐘超過 n
個請求。
RateLimiterRequestHandlerAdvice
實作完全基於 Resilience4j 專案,並且需要 RateLimiter
或 RateLimiterConfig
注入。也可以使用預設值和/或自訂名稱進行設定。
以下範例設定了速率限制器 advice,每 1 秒一個請求
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
快取 Advice
從 5.2 版開始,引入了 CacheRequestHandlerAdvice
。它基於 Spring Framework 中的快取抽象,並與 @Caching
註解系列提供的概念和功能對齊。內部邏輯基於 CacheAspectSupport
擴充功能,其中快取操作的代理是在 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
方法周圍完成的,請求 Message<?>
作為引數。此 advice 可以使用 SpEL 運算式或 Function
來評估快取金鑰。請求 Message<?>
可作為 SpEL 評估內容的根物件,或作為 Function
輸入引數。依預設,請求訊息的 payload
用於快取金鑰。當預設快取操作為 CacheableOperation
時,CacheRequestHandlerAdvice
必須使用 cacheNames
進行設定,或使用一組任意 CacheOperation
s 進行設定。每個 CacheOperation
都可以單獨設定,或具有共用選項,例如 CacheManager
、CacheResolver
和 CacheErrorHandler
,可以從 CacheRequestHandlerAdvice
設定中重複使用。此設定功能類似於 Spring Framework 的 @CacheConfig
和 @Caching
註解組合。如果未提供 CacheManager
,則依預設會從 CacheAspectSupport
中的 BeanFactory
解析單一 bean。
以下範例設定了兩個具有不同快取操作集的 advice
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}