Repeat

RepeatTemplate

批次處理是有關重複動作,無論是作為簡單的優化還是作為 Job 的一部分。為了規劃和概括重複,並提供相當於迭代器框架的功能,Spring Batch 具有 RepeatOperations 介面。RepeatOperations 介面具有以下定義

public interface RepeatOperations {

    RepeatStatus iterate(RepeatCallback callback) throws RepeatException;

}

回呼是一個介面,如下列定義所示,可讓您插入一些要重複的業務邏輯

public interface RepeatCallback {

    RepeatStatus doInIteration(RepeatContext context) throws Exception;

}

重複執行回呼,直到實作判斷迭代應結束為止。這些介面中的傳回值是一個列舉值,可以是 RepeatStatus.CONTINUABLERepeatStatus.FINISHEDRepeatStatus 列舉會向重複操作的呼叫者傳達有關是否還有任何工作剩餘的資訊。一般來說,RepeatOperations 的實作應檢查 RepeatStatus 並將其用作結束迭代的決策的一部分。任何希望向呼叫者發出訊號,表示沒有工作剩餘的回呼都可以傳回 RepeatStatus.FINISHED

RepeatOperations 最簡單的通用實作是 RepeatTemplate

RepeatTemplate template = new RepeatTemplate();

template.setCompletionPolicy(new SimpleCompletionPolicy(2));

template.iterate(new RepeatCallback() {

    public RepeatStatus doInIteration(RepeatContext context) {
        // Do stuff in batch...
        return RepeatStatus.CONTINUABLE;
    }

});

在先前的範例中,我們傳回 RepeatStatus.CONTINUABLE,以表示還有更多工作要做。回呼也可以傳回 RepeatStatus.FINISHED,以向呼叫者發出訊號,表示沒有工作剩餘。某些迭代可以通過回呼中完成的工作的本質考量來終止。其他迭代實際上是無限迴圈(就回呼而言),並且完成決策委託給外部策略,如先前的範例所示。

RepeatContext

RepeatCallback 的方法參數是 RepeatContext。許多回呼會忽略 context。但是,如有必要,您可以將其用作屬性包,以儲存迭代期間的暫時資料。在 iterate 方法傳回後,context 將不再存在。

如果正在進行巢狀迭代,則 RepeatContext 具有父 context。父 context 有時可用於儲存需要在 iterate 呼叫之間共享的資料。例如,如果您想要計算迭代中事件的發生次數並在後續呼叫中記住它,情況就是如此。

RepeatStatus

RepeatStatus 是 Spring Batch 使用的列舉,用於指示處理是否已完成。它有兩個可能的 RepeatStatus

表 1. RepeatStatus 屬性

描述

CONTINUABLE

還有更多工作要做。

FINISHED

不應再進行重複。

您可以透過使用 RepeatStatus 中的 and() 方法,將 RepeatStatus 值與邏輯 AND 運算結合使用。這樣做的效果是對 continuable 旗標執行邏輯 AND 運算。換句話說,如果任一狀態為 FINISHED,則結果為 FINISHED

Completion Policies

RepeatTemplate 內部,iterate 方法中迴圈的終止由 CompletionPolicy 決定,CompletionPolicy 也是 RepeatContext 的 factory。RepeatTemplate 的責任是在迭代的每個階段使用目前的 policy 建立 RepeatContext 並將其傳遞給 RepeatCallback。在回呼完成其 doInIteration 後,RepeatTemplate 必須呼叫 CompletionPolicy,要求它更新其狀態(將儲存在 RepeatContext 中)。然後它會詢問 policy 迭代是否已完成。

Spring Batch 提供了一些 CompletionPolicy 的簡單通用實作。SimpleCompletionPolicy 允許執行固定次數(使用 RepeatStatus.FINISHED 強制隨時提早完成)。

使用者可能需要實作自己的 completion policies 以進行更複雜的決策。例如,一旦線上系統開始使用,就阻止批次 Job 執行的批次處理視窗將需要自訂 policy。

例外處理

如果在 RepeatCallback 內部擲回例外,則 RepeatTemplate 會諮詢 ExceptionHandler,後者可以決定是否重新擲回例外。

以下清單顯示 ExceptionHandler 介面定義

public interface ExceptionHandler {

    void handleException(RepeatContext context, Throwable throwable)
        throws Throwable;

}

常見的使用案例是計算給定類型的例外數量,並在達到限制時失敗。為此,Spring Batch 提供了 SimpleLimitExceptionHandler 和稍微更具彈性的 RethrowOnThresholdExceptionHandlerSimpleLimitExceptionHandler 具有 limit 屬性和應與目前例外比較的例外類型。也計算提供的類型的所有子類別。在達到限制之前,會忽略給定類型的例外,然後會重新擲回這些例外。其他類型的例外始終會重新擲回。

SimpleLimitExceptionHandler 的一個重要的選用屬性是名為 useParent 的布林旗標。預設情況下為 false,因此 limit 僅在目前的 RepeatContext 中計算。當設定為 true 時,limit 會跨巢狀迭代中的同級 context 保留(例如 step 內的一組 chunks)。

Listeners

通常,能夠接收額外的回呼以處理跨越多個不同迭代的跨領域問題非常有用。為此,Spring Batch 提供了 RepeatListener 介面。RepeatTemplate 允許使用者註冊 RepeatListener 實作,並且在迭代期間,它們會獲得具有 RepeatContextRepeatStatus 的回呼(如果可用)。

RepeatListener 介面具有以下定義

public interface RepeatListener {
    void before(RepeatContext context);
    void after(RepeatContext context, RepeatStatus result);
    void open(RepeatContext context);
    void onError(RepeatContext context, Throwable e);
    void close(RepeatContext context);
}

openclose 回呼在整個迭代之前和之後出現。beforeafteronError 適用於個別的 RepeatCallback 呼叫。

請注意,當有多個 listener 時,它們會在清單中,因此有一個順序。在這種情況下,openbefore 以相同的順序呼叫,而 afteronErrorclose 則以相反的順序呼叫。

並行處理

RepeatOperations 的實作不限於循序執行回呼。某些實作能夠並行執行其回呼非常重要。為此,Spring Batch 提供了 TaskExecutorRepeatTemplate,它使用 Spring TaskExecutor 策略來執行 RepeatCallback。預設是使用 SynchronousTaskExecutor,其效果是在同一個執行緒中執行整個迭代(與正常的 RepeatTemplate 相同)。

宣告式迭代

有時,有些業務處理您知道每次發生時都想要重複執行。這方面的經典範例是訊息管線的優化。如果一批訊息頻繁到達,則處理它們比承擔每則訊息單獨交易的成本更有效率。Spring Batch 提供了一個 AOP 攔截器,它將方法呼叫包裝在 RepeatOperations 物件中以達到此目的。RepeatOperationsInterceptor 執行攔截的方法,並根據提供的 RepeatTemplate 中的 CompletionPolicy 重複執行。

  • Java

  • XML

以下範例使用 Java 設定來重複呼叫名為 processMessage 的服務方法(有關如何設定 AOP 攔截器的更多詳細資訊,請參閱 <<docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 使用者指南>>)

@Bean
public MyService myService() {
	ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
	factory.setInterfaces(MyService.class);
	factory.setTarget(new MyService());

	MyService service = (MyService) factory.getProxy();
	JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
	pointcut.setPatterns(".*processMessage.*");

	RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();

	((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));

	return service;
}

以下範例顯示宣告式迭代,其使用 Spring AOP 命名空間來重複呼叫名為 processMessage 的服務方法(有關如何設定 AOP 攔截器的更多詳細資訊,請參閱 <<docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 使用者指南>>)

<aop:config>
    <aop:pointcut id="transactional"
        expression="execution(* com..*Service.processMessage(..))" />
    <aop:advisor pointcut-ref="transactional"
        advice-ref="retryAdvice" order="-1"/>
</aop:config>

<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>

先前的範例在攔截器內部使用預設的 RepeatTemplate。若要變更 policies、listeners 和其他詳細資訊,您可以將 RepeatTemplate 的執行個體注入到攔截器中。

如果攔截的方法傳回 void,則攔截器始終傳回 RepeatStatus.CONTINUABLE(因此,如果 CompletionPolicy 沒有有限的終點,則存在無限迴圈的危險)。否則,它會傳回 RepeatStatus.CONTINUABLE,直到從攔截的方法傳回的值為 null。在這種情況下,它會傳回 RepeatStatus.FINISHED。因此,目標方法內的業務邏輯可以透過傳回 null 或擲回例外(由提供的 RepeatTemplate 中的 ExceptionHandler 重新擲回)來發出訊號,表示沒有更多工作要做。