常見批次模式

某些批次 Job 可以完全由 Spring Batch 中的現成元件組裝而成。例如,可以配置 ItemReaderItemWriter 實作來涵蓋各種情境。然而,在大多數情況下,必須編寫自訂程式碼。應用程式開發人員的主要 API 進入點是 TaskletItemReaderItemWriter 和各種監聽器介面。大多數簡單的批次 Job 可以使用來自 Spring Batch ItemReader 的現成輸入,但通常情況是,在處理和寫入方面存在自訂考量,需要開發人員實作 ItemWriterItemProcessor

在本章中,我們提供了一些自訂業務邏輯中常見模式的範例。這些範例主要以監聽器介面為特色。應注意,如果適當,ItemReaderItemWriter 也可以實作監聽器介面。

記錄 Item 處理與失敗

常見的使用案例是需要逐個 Item 特別處理 Step 中的錯誤,例如記錄到特殊通道或將記錄插入資料庫。區塊導向的 Step(從 Step 工廠 bean 建立)讓使用者可以使用簡單的 ItemReadListener 來實作此使用案例,以處理 read 上的錯誤,並使用 ItemWriteListener 來處理 write 上的錯誤。以下程式碼片段說明了一個監聽器,該監聽器記錄讀取和寫入失敗

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

實作此監聽器後,必須向 Step 註冊。

  • Java

  • XML

以下範例展示如何在 Java 中向 Step 註冊監聽器

Java 設定
@Bean
public Step simpleStep(JobRepository jobRepository) {
	return new StepBuilder("simpleStep", jobRepository)
				...
				.listener(new ItemFailureLoggerListener())
				.build();
}

以下範例展示如何在 XML 中向 Step 註冊監聽器

XML 設定
<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>
如果您的監聽器在 onError() 方法中執行任何操作,則它必須在即將回滾的交易中。如果您需要在 onError() 方法中使用交易資源(例如資料庫),請考慮向該方法新增宣告式交易(請參閱 Spring Core Reference Guide 以取得詳細資訊),並將其傳播屬性值設為 REQUIRES_NEW

因業務原因手動停止 Job

Spring Batch 透過 JobOperator 介面提供 stop() 方法,但這實際上是供操作員而非應用程式程式設計人員使用。有時,從業務邏輯內部停止 Job 執行更方便或更有意義。

最簡單的做法是擲回 RuntimeException(既不無限期重試也不跳過的例外)。例如,可以使用自訂例外類型,如下列範例所示

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

停止 Step 執行的另一個簡單方法是從 ItemReader 傳回 null,如下列範例所示

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

先前的範例實際上依賴於存在 CompletionPolicy 策略的預設實作,該策略在要處理的 Item 為 null 時發出批次完成訊號。可以實作更複雜的完成策略,並透過 SimpleStepFactoryBean 注入到 Step 中。

  • Java

  • XML

以下範例展示如何在 Java 中將完成策略注入 Step

Java 設定
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("simpleStep", jobRepository)
				.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
				.reader(reader())
				.writer(writer())
				.build();
}

以下範例展示如何在 XML 中將完成策略注入 Step

XML 設定
<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

另一種方法是在 StepExecution 中設定一個旗標,框架中的 Step 實作會在 Item 處理之間檢查該旗標。為了實作此替代方案,我們需要存取目前的 StepExecution,這可以透過實作 StepListener 並向 Step 註冊來達成。以下範例展示了一個設定旗標的監聽器

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly();
       }
    }

}

設定旗標後,預設行為是 Step 擲回 JobInterruptedException。此行為可以透過 StepInterruptionPolicy 控制。但是,唯一的選擇是擲回或不擲回例外,因此這始終是 Job 的異常結束。

加入頁尾記錄

通常,在寫入平面檔案時,必須在所有處理完成後,將「頁尾」記錄附加到檔案的末尾。這可以使用 Spring Batch 提供的 FlatFileFooterCallback 介面來達成。FlatFileFooterCallback(及其對應項 FlatFileHeaderCallback)是 FlatFileItemWriter 的選用屬性,可以新增至 Item Writer。

  • Java

  • XML

以下範例展示如何在 Java 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

Java 設定
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.headerCallback(headerCallback())
			.footerCallback(footerCallback())
			.build();
}

以下範例展示如何在 XML 中使用 FlatFileHeaderCallbackFlatFileFooterCallback

XML 設定
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

頁尾回呼介面只有一個方法,當必須寫入頁尾時會呼叫該方法,如下列介面定義所示

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

寫入摘要頁尾

涉及頁尾記錄的常見需求是在輸出過程中彙總資訊,並將此資訊附加到檔案的末尾。此頁尾通常用作檔案的摘要或提供校驗和。

例如,如果批次 Job 正在將 Trade 記錄寫入平面檔案,並且需要將所有 Trade 的總金額放在頁尾中,則可以使用以下 ItemWriter 實作

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(Chunk<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

TradeItemWriter 儲存一個 totalAmount 值,該值隨著寫入的每個 Trade Item 的 amount 而增加。在處理完最後一個 Trade 後,框架會呼叫 writeFooter,將 totalAmount 放入檔案中。請注意,write 方法使用了臨時變數 chunkTotal,它儲存區塊中 Trade 金額的總計。這樣做是為了確保如果 write 方法中發生跳過,則 totalAmount 保持不變。只有在 write 方法結束時,一旦我們保證不會擲回任何例外,我們才會更新 totalAmount

為了呼叫 writeFooter 方法,必須將 TradeItemWriter(實作 FlatFileFooterCallback)作為 footerCallback 連接到 FlatFileItemWriter 中。

  • Java

  • XML

以下範例展示如何在 Java 中連結 TradeItemWriter

Java 設定
@Bean
public TradeItemWriter tradeItemWriter() {
	TradeItemWriter itemWriter = new TradeItemWriter();

	itemWriter.setDelegate(flatFileItemWriter(null));

	return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.footerCallback(tradeItemWriter())
			.build();
}

以下範例展示如何在 XML 中連結 TradeItemWriter

XML 設定
<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

到目前為止,TradeItemWriter 的編寫方式只有在 Step 不可重新啟動時才能正確運作。這是因為類別是有狀態的(因為它儲存了 totalAmount),但 totalAmount 未持久化到資料庫中。因此,在重新啟動時無法檢索它。為了使此類別可重新啟動,應實作 ItemStream 介面以及方法 openupdate,如下列範例所示

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

update 方法在物件持久化到資料庫之前,將 totalAmount 的最新版本儲存到 ExecutionContext。open 方法從 ExecutionContext 檢索任何現有的 totalAmount,並將其用作處理的起點,允許 TradeItemWriter 在重新啟動時從上次執行 Step 時停止的地方繼續。

驅動查詢式 ItemReaders

關於 Readers 和 Writers 的章節中,討論了使用分頁的資料庫輸入。許多資料庫供應商(例如 DB2)都具有極其悲觀的鎖定策略,如果正在讀取的表也需要被線上應用程式的其他部分使用,則可能會導致問題。此外,在極大型資料集上開啟游標可能會在某些供應商的資料庫上造成問題。因此,許多專案傾向於使用「驅動查詢」方法來讀取資料。此方法的工作原理是迭代鍵,而不是需要傳回的整個物件,如下圖所示

Driving Query Job
圖 1. 驅動查詢 Job

如您所見,上圖所示的範例使用了與基於游標的範例中相同的「FOO」表。但是,SQL 陳述式中僅選取了 ID,而不是選取整個列。因此,從 read 傳回的不是 FOO 物件,而是 Integer。然後可以使用此數字查詢「詳細資訊」,即完整的 Foo 物件,如下圖所示

Driving Query Example
圖 2. 驅動查詢範例

應使用 ItemProcessor 將從驅動查詢取得的鍵轉換為完整的 Foo 物件。可以使用現有的 DAO 根據鍵查詢完整物件。

多行記錄

雖然平面檔案通常每條記錄都限制在一行中,但檔案可能具有跨越多行且具有多種格式的記錄是很常見的。以下檔案摘錄顯示了這種安排的範例

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

以 'HEA' 開頭的行和以 'FOT' 開頭的行之間的所有內容都被視為一條記錄。為了正確處理這種情況,必須考慮以下幾點

  • ItemReader 必須將多行記錄的每一行作為一個群組讀取,而不是一次讀取一條記錄,以便可以完整地將其傳遞給 ItemWriter

  • 每種行類型可能需要以不同的方式進行符號化。

由於單一記錄跨越多行,並且由於我們可能不知道有多少行,因此 ItemReader 必須小心始終讀取整條記錄。為了做到這一點,自訂 ItemReader 應實作為 FlatFileItemReader 的包裝器。

  • Java

  • XML

以下範例展示如何在 Java 中實作自訂 ItemReader

Java 設定
@Bean
public MultiLineTradeItemReader itemReader() {
	MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

	itemReader.setDelegate(flatFileItemReader());

	return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
			.name("flatFileItemReader")
			.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
			.lineTokenizer(orderFileTokenizer())
			.fieldSetMapper(orderFieldSetMapper())
			.build();
	return reader;
}

以下範例展示如何在 XML 中實作自訂 ItemReader

XML 設定
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

為了確保正確地將每一行符號化(這對於固定長度輸入尤其重要),可以在委派 FlatFileItemReader 上使用 PatternMatchingCompositeLineTokenizer。請參閱Readers and Writers 章節中的 FlatFileItemReader 以取得更多詳細資訊。然後,委派 Reader 使用 PassThroughFieldSetMapper 為包裝 ItemReader 傳回每一行的 FieldSet

  • Java

  • XML

以下範例展示如何確保在 Java 中正確地將每一行符號化

Java 內容
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
	PatternMatchingCompositeLineTokenizer tokenizer =
			new PatternMatchingCompositeLineTokenizer();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

	tokenizers.put("HEA*", headerRecordTokenizer());
	tokenizers.put("FOT*", footerRecordTokenizer());
	tokenizers.put("NCU*", customerLineTokenizer());
	tokenizers.put("BAD*", billingAddressLineTokenizer());

	tokenizer.setTokenizers(tokenizers);

	return tokenizer;
}

以下範例展示如何確保在 XML 中正確地將每一行符號化

XML 內容
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

此包裝器必須能夠識別記錄的結尾,以便它可以持續呼叫其委派上的 read(),直到到達結尾。對於讀取的每一行,包裝器都應建構要傳回的 Item。一旦到達頁尾,就可以傳回 Item 以交付給 ItemProcessorItemWriter,如下列範例所示

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

執行系統命令

許多批次 Job 需要從批次 Job 內部呼叫外部命令。此類程序可以由排程器單獨啟動,但關於執行的常見 Metadata 優勢將會遺失。此外,多 Step Job 也需要拆分為多個 Job。

由於需求非常普遍,Spring Batch 提供了用於呼叫系統命令的 Tasklet 實作。

  • Java

  • XML

以下範例展示如何在 Java 中呼叫外部命令

Java 設定
@Bean
public SystemCommandTasklet tasklet() {
	SystemCommandTasklet tasklet = new SystemCommandTasklet();

	tasklet.setCommand("echo hello");
	tasklet.setTimeout(5000);

	return tasklet;
}

以下範例展示如何在 XML 中呼叫外部命令

XML 設定
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

處理找不到輸入時的 Step 完成

在許多批次情境中,找不到要處理的資料庫或檔案中的列並非異常情況。Step 只是被認為沒有找到工作,並且在讀取 0 個 Item 的情況下完成。Spring Batch 中開箱即用的所有 ItemReader 實作都預設為此方法。如果即使在存在輸入時也沒有寫出任何內容(如果檔案名稱錯誤或出現某些類似問題,通常會發生這種情況),這可能會導致一些混淆。因此,應檢查 Metadata 本身以確定框架發現要處理的工作量。但是,如果找不到輸入被認為是異常情況怎麼辦?在這種情況下,以程式設計方式檢查 Metadata 是否沒有處理任何 Item 並導致失敗是最佳解決方案。由於這是一個常見的使用案例,Spring Batch 提供了一個具有完全此功能的監聽器,如下列 NoWorkFoundStepExecutionListener 的類別定義所示

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

上述 StepExecutionListener 在 'afterStep' 階段檢查 StepExecutionreadCount 屬性,以確定是否沒有讀取任何 Item。如果是這種情況,則傳回結束代碼 FAILED,表示 Step 應失敗。否則,傳回 null,這不會影響 Step 的狀態。

將資料傳遞至後續 Step

將資訊從一個 Step 傳遞到另一個 Step 通常很有用。這可以透過 ExecutionContext 完成。問題是有兩個 ExecutionContext:一個在 Step 層級,另一個在 Job 層級。Step ExecutionContext 僅在 Step 存在期間保留,而 Job ExecutionContext 在整個 Job 期間保留。另一方面,Step ExecutionContext 在每次 Step 提交區塊時更新,而 Job ExecutionContext 僅在每個 Step 結束時更新。

這種分離的結果是,所有資料都必須在 Step 執行時放置在 Step ExecutionContext 中。這樣做可確保在 Step 執行時正確儲存資料。如果資料儲存到 Job ExecutionContext,則在 Step 執行期間不會持久化。如果 Step 失敗,則該資料將遺失。

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

為了使資料可供未來的 Steps 使用,必須在 Step 完成後將其「提升」到 Job ExecutionContext。Spring Batch 提供了 ExecutionContextPromotionListener 來達到此目的。必須使用與 ExecutionContext 中必須提升的資料相關的鍵來配置監聽器。它也可以選擇性地配置為應該發生提升的結束代碼模式列表(COMPLETED 是預設值)。與所有監聽器一樣,它必須在 Step 上註冊。

  • Java

  • XML

以下範例展示如何在 Java 中將 Step 提升至 Job ExecutionContext

Java 設定
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
	return new JobBuilder("job1", jobRepository)
				.start(step1)
				.next(step2)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(savingWriter())
				.listener(promotionListener())
				.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
	ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

	listener.setKeys(new String[] {"someKey"});

	return listener;
}

以下範例展示如何在 XML 中將 Step 提升至 Job ExecutionContext

XML 設定
<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

最後,必須從 Job ExecutionContext 檢索已儲存的值,如下列範例所示

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}