建立自訂 ItemReader 和 ItemWriter
到目前為止,本章已討論了 Spring Batch 中讀取和寫入的基本契約,以及一些常見的實作方式。然而,這些都相當通用,並且有許多潛在情境可能未包含在現成的實作中。本節透過一個簡單的範例,展示如何建立自訂 ItemReader
和 ItemWriter
實作,並正確實作其契約。 ItemReader
也實作了 ItemStream
,以便說明如何使 reader 或 writer 可重新啟動。
自訂 ItemReader
範例
為了本範例的目的,我們建立一個簡單的 ItemReader
實作,從提供的清單中讀取。我們從實作 ItemReader
最基本的契約,即 read
方法開始,如下列程式碼所示
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
上述類別接受一個項目清單,並一次傳回一個項目,並從清單中移除每個項目。當清單為空時,它會傳回 null
,從而滿足 ItemReader
的最基本要求,如下列測試程式碼所示
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使 ItemReader
可重新啟動
最終的挑戰是使 ItemReader
可重新啟動。目前,如果處理中斷並重新開始,ItemReader
必須從頭開始。這在許多情境中實際上是有效的,但有時最好讓批次 Job 從上次中斷的地方重新開始。關鍵的區別通常在於 reader 是有狀態還是無狀態。無狀態 reader 不需要擔心可重新啟動性,但有狀態的 reader 必須嘗試在重新啟動時重建其最後已知的狀態。因此,我們建議您盡可能保持自訂 reader 為無狀態,這樣您就不必擔心可重新啟動性。
如果您確實需要儲存狀態,則應使用 ItemStream
介面
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
在每次呼叫 ItemStream
update
方法時,ItemReader
的目前索引會以 'current.index' 的鍵儲存在提供的 ExecutionContext
中。當呼叫 ItemStream
open
方法時,會檢查 ExecutionContext
以查看它是否包含具有該鍵的項目。如果找到該鍵,則目前索引會移至該位置。這是一個相當簡單的範例,但它仍然符合一般契約
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多數 ItemReader
都有更複雜的重新啟動邏輯。例如,JdbcCursorItemReader
會儲存游標中最後處理的列 ID。
還值得注意的是,ExecutionContext
中使用的鍵不應是微不足道的。這是因為相同的 ExecutionContext
用於 Step
中的所有 ItemStream
。在大多數情況下,只需在鍵前面加上類別名稱就足以保證唯一性。但是,在極少數情況下,在同一個 step 中使用兩種相同類型的 ItemStream
(如果輸出需要兩個檔案,可能會發生這種情況),則需要更唯一的名稱。因此,許多 Spring Batch ItemReader
和 ItemWriter
實作都有一個 setName()
屬性,可讓覆寫此鍵名稱。
自訂 ItemWriter
範例
實作自訂 ItemWriter
在許多方面與上述 ItemReader
範例類似,但在某些方面有所不同,因此值得單獨舉例說明。但是,新增可重新啟動性基本上是相同的,因此本範例未涵蓋。與 ItemReader
範例一樣,為了使範例盡可能簡單,使用了 List
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使 ItemWriter
可重新啟動
為了使 ItemWriter
可重新啟動,我們將遵循與 ItemReader
相同的流程,新增和實作 ItemStream
介面以同步執行環境。在範例中,我們可能必須計算處理的項目數量,並將其新增為頁尾記錄。如果我們需要這樣做,我們可以在 ItemWriter
中實作 ItemStream
,以便在重新開啟 stream 時,可以從執行環境重新構成計數器。
在許多實際情況下,自訂 ItemWriter
也會委派給另一個 writer,該 writer 本身是可重新啟動的(例如,在寫入檔案時),或者它寫入交易資源,因此不需要可重新啟動,因為它是無狀態的。當您有狀態 writer 時,您可能應該確定同時實作 ItemStream
和 ItemWriter
。還請記住,writer 的用戶端需要了解 ItemStream
,因此您可能需要在組態中將其註冊為 stream。