攔截 Step 執行

如同 Job,在 Step 執行期間也有許多事件,使用者可能需要在這些事件中執行某些功能。例如,若要寫出需要頁尾的平面檔案,則需要在 Step 完成時通知 ItemWriter,以便寫入頁尾。這可以使用許多 Step 範圍的監聽器之一來完成。

您可以透過 listeners 元素,將任何實作 StepListener 擴展介面(但不是該介面本身,因為它是空的)的類別應用於步驟。 listeners 元素在 step、tasklet 或 chunk 宣告中皆有效。我們建議您在應用程式功能所在的層級宣告監聽器,或者,如果它是多功能的(例如 StepExecutionListenerItemReadListener),則在它應用程式的最細微層級宣告。

  • Java

  • XML

以下範例顯示在 Java 中應用於 chunk 層級的監聽器

Java 設定
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

以下範例顯示在 XML 中應用於 chunk 層級的監聽器

XML 設定
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

若使用命名空間 <step> 元素或 *StepFactoryBean 工廠,則實作 StepListener 介面之一的 ItemReaderItemWriterItemProcessor 會自動向 Step 註冊。這僅適用於直接注入到 Step 中的元件。如果監聽器巢狀在另一個元件中,則需要明確註冊它(如先前在 Step 註冊 ItemStream 下所述)。

除了 StepListener 介面之外,還提供了註解來解決相同的問題。Plain old Java 物件可以具有帶有這些註解的方法,然後將這些方法轉換為對應的 StepListener 類型。註解自訂的 chunk 元件實作(例如 ItemReaderItemWriterTasklet)也很常見。XML 解析器會分析 <listener/> 元素的註解,並將其註冊到 builder 中的 listener 方法,因此您只需使用 XML 命名空間或 builder 向步驟註冊監聽器即可。

StepExecutionListener

StepExecutionListener 代表 Step 執行最通用的監聽器。它允許在 Step 開始之前和結束之後(無論是正常結束還是失敗)收到通知,如下例所示

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus 具有 afterStep 的返回類型,讓監聽器有機會修改在 Step 完成時返回的退出代碼。

與此介面對應的註解為

  • @BeforeStep

  • @AfterStep

ChunkListener

「chunk」定義為在交易範圍內處理的項目。在每個 commit 間隔 commit 交易會 commit 一個 chunk。您可以使用 ChunkListener 在 chunk 開始處理之前或在 chunk 成功完成之後執行邏輯,如下列介面定義所示

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

在交易開始後但在 ItemReader 開始讀取之前,會呼叫 beforeChunk 方法。相反地,在 chunk commit 後(如果發生 rollback,則完全不會呼叫),會呼叫 afterChunk

與此介面對應的註解為

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

當沒有 chunk 宣告時,您可以應用 ChunkListenerTaskletStep 負責呼叫 ChunkListener,因此它也適用於非項目導向的 tasklet(在 tasklet 之前和之後都會呼叫)。

ItemReadListener

在先前討論 skip 邏輯時,曾提及記錄 skip 的記錄可能是有益的,以便稍後可以處理這些記錄。在讀取錯誤的情況下,可以使用 ItemReaderListener 來完成,如下列介面定義所示

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

在每次呼叫 ItemReader 上的 read 之前,都會呼叫 beforeRead 方法。在每次成功呼叫 read 之後,都會呼叫 afterRead 方法,並傳遞讀取的項目。如果在讀取時發生錯誤,則會呼叫 onReadError 方法。提供的例外狀況可以用於記錄。

與此介面對應的註解為

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 相同,也可以「監聽」項目的處理,如下列介面定義所示

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

ItemProcessor 上的 process 之前,會呼叫 beforeProcess 方法,並傳遞要處理的項目。在項目成功處理後,會呼叫 afterProcess 方法。如果在處理時發生錯誤,則會呼叫 onProcessError 方法。提供的例外狀況和嘗試處理的項目可以用於記錄。

與此介面對應的註解為

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

您可以使用 ItemWriteListener「監聽」項目的寫入,如下列介面定義所示

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

ItemWriter 上的 write 之前,會呼叫 beforeWrite 方法,並傳遞要寫入的項目列表。在項目成功寫入後但在 commit 與 chunk 處理相關聯的交易之前,會呼叫 afterWrite 方法。如果在寫入時發生錯誤,則會呼叫 onWriteError 方法。提供的例外狀況和嘗試寫入的項目可以用於記錄。

與此介面對應的註解為

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供機制來通知錯誤,但沒有一個會通知您記錄實際上已被 skip。例如,即使項目已重試並成功,也會呼叫 onWriteError。因此,有一個單獨的介面用於追蹤 skip 的項目,如下列介面定義所示

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

每當在讀取時 skip 項目時,都會呼叫 onSkipInRead。應注意,rollback 可能會導致同一個項目被註冊為 skip 多次。當在寫入時 skip 項目時,會呼叫 onSkipInWrite。由於項目已成功讀取(且未 skip),因此也會將項目本身作為引數提供。

與此介面對應的註解為

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListeners 和交易

SkipListener 最常見的用例之一是記錄 skip 的項目,以便可以使用另一個批次處理或甚至人工處理來評估和修正導致 skip 的問題。由於在許多情況下,原始交易可能會 rollback,因此 Spring Batch 做出兩個保證

  • 每個項目只會呼叫一次適當的 skip 方法(取決於錯誤發生的時間)。

  • 總是在 commit 交易之前呼叫 SkipListener。這是為了確保監聽器呼叫的任何交易資源不會因 ItemWriter 中的失敗而 rollback。