建立自訂 ItemReader 和 ItemWriter

到目前為止,本章已討論了 Spring Batch 中讀取和寫入的基本契約,以及一些常見的實作方式。然而,這些都相當通用,並且有許多潛在情境可能未包含在現成的實作中。本節透過一個簡單的範例,展示如何建立自訂 ItemReaderItemWriter 實作,並正確實作其契約。 ItemReader 也實作了 ItemStream,以便說明如何使 reader 或 writer 可重新啟動。

自訂 ItemReader 範例

為了本範例的目的,我們建立一個簡單的 ItemReader 實作,從提供的清單中讀取。我們從實作 ItemReader 最基本的契約,即 read 方法開始,如下列程式碼所示

public class CustomItemReader<T> implements ItemReader<T> {

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

上述類別接受一個項目清單,並一次傳回一個項目,並從清單中移除每個項目。當清單為空時,它會傳回 null,從而滿足 ItemReader 的最基本要求,如下列測試程式碼所示

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

使 ItemReader 可重新啟動

最終的挑戰是使 ItemReader 可重新啟動。目前,如果處理中斷並重新開始,ItemReader 必須從頭開始。這在許多情境中實際上是有效的,但有時最好讓批次 Job 從上次中斷的地方重新開始。關鍵的區別通常在於 reader 是有狀態還是無狀態。無狀態 reader 不需要擔心可重新啟動性,但有狀態的 reader 必須嘗試在重新啟動時重建其最後已知的狀態。因此,我們建議您盡可能保持自訂 reader 為無狀態,這樣您就不必擔心可重新啟動性。

如果您確實需要儲存狀態,則應使用 ItemStream 介面

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(CURRENT_INDEX)) {
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else {
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

在每次呼叫 ItemStream update 方法時,ItemReader 的目前索引會以 'current.index' 的鍵儲存在提供的 ExecutionContext 中。當呼叫 ItemStream open 方法時,會檢查 ExecutionContext 以查看它是否包含具有該鍵的項目。如果找到該鍵,則目前索引會移至該位置。這是一個相當簡單的範例,但它仍然符合一般契約

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

大多數 ItemReader 都有更複雜的重新啟動邏輯。例如,JdbcCursorItemReader 會儲存游標中最後處理的列 ID。

還值得注意的是,ExecutionContext 中使用的鍵不應是微不足道的。這是因為相同的 ExecutionContext 用於 Step 中的所有 ItemStream。在大多數情況下,只需在鍵前面加上類別名稱就足以保證唯一性。但是,在極少數情況下,在同一個 step 中使用兩種相同類型的 ItemStream(如果輸出需要兩個檔案,可能會發生這種情況),則需要更唯一的名稱。因此,許多 Spring Batch ItemReaderItemWriter 實作都有一個 setName() 屬性,可讓覆寫此鍵名稱。

自訂 ItemWriter 範例

實作自訂 ItemWriter 在許多方面與上述 ItemReader 範例類似,但在某些方面有所不同,因此值得單獨舉例說明。但是,新增可重新啟動性基本上是相同的,因此本範例未涵蓋。與 ItemReader 範例一樣,為了使範例盡可能簡單,使用了 List

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(Chunk<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

使 ItemWriter 可重新啟動

為了使 ItemWriter 可重新啟動,我們將遵循與 ItemReader 相同的流程,新增和實作 ItemStream 介面以同步執行環境。在範例中,我們可能必須計算處理的項目數量,並將其新增為頁尾記錄。如果我們需要這樣做,我們可以在 ItemWriter 中實作 ItemStream,以便在重新開啟 stream 時,可以從執行環境重新構成計數器。

在許多實際情況下,自訂 ItemWriter 也會委派給另一個 writer,該 writer 本身是可重新啟動的(例如,在寫入檔案時),或者它寫入交易資源,因此不需要可重新啟動,因為它是無狀態的。當您有狀態 writer 時,您可能應該確定同時實作 ItemStreamItemWriter。還請記住,writer 的用戶端需要了解 ItemStream,因此您可能需要在組態中將其註冊為 stream。