批次處理的領域語言

對於任何經驗豐富的批次處理架構師來說,Spring Batch 中使用的批次處理整體概念應該是熟悉且舒適的。有「Jobs」和「Steps」,以及開發人員提供的處理單元,稱為 ItemReaderItemWriter。然而,由於 Spring 的模式、操作、範本、回呼和慣用語,以下方面有改進的機會:

  • 在更清楚地遵守關注點分離方面有顯著改進。

  • 清楚劃分的架構層次和以介面形式提供的服務。

  • 簡單且預設的實作,可實現快速採用和開箱即用的易用性。

  • 顯著增強的擴展性。

下圖是數十年來使用的批次處理參考架構的簡化版本。它概述了構成批次處理領域語言的組件。此架構框架是一個藍圖,已在過去幾代平台(大型主機上的 COBOL、Unix 上的 C,以及現在任何地方的 Java)上的數十年實作中得到驗證。JCL 和 COBOL 開發人員可能會像 C、C# 和 Java 開發人員一樣熟悉這些概念。Spring Batch 提供了層、組件和技術服務的實體實作,這些層、組件和技術服務常見於用於解決簡單到複雜批次處理應用程式創建的穩健、可維護系統中,並具有解決非常複雜處理需求的基礎設施和擴展。

Figure 2.1: Batch Stereotypes
圖 1. 批次處理原型

前面的圖表突顯了構成 Spring Batch 領域語言的關鍵概念。一個 Job 有一到多個步驟,每個步驟恰好有一個 ItemReader、一個 ItemProcessor 和一個 ItemWriter。一個 Job 需要被啟動(使用 JobLauncher),並且需要儲存關於目前執行程序的元數據(在 JobRepository 中)。

Job

本節描述與批次處理 Job 概念相關的原型。Job 是一個封裝整個批次處理程序的實體。與其他 Spring 專案常見的情況一樣,Job 可以使用 XML 設定檔或基於 Java 的設定來組裝在一起。此設定可以稱為「job 設定」。然而,Job 只是整體階層的頂端,如下圖所示

Job Hierarchy
圖 2. Job 階層

在 Spring Batch 中,Job 僅僅是 Step 實例的容器。它將多個邏輯上屬於同一個流程的步驟組合在一起,並允許設定所有步驟通用的屬性,例如可重新啟動性。job 設定包含

  • Job 的名稱。

  • Step 實例的定義和順序。

  • Job 是否可重新啟動。

  • Java

  • XML

對於那些使用 Java 設定的人來說,Spring Batch 以 SimpleJob 類別的形式提供了 Job 介面的預設實作,該類別在 Job 之上建立了一些標準功能。當使用基於 Java 的設定時,可以使用建構器集合來實例化 Job,如下例所示

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .build();
}

對於那些使用 XML 設定的人來說,Spring Batch 以 SimpleJob 類別的形式提供了 Job 介面的預設實作,該類別在 Job 之上建立了一些標準功能。然而,批次處理命名空間抽象化了直接實例化它的需求。相反,您可以使用 <job> 元素,如下例所示

<job id="footballJob">
    <step id="playerload" next="gameLoad"/>
    <step id="gameLoad" next="playerSummarization"/>
    <step id="playerSummarization"/>
</job>

JobInstance

JobInstance 指的是邏輯 job 執行的概念。考慮一個應該在每天結束時執行一次的批次處理 job,例如上圖中的 EndOfDay Job。有一個 EndOfDay job,但是 Job 的每次單獨執行都必須單獨追蹤。對於此 job,每天有一個邏輯 JobInstance。例如,有 1 月 1 日執行、1 月 2 日執行等等。如果 1 月 1 日執行第一次失敗,並在第二天再次執行,它仍然是 1 月 1 日執行。(通常,這也與它正在處理的數據相對應,意味著 1 月 1 日執行處理 1 月 1 日的數據)。因此,每個 JobInstance 可以有多個執行(JobExecution 將在本章稍後更詳細地討論),並且在給定時間只能運行一個 JobInstance(它對應於特定的 Job 和識別 JobParameters)。

JobInstance 的定義與要載入的數據絕對無關。完全由 ItemReader 實作來決定如何載入數據。例如,在 EndOfDay 場景中,數據上可能有一列指示數據所屬的「生效日期」或「排程日期」。因此,1 月 1 日執行將僅載入來自 1 日的數據,而 1 月 2 日執行將僅使用來自 2 日的數據。由於此決定很可能是業務決策,因此留給 ItemReader 來決定。但是,使用相同的 JobInstance 會決定是否使用先前執行的「狀態」(即 ExecutionContext,將在本章稍後討論)。使用新的 JobInstance 意味著「從頭開始」,而使用現有的實例通常意味著「從上次停止的地方開始」。

JobParameters

在討論了 JobInstance 以及它與 Job 的區別之後,自然而然的問題是:「如何區分一個 JobInstance 與另一個 JobInstance?」答案是:JobParametersJobParameters 物件保存一組用於啟動批次處理 job 的參數。它們可以用於識別,甚至可以用作執行期間的參考數據,如下圖所示

Job Parameters
圖 3. Job 參數

在前面的範例中,其中有兩個實例,一個用於 1 月 1 日,另一個用於 1 月 2 日,實際上只有一個 Job,但它有兩個 JobParameter 物件:一個是以 01-01-2017 的 job 參數啟動的,另一個是以 01-02-2017 的參數啟動的。因此,合約可以定義為:JobInstance = Job + 識別 JobParameters。這允許開發人員有效地控制如何定義 JobInstance,因為他們控制傳入的參數。

並非所有 job 參數都需要有助於 JobInstance 的識別。預設情況下,它們會這樣做。但是,框架也允許提交帶有不影響 JobInstance 身份的參數的 Job

JobExecution

JobExecution 指的是單次嘗試運行 Job 的技術概念。執行可能會以失敗或成功結束,但除非執行成功完成,否則與給定執行相對應的 JobInstance 不會被視為已完成。以上述 EndOfDay Job 為例,考慮一個用於 01-01-2017 的 JobInstance,它在第一次運行時失敗。如果使用與第一次運行相同的識別 job 參數(01-01-2017)再次運行,則會建立一個新的 JobExecution。但是,仍然只有一個 JobInstance

Job 定義了一個 job 是什麼以及如何執行它,而 JobInstance 是一個純粹的組織物件,用於將執行分組在一起,主要是為了啟用正確的重新啟動語意。然而,JobExecution 是實際運行期間發生的事情的主要儲存機制,並且包含更多必須控制和持久化的屬性,如下表所示

表 1. JobExecution 屬性

屬性

定義

狀態

一個 BatchStatus 物件,指示執行的狀態。運行時,它是 BatchStatus#STARTED。如果失敗,則是 BatchStatus#FAILED。如果成功完成,則是 BatchStatus#COMPLETED

startTime

一個 java.time.LocalDateTime,表示執行開始時的目前系統時間。如果 job 尚未開始,則此欄位為空。

endTime

一個 java.time.LocalDateTime,表示執行完成時的目前系統時間,無論是否成功。如果 job 尚未完成,則此欄位為空。

exitStatus

ExitStatus,指示運行的結果。它最重要,因為它包含返回給呼叫者的退出代碼。有關更多詳細資訊,請參閱第 5 章。如果 job 尚未完成,則此欄位為空。

createTime

一個 java.time.LocalDateTime,表示首次持久化 JobExecution 時的目前系統時間。job 可能尚未開始(因此沒有開始時間),但它始終具有 createTime,這是框架管理 job 層級 ExecutionContexts 所必需的。

lastUpdated

一個 java.time.LocalDateTime,表示上次持久化 JobExecution 的時間。如果 job 尚未開始,則此欄位為空。

executionContext

「屬性包」,包含需要在執行之間持久化的任何使用者數據。

failureExceptions

Job 執行期間遇到的例外列表。如果在 Job 失敗期間遇到多個例外,這些可能會很有用。

這些屬性很重要,因為它們是持久化的,並且可以用於完全確定執行的狀態。例如,如果 01-01 的 EndOfDay job 在晚上 9:00 執行,並在 9:30 失敗,則在批次處理元數據表中建立以下條目

表 2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01

TRUE

表 4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

為了清晰和格式化,欄位名稱可能已被縮寫或移除。

現在 job 已經失敗,假設花了整個晚上才確定問題,因此「批次處理視窗」現在已關閉。進一步假設視窗從晚上 9:00 開始,job 再次啟動以進行 01-01,從上次停止的地方開始,並在 9:30 成功完成。由於現在是第二天,因此也必須運行 01-02 job,它在 9:31 之後立即啟動,並在其正常的一小時時間內在 10:30 完成。除非兩個 job 可能嘗試存取相同的數據,導致資料庫層級的鎖定問題,否則不要求一個 JobInstance 在另一個之後啟動。完全由排程器來決定何時應運行 Job。由於它們是單獨的 JobInstance,因此 Spring Batch 不會嘗試阻止它們同時運行。(嘗試在另一個 JobInstance 已經運行時運行相同的 JobInstance 會導致拋出 JobExecutionAlreadyRunningException)。現在 JobInstanceJobParameters 表中都應該有一個額外的條目,並且在 JobExecution 表中應該有兩個額外的條目,如下表所示

表 5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

表 6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

IDENTIFYING

1

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

2

DATE

schedule.Date

2017-01-01 00:00:00

TRUE

3

DATE

schedule.Date

2017-01-02 00:00:00

TRUE

表 7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

2

1

2017-01-02 21:00

2017-01-02 21:30

COMPLETED

3

2

2017-01-02 21:31

2017-01-02 22:29

COMPLETED

為了清晰和格式化,欄位名稱可能已被縮寫或移除。

Step

Step 是一個領域物件,它封裝了批次處理 job 的獨立、循序階段。因此,每個 Job 完全由一個或多個步驟組成。Step 包含定義和控制實際批次處理所需的所有資訊。這是一個必然模糊的描述,因為任何給定 Step 的內容都由編寫 Job 的開發人員自行決定。Step 可以像開發人員希望的那樣簡單或複雜。一個簡單的 Step 可能會將資料從檔案載入到資料庫中,幾乎不需要程式碼(取決於使用的實作)。更複雜的 Step 可能具有複雜的業務規則,這些規則在處理過程中應用。與 Job 一樣,Step 具有與唯一 JobExecution 相關聯的個別 StepExecution,如下圖所示

Figure 2.1: Job Hierarchy With Steps
圖 4. 帶有步驟的 Job 階層

StepExecution

StepExecution 代表單次嘗試執行 Step。每次運行 Step 時都會建立新的 StepExecution,類似於 JobExecution。但是,如果由於之前的步驟失敗而導致步驟執行失敗,則不會為其持久化執行。僅當 StepExecutionStep 實際啟動時才會建立 StepExecution

Step 執行由 StepExecution 類別的物件表示。每個執行都包含對其對應步驟和 JobExecution 的參考,以及與交易相關的數據,例如提交和回滾計數以及開始和結束時間。此外,每個步驟執行都包含一個 ExecutionContext,其中包含開發人員需要在批次處理運行之間持久化的任何數據,例如統計資訊或重新啟動所需的狀態資訊。下表列出了 StepExecution 的屬性

表 8. StepExecution 屬性

屬性

定義

狀態

一個 BatchStatus 物件,指示執行的狀態。運行時,狀態為 BatchStatus.STARTED。如果失敗,狀態為 BatchStatus.FAILED。如果成功完成,狀態為 BatchStatus.COMPLETED

startTime

一個 java.time.LocalDateTime,表示執行開始時的目前系統時間。如果步驟尚未開始,則此欄位為空。

endTime

一個 java.time.LocalDateTime,表示執行完成時的目前系統時間,無論是否成功。如果步驟尚未退出,則此欄位為空。

exitStatus

指示執行結果的 ExitStatus。它最重要,因為它包含返回給呼叫者的退出代碼。有關更多詳細資訊,請參閱第 5 章。如果 job 尚未退出,則此欄位為空。

executionContext

「屬性包」,包含需要在執行之間持久化的任何使用者數據。

readCount

已成功讀取的項目數。

writeCount

已成功寫入的項目數。

commitCount

已為此執行提交的交易數。

rollbackCount

Step 控制的業務交易已回滾的次數。

readSkipCount

read 失敗的次數,導致項目被跳過。

processSkipCount

process 失敗的次數,導致項目被跳過。

filterCount

已被 ItemProcessor「篩選」的項目數。

writeSkipCount

write 失敗的次數,導致項目被跳過。

ExecutionContext

ExecutionContext 表示鍵/值對的集合,這些鍵/值對由框架持久化和控制,以便為開發人員提供一個位置來儲存範圍限定為 StepExecution 物件或 JobExecution 物件的持久狀態。(對於熟悉 Quartz 的人來說,它與 JobDataMap 非常相似。)最佳用法範例是促進重新啟動。以平面檔案輸入為例,在處理個別行時,框架會在提交點定期持久化 ExecutionContext。這樣做可以讓 ItemReader 儲存其狀態,以防在運行期間發生致命錯誤,甚至停電。所有需要做的就是將目前讀取的行數放入上下文中,如下例所示,框架會完成剩下的工作

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

以上述 Job 原型部分中的 EndOfDay 範例為例,假設有一個步驟 loadData,它將檔案載入到資料庫中。在第一次失敗運行後,元數據表將如下例所示

表 9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表 10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

DATE

schedule.Date

2017-01-01

表 11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

START_TIME

END_TIME

STATUS

1

1

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

START_TIME

END_TIME

STATUS

1

1

loadData

2017-01-01 21:00

2017-01-01 21:30

FAILED

表 13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count=40321}

在上述情況下,Step 運行了 30 分鐘,並處理了 40,321 個「片段」,在本場景中,這將代表檔案中的行。此值在框架每次提交之前更新,並且可以包含與 ExecutionContext 中的條目相對應的多行。在提交之前收到通知需要各種 StepListener 實作之一(或 ItemStream),這些實作將在本指南稍後更詳細地討論。與之前的範例一樣,假設 Job 在第二天重新啟動。重新啟動時,上次運行的 ExecutionContext 中的值會從資料庫中重新建立。當 ItemReader 開啟時,它可以檢查它在上下文中是否有任何儲存的狀態,並從那裡初始化自身,如下例所示

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在這種情況下,在前面的程式碼運行後,目前行是 40,322,讓 Step 從上次停止的地方再次開始。您也可以使用 ExecutionContext 來獲取需要持久化的關於運行本身的統計資訊。例如,如果平面檔案包含跨越多行的處理訂單,則可能需要儲存已處理的訂單數量(這與讀取的行數大不相同),以便可以在 Step 結束時發送一封電子郵件,其中包含正文中處理的訂單總數。框架處理為開發人員儲存此資訊,以正確地將其範圍限定為個別 JobInstance。很難知道是否應該使用現有的 ExecutionContext。例如,使用上面 EndOfDay 範例,當 01-01 運行第二次再次開始時,框架會識別出它是相同的 JobInstance,並在個別 Step 基礎上,從資料庫中提取 ExecutionContext,並將其(作為 StepExecution 的一部分)交給 Step 本身。相反,對於 01-02 運行,框架會識別出它是一個不同的實例,因此必須將一個空的上下文交給 Step。框架為開發人員做出了許多這種類型的判斷,以確保在正確的時間將狀態給予他們。同樣重要的是要注意,在任何給定時間,每個 StepExecution 都恰好存在一個 ExecutionContextExecutionContext 的客戶端應該小心,因為這會建立一個共用的鍵空間。因此,在放入值時應小心,以確保沒有數據被覆蓋。但是,Step 絕對不會在上下文中儲存任何數據,因此沒有辦法對框架產生不利影響。

請注意,每個 JobExecution 至少有一個 ExecutionContext,每個 StepExecution 也有一個。例如,考慮以下程式碼片段

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如註解中所述,ecStep 不等於 ecJob。它們是兩個不同的 ExecutionContexts。範圍限定為 Step 的那個在 Step 中的每個提交點儲存,而範圍限定為 Job 的那個在每個 Step 執行之間儲存。

ExecutionContext 中,所有非暫時性條目都必須是 Serializable。執行環境的正確序列化支撐了步驟和 job 的重新啟動能力。如果您使用本質上不可序列化的鍵或值,則需要採用量身定制的序列化方法。無法序列化執行環境可能會危及狀態持久化過程,使失敗的 job 無法正確恢復。

JobRepository

JobRepository 是所有先前提及的原型的持久化機制。它為 JobLauncherJobStep 實作提供 CRUD 操作。當 Job 首次啟動時,會從儲存庫取得 JobExecution。此外,在執行過程中,StepExecutionJobExecution 實作透過將它們傳遞到儲存庫來持久化。

  • Java

  • XML

當使用 Java 設定時,@EnableBatchProcessing 註解提供 JobRepository 作為自動設定的組件之一。

Spring Batch XML 命名空間提供使用 <job-repository> 標籤設定 JobRepository 實例的支援,如下例所示

<job-repository id="jobRepository"/>

JobLauncher

JobLauncher 表示用於使用給定的一組 JobParameters 啟動 Job 的簡單介面,如下例所示

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

預期實作從 JobRepository 取得有效的 JobExecution 並執行 Job

ItemReader

ItemReader 是一個抽象,表示一次一個項目地檢索 Step 的輸入。當 ItemReader 耗盡它可以提供的項目時,它會透過返回 null 來指示這一點。您可以在 Readers And Writers 中找到有關 ItemReader 介面及其各種實作的更多詳細資訊。

ItemWriter

ItemWriter 是一個抽象概念,代表 Step 的輸出,一次處理一個批次或區塊的項目。 一般來說,ItemWriter 不知道接下來應該接收哪個輸入,只知道在其當前調用中傳入的項目。 您可以在讀取器與寫入器中找到關於 ItemWriter 介面及其各種實作的更多詳細資訊。

ItemProcessor

ItemProcessor 是一個抽象概念,代表項目的業務處理。 雖然 ItemReader 讀取一個項目,而 ItemWriter 寫入一個項目,但 ItemProcessor 提供了轉換或應用其他業務處理的存取點。 如果在處理項目時,確定該項目無效,則傳回 null 表示不應將該項目寫出。 您可以在讀取器與寫入器中找到關於 ItemProcessor 介面的更多詳細資訊。

Batch 命名空間

先前列出的許多領域概念需要在 Spring ApplicationContext 中進行配置。 雖然您可以標準 Bean 定義中使用上述介面的實作,但為了方便配置,已提供命名空間,如下例所示

<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
   http://www.springframework.org/schema/beans
   https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/batch
   https://www.springframework.org/schema/batch/spring-batch.xsd">

<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
        </tasklet>
    </step>
</job>

</beans:beans>

只要宣告了 batch 命名空間,就可以使用它的任何元素。 您可以在設定與執行 Job 中找到關於配置 Job 的更多資訊。 您可以在設定 Step 中找到關於配置 Step 的更多資訊。