提供的 Advice 類別

除了提供應用 AOP advice 類別的一般機制外,Spring Integration 還提供以下現成的 advice 實作

重試 Advice

重試 advice (o.s.i.handler.advice.RequestHandlerRetryAdvice) 利用 Spring Retry 專案提供的豐富重試機制。spring-retry 的核心元件是 RetryTemplate,它允許設定複雜的重試情境,包括 RetryPolicyBackoffPolicy 策略 (具有多種實作),以及 RecoveryCallback 策略,以決定在重試耗盡時要採取的動作。

無狀態重試

無狀態重試是指重試活動完全在 advice 內處理的情況。執行緒暫停 (如果設定為暫停),然後重試動作。

具狀態重試

具狀態重試是指重試狀態在 advice 內管理,但會擲回例外,且呼叫者重新提交請求的情況。具狀態重試的一個範例是,當我們希望訊息發起者 (例如 JMS) 負責重新提交,而不是在目前的執行緒上執行時。具狀態重試需要一些機制來偵測重新嘗試的提交。

如需 spring-retry 的詳細資訊,請參閱專案的 JavadocSpring Batch 的參考文件,spring-retry 起源於此。

預設的回退行為是不回退。會立即嘗試重試。使用會導致執行緒在嘗試之間暫停的回退策略可能會導致效能問題,包括過多的記憶體使用和執行緒飢餓。在高容量環境中,應謹慎使用回退策略。

設定重試 Advice

本節中的範例使用以下 <service-activator>,它總是擲回例外

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}
簡單無狀態重試

預設的 RetryTemplate 具有 SimpleRetryPolicy,它會嘗試三次。沒有 BackOffPolicy,因此這三次嘗試會背靠背地進行,嘗試之間沒有延遲。沒有 RecoveryCallback,因此結果是在最後一次重試失敗後,將例外擲回給呼叫者。在 Spring Integration 環境中,可以使用輸入端點上的 error-channel 來處理此最終例外。以下範例使用 RetryTemplate 並顯示其 DEBUG 輸出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
具有復原的簡單無狀態重試

以下範例將 RecoveryCallback 新增至先前的範例,並使用 ErrorMessageSendingRecovererErrorMessage 傳送至通道

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
具有自訂策略和復原的無狀態重試

為了更複雜,我們可以為 advice 提供自訂的 RetryTemplate。此範例繼續使用 SimpleRetryPolicy,但將嘗試次數增加到四次。它還新增了 ExponentialBackoffPolicy,其中第一次重試等待一秒,第二次等待五秒,第三次等待 25 秒 (總共四次嘗試)。以下清單顯示了範例及其 DEBUG 輸出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
無狀態重試的命名空間支援

從 4.0 版開始,由於重試 advice 的命名空間支援,先前的設定可以大大簡化,如下列範例所示

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

在先前的範例中,advice 被定義為頂層 bean,以便可以在多個 request-handler-advice-chain 實例中使用。您也可以直接在鏈中定義 advice,如下列範例所示

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

<handler-retry-advice> 可以具有 <fixed-back-off><exponential-back-off> 子元素,或沒有子元素。沒有子元素的 <handler-retry-advice> 不使用回退。如果沒有 recovery-channel,則在重試耗盡時會擲回例外。命名空間只能與無狀態重試一起使用。

對於更複雜的環境 (自訂策略等),請使用正常的 <bean> 定義。

具有復原的簡單具狀態重試

為了使重試成為具狀態,我們需要為 advice 提供 RetryStateGenerator 實作。此類別用於將訊息識別為重新提交,以便 RetryTemplate 可以判斷此訊息的目前重試狀態。框架提供了一個 SpelExpressionRetryStateGenerator,它透過使用 SpEL 運算式來判斷訊息識別碼。此範例再次使用預設策略 (三次嘗試,沒有回退)。與無狀態重試一樣,這些策略可以自訂。以下清單顯示了範例及其 DEBUG 輸出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

如果您將先前的範例與無狀態範例進行比較,您可以看到,對於具狀態重試,例外會在每次失敗時擲回給呼叫者。

重試的例外分類

Spring Retry 在判斷哪些例外可以調用重試方面具有很大的彈性。預設設定會針對所有例外進行重試,而例外分類器會查看最上層的例外。如果您將其設定為,例如,僅在 MyException 上重試,而您的應用程式擲回 SomeOtherException,其中原因是 MyException,則不會發生重試。

由於 Spring Retry 1.0.3,BinaryExceptionClassifier 具有一個名為 traverseCauses 的屬性 (預設為 false)。當 true 時,它會遍歷例外原因,直到找到匹配項或遍歷完所有原因。

若要將此分類器用於重試,請使用使用建構函式建立的 SimpleRetryPolicy,該建構函式接受最大嘗試次數、Exception 物件的 MaptraverseCauses 布林值。然後,您可以將此策略注入到 RetryTemplate 中。

在這種情況下,需要 traverseCauses,因為使用者例外可能會包裝在 MessagingException 中。

斷路器 Advice

斷路器模式的一般概念是,如果服務目前不可用,請不要浪費時間 (和資源) 嘗試使用它。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice 實作了此模式。當斷路器處於關閉狀態時,端點會嘗試調用服務。如果一定數量的連續嘗試失敗,斷路器會進入開啟狀態。當它處於開啟狀態時,新的請求會「快速失敗」,並且在經過一段時間之前,不會嘗試調用服務。

當該時間到期時,斷路器會設定為半開啟狀態。當處於此狀態時,即使單次嘗試失敗,斷路器也會立即進入開啟狀態。如果嘗試成功,斷路器會進入關閉狀態,在這種情況下,除非再次發生設定數量的連續失敗,否則它不會再次進入開啟狀態。任何成功的嘗試都會將狀態重設為零失敗,以判斷斷路器何時可能再次進入開啟狀態。

通常,此 advice 可能用於外部服務,在這些服務中,可能需要一些時間才能失敗 (例如,嘗試建立網路連線時逾時)。

RequestHandlerCircuitBreakerAdvice 具有兩個屬性:thresholdhalfOpenAfterthreshold 屬性代表在斷路器進入開啟狀態之前需要發生的連續失敗次數。預設值為 5halfOpenAfter 屬性代表斷路器在上次失敗後等待多長時間,然後再嘗試另一個請求。預設值為 1000 毫秒。

以下範例設定了斷路器,並顯示其 DEBUGERROR 輸出

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

在先前的範例中,閾值設定為 2halfOpenAfter 設定為 12 秒。每 5 秒鐘會有一個新請求到達。前兩次嘗試調用了服務。第三次和第四次失敗,並顯示例外,指示斷路器已開啟。第五次請求嘗試是因為該請求是在上次失敗後 15 秒發出的。第六次嘗試立即失敗,因為斷路器立即進入開啟狀態。

運算式評估 Advice

最後提供的 advice 類別是 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。此 advice 比其他兩個 advice 更通用。它提供了一種機制,用於評估傳送到端點的原始輸入訊息上的運算式。在成功或失敗之後,可以使用單獨的運算式進行評估。或者,可以將包含評估結果以及輸入訊息的訊息傳送到訊息通道。

此 advice 的典型用例可能是在 <ftp:outbound-channel-adapter/> 中,例如,如果傳輸成功,則將檔案移動到一個目錄,如果失敗,則移動到另一個目錄

此 advice 具有在成功時設定運算式、在失敗時設定運算式以及每個運算式的對應通道的屬性。對於成功的情況,傳送到 successChannel 的訊息是 AdviceMessage,其 payload 是運算式評估的結果。名為 inputMessage 的其他屬性包含傳送到處理器的原始訊息。傳送到 failureChannel 的訊息 (當處理器擲回例外時) 是 ErrorMessage,其 payload 為 MessageHandlingExpressionEvaluatingAdviceException。與所有 MessagingException 實例一樣,此 payload 具有 failedMessagecause 屬性,以及一個名為 evaluationResult 的其他屬性,其中包含運算式評估的結果。

從 5.1.3 版開始,如果設定了通道,但未提供運算式,則預設運算式會用於評估訊息的 payload

當在 advice 的範圍內擲回例外時,預設情況下,該例外會在評估任何 failureExpression 之後擲回給呼叫者。如果您希望抑制擲回例外,請將 trapException 屬性設定為 true。以下 advice 顯示如何使用 Java DSL 設定 advice

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}

速率限制器 Advice

速率限制器 advice (RateLimiterRequestHandlerAdvice) 允許確保端點不會因請求過載。當超出速率限制時,請求將進入封鎖狀態。

此 advice 的典型用例可能是外部服務提供者不允許每分鐘超過 n 個請求。

RateLimiterRequestHandlerAdvice 實作完全基於 Resilience4j 專案,並且需要 RateLimiterRateLimiterConfig 注入。也可以使用預設值和/或自訂名稱進行設定。

以下範例設定了速率限制器 advice,每 1 秒一個請求

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}

快取 Advice

從 5.2 版開始,引入了 CacheRequestHandlerAdvice。它基於 Spring Framework 中的快取抽象,並與 @Caching 註解系列提供的概念和功能對齊。內部邏輯基於 CacheAspectSupport 擴充功能,其中快取操作的代理是在 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage 方法周圍完成的,請求 Message<?> 作為引數。此 advice 可以使用 SpEL 運算式或 Function 來評估快取金鑰。請求 Message<?> 可作為 SpEL 評估內容的根物件,或作為 Function 輸入引數。依預設,請求訊息的 payload 用於快取金鑰。當預設快取操作為 CacheableOperation 時,CacheRequestHandlerAdvice 必須使用 cacheNames 進行設定,或使用一組任意 CacheOperation s 進行設定。每個 CacheOperation 都可以單獨設定,或具有共用選項,例如 CacheManagerCacheResolverCacheErrorHandler,可以從 CacheRequestHandlerAdvice 設定中重複使用。此設定功能類似於 Spring Framework 的 @CacheConfig@Caching 註解組合。如果未提供 CacheManager,則依預設會從 CacheAspectSupport 中的 BeanFactory 解析單一 bean。

以下範例設定了兩個具有不同快取操作集的 advice

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}