擴展和並行處理

許多批次處理問題可以使用單執行緒、單進程的 Job 來解決,因此在考慮更複雜的實作之前,務必先妥善檢查這是否符合您的需求,這始終是一個好主意。先測量實際 Job 的效能,看看最簡單的實作是否符合您的需求。即使使用標準硬體,您也可以在一分鐘內讀取和寫入數百 MB 的檔案。

當您準備好開始實作具有某些並行處理的 Job 時,Spring Batch 提供了多種選項,本章將對其進行說明,儘管某些功能在其他地方涵蓋。在高層次上,有兩種並行處理模式

  • 單進程,多執行緒

  • 多進程

這些模式還可以細分為以下類別

  • 多執行緒 Step (單進程)

  • 並行 Step (單進程)

  • Step 的遠端區塊處理 (多進程)

  • 分割 Step (單進程或多進程)

首先,我們回顧單進程選項。然後,我們回顧多進程選項。

多執行緒 Step

開始並行處理最簡單的方法是在您的 Step 設定中新增 TaskExecutor

  • Java

  • XML

當使用 Java 設定時,您可以將 TaskExecutor 新增到 Step,如下列範例所示

Java 設定
@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}

例如,您可以將屬性新增至 tasklet,如下所示

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

在此範例中,taskExecutor 是對另一個實作 TaskExecutor 介面的 Bean 定義的參考。TaskExecutor 是一個標準的 Spring 介面,因此請查閱 Spring 使用者指南以了解可用實作的詳細資訊。最簡單的多執行緒 TaskExecutorSimpleAsyncTaskExecutor

上述設定的結果是,Step 透過在單獨的執行緒中執行讀取、處理和寫入每個項目區塊(每個提交間隔)來執行。請注意,這表示要處理的項目沒有固定的順序,並且與單執行緒情況相比,一個區塊可能包含非連續的項目。除了 Task Executor 施加的任何限制(例如,它是否由執行緒池支援)之外,tasklet 設定還有一個節流限制(預設值:4)。您可能需要增加此限制,以確保執行緒池得到充分利用。

  • Java

  • XML

當使用 Java 設定時,建構器提供對節流限制的存取,如下所示

Java 設定
@Bean
public Step sampleStep(TaskExecutor taskExecutor, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}

例如,您可以增加節流限制,如下所示

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

另請注意,您的 Step 中使用的任何共用資源(例如 DataSource)都可能對並行性施加限制。請務必使這些資源中的池至少與 Step 中所需的並行執行緒數一樣大。

節流限制棄用

從 v5.0 開始,節流限制已棄用,沒有替代方案。如果您想替換預設 TaskExecutorRepeatTemplate 中的目前節流機制,您需要提供自訂的 RepeatOperations 實作(基於具有有界任務佇列的 TaskExecutor),並使用 StepBuilder#stepOperations 在 Step 上設定它

Java 設定
@Bean
public Step sampleStep(RepeatOperations customRepeatOperations, JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("sampleStep", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(itemReader())
				.writer(itemWriter())
				.stepOperations(customRepeatOperations)
				.build();
}

對於某些常見的批次使用案例,使用多執行緒 Step 實作存在一些實際限制。Step 中的許多參與者(例如讀取器和寫入器)都是有狀態的。如果狀態未按執行緒隔離,則這些元件無法在多執行緒 Step 中使用。特別是,Spring Batch 中的大多數讀取器和寫入器並非設計用於多執行緒使用。但是,可以使用無狀態或執行緒安全的讀取器和寫入器,並且在 Spring Batch 範例中,有一個範例(稱為 parallelJob)展示了進程指示器的使用(請參閱防止狀態持久化),以追蹤已在資料庫輸入表中處理的項目。

Spring Batch 提供了一些 ItemWriterItemReader 的實作。通常,它們會在 Javadoc 中說明它們是否是執行緒安全的,或者您必須做什麼才能避免在並行環境中出現問題。如果在 Javadoc 中沒有資訊,您可以檢查實作以查看是否有任何狀態。如果讀取器不是執行緒安全的,您可以使用提供的 SynchronizedItemStreamReader 修飾它,或在您自己的同步委派器中使用它。您可以同步對 read() 的呼叫,並且只要處理和寫入是區塊中最昂貴的部分,您的 Step 仍然可以比單執行緒設定更快地完成。

並行 Step

只要需要並行化的應用程式邏輯可以拆分為不同的職責並分配給個別的 Step,就可以在單個進程中進行並行化。並行 Step 執行很容易設定和使用。

  • Java

  • XML

當使用 Java 設定時,並行執行 Step (step1,step2)step3 很簡單,如下所示

Java 設定
@Bean
public Job job(JobRepository jobRepository) {
    return new JobBuilder("job", jobRepository)
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

例如,並行執行 Step (step1,step2)step3 很簡單,如下所示

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

可設定的 Task Executor 用於指定哪個 TaskExecutor 實作應執行個別流程。預設值為 SyncTaskExecutor,但需要異步 TaskExecutor 才能並行執行 Step。請注意,Job 會確保分割中的每個流程在彙總退出狀態和轉換之前完成。

有關更多詳細資訊,請參閱關於分割流程的章節。

遠端區塊處理

在遠端區塊處理中,Step 處理跨多個進程分割,並透過某些中介軟體相互通訊。下圖顯示了此模式

Remote Chunking
圖 1. 遠端區塊處理

管理員元件是單個進程,而工作人員是多個遠端進程。如果管理員不是瓶頸,則此模式效果最佳,因此處理必須比讀取項目更昂貴(實際情況通常如此)。

管理員是 Spring Batch Step 的實作,其中 ItemWriter 被替換為通用版本,該版本知道如何將項目區塊作為訊息傳送至中介軟體。工作人員是正在使用的任何中介軟體的標準監聽器(例如,對於 JMS,它們將是 MesssageListener 實作),它們的角色是透過使用標準 ItemWriterItemProcessor 加上 ItemWriter,透過 ChunkProcessor 介面來處理項目區塊。使用此模式的優點之一是讀取器、處理器和寫入器元件是現成的(與 Step 的本機執行所使用的相同)。項目是動態劃分的,工作是透過中介軟體共用的,因此,如果監聽器都是渴望的消費者,則負載平衡是自動的。

中介軟體必須是持久的,具有保證的交付和每個訊息的單個消費者。JMS 是明顯的候選者,但在網格計算和共用記憶體產品空間中還存在其他選項(例如 JavaSpaces)。

有關更多詳細資訊,請參閱關於Spring Batch 整合 - 遠端區塊處理的章節。

分割

Spring Batch 也提供了一個 SPI,用於分割 Step 執行並遠端執行它。在這種情況下,遠端參與者是 Step 實例,它們可以很容易地配置並用於本機處理。下圖顯示了此模式

Partitioning Overview
圖 2. 分割

Job 在左側作為 Step 實例序列執行,其中一個 Step 實例標記為管理員。此圖中的工作人員都是 Step 的相同實例,實際上可以取代管理員,從而為 Job 產生相同的結果。工作人員通常是遠端服務,但也可能是本機執行緒。管理員在此模式中傳送給工作人員的訊息不需要是持久的或具有保證的交付。JobRepository 中的 Spring Batch 中繼資料確保每個工作人員針對每個 Job 執行執行一次且僅執行一次。

Spring Batch 中的 SPI 由 Step 的特殊實作(稱為 PartitionStep)和兩個策略介面組成,這兩個策略介面需要針對特定環境實作。策略介面是 PartitionHandlerStepExecutionSplitter,以下順序圖顯示了它們的角色

Partitioning SPI
圖 3. 分割 SPI

在這種情況下,右側的 Step 是「遠端」工作人員,因此,潛在地,有許多物件和/或進程扮演這個角色,並且 PartitionStep 顯示正在驅動執行。

  • Java

  • XML

以下範例顯示了使用 Java 設定時的 PartitionStep 設定

Java 設定
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

與多執行緒 Step 的 throttleLimit 方法類似,gridSize 方法可防止 Task Executor 被來自單個 Step 的請求飽和。

以下範例顯示了使用 XML 設定時的 PartitionStep 設定

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

與多執行緒 Step 的 throttle-limit 屬性類似,grid-size 屬性可防止 Task Executor 被來自單個 Step 的請求飽和。

Spring Batch 範例的單元測試套件(請參閱 partition*Job.xml 設定)有一個簡單的範例,您可以複製和擴展它。

Spring Batch 為名為 step1:partition0 等的分割建立 Step 執行。許多人更喜歡將管理員 Step 稱為 step1:manager 以保持一致性。您可以使用 Step 的別名(透過指定 name 屬性而不是 id 屬性)。

PartitionHandler

PartitionHandler 是了解遠端處理或網格環境結構的元件。它能夠將 StepExecution 請求傳送至遠端 Step 實例,並以某些結構特定的格式(如 DTO)包裝。它不必知道如何分割輸入資料或如何彙總多個 Step 執行的結果。一般來說,它可能也不需要了解彈性或容錯移轉,因為在許多情況下,這些都是結構的功能。無論如何,Spring Batch 始終提供獨立於結構的可重新啟動性。失敗的 Job 始終可以重新啟動,在這種情況下,僅重新執行失敗的 Step

PartitionHandler 介面可以針對各種結構類型進行專門的實作,包括簡單的 RMI 遠端處理、EJB 遠端處理、自訂 Web 服務、JMS、Java Spaces、共用記憶體網格(如 Terracotta 或 Coherence)和網格執行結構(如 GridGain)。Spring Batch 不包含任何專有網格或遠端處理結構的實作。

但是,Spring Batch 確實提供了一個有用的 PartitionHandler 實作,該實作使用 Spring 中的 TaskExecutor 策略在單獨的執行緒中在本機執行 Step 實例。該實作稱為 TaskExecutorPartitionHandler

  • Java

  • XML

您可以使用 Java 設定顯式設定 TaskExecutorPartitionHandler,如下所示

Java 設定
@Bean
public Step step1Manager(JobRepository jobRepository) {
    return new StepBuilder("step1.manager", jobRepository)
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

TaskExecutorPartitionHandler 是先前顯示的 XML 命名空間設定的 Step 的預設值。您也可以顯式設定它,如下所示

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

gridSize 屬性決定要建立的單獨 Step 執行的數量,因此它可以與 TaskExecutor 中的執行緒池大小相匹配。或者,它可以設定為大於可用執行緒數,這會使工作區塊更小。

TaskExecutorPartitionHandler 對於 IO 密集型 Step 實例很有用,例如複製大量檔案或將檔案系統複製到內容管理系統中。它也可以透過提供作為遠端調用代理的 Step 實作(例如使用 Spring Remoting)用於遠端執行。

Partitioner

Partitioner 具有更簡單的職責:僅為新的 Step 執行產生執行環境定義作為輸入參數(無需擔心重新啟動)。它有一個單一方法,如下列介面定義所示

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

此方法的傳回值將每個 Step 執行的唯一名稱 (String) 與 ExecutionContext 形式的輸入參數關聯起來。這些名稱稍後會在 Batch 中繼資料中顯示為分割 StepExecutions 中的 Step 名稱。ExecutionContext 只是一個名稱-值對的包,因此它可能包含主要金鑰範圍、行號或輸入檔案的位置。然後,遠端 Step 通常透過使用 #{…​} 佔位符(Step 範圍中的延遲綁定)綁定到環境定義輸入,如下節所示。

Step 執行的名稱(Partitioner 傳回的 Map 中的金鑰)需要在 Job 的 Step 執行中是唯一的,但沒有任何其他特定要求。執行此操作的最簡單方法(以及使名稱對使用者有意義)是使用前綴+後綴命名慣例,其中前綴是正在執行的 Step 的名稱(其本身在 Job 中是唯一的),後綴只是一個計數器。框架中有一个 SimplePartitioner 使用了此慣例。

您可以使用名為 PartitionNameProvider 的可選介面來與分割本身分開提供分割名稱。如果 Partitioner 實作了此介面,則僅在重新啟動時查詢名稱。如果分割成本很高,這可能是一個有用的最佳化。PartitionNameProvider 提供的名稱必須與 Partitioner 提供的名稱相符。

將輸入資料繫結到 Step

對於由 PartitionHandler 執行的 Step 來說,具有相同的設定並在執行時從 ExecutionContext 繫結其輸入參數非常有效。這很容易透過 Spring Batch 的 StepScope 功能來完成(在關於延遲綁定的章節中更詳細地介紹)。例如,如果 Partitioner 建立具有名為 fileName 的屬性金鑰的 ExecutionContext 實例,該金鑰指向每個 Step 調用的不同檔案(或目錄),則 Partitioner 輸出可能類似於下表的内容

表 1. Partitioner 提供的針對目錄處理的範例 Step 執行名稱到執行環境定義

Step 執行名稱 (金鑰)

ExecutionContext (值)

filecopy:partition0

fileName=/home/data/one

filecopy:partition1

fileName=/home/data/two

filecopy:partition2

fileName=/home/data/three

然後,可以使用延遲綁定到執行環境定義,將檔案名稱繫結到 Step。

  • Java

  • XML

以下範例顯示如何在 Java 中定義延遲綁定

Java 設定
@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}

以下範例顯示如何在 XML 中定義延遲綁定

XML 設定
<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>