透過訊息啟動批次工作

當使用核心 Spring Batch API 啟動批次工作時,您基本上有兩個選項

  • 從命令列,使用 CommandLineJobRunner

  • 以程式設計方式,使用 JobOperator.start()JobLauncher.run()

例如,當使用 shell 腳本調用批次工作時,您可能想要使用 CommandLineJobRunner。或者,您可以直接使用 JobOperator (例如,當使用 Spring Batch 作為 Web 應用程式的一部分時)。但是,更複雜的使用案例呢?也許您需要輪詢遠端 (S)FTP 伺服器以檢索批次工作所需的資料,或者您的應用程式必須同時支援多個不同的資料來源。例如,您可能不僅從 Web 接收資料檔案,還從 FTP 和其他來源接收。在調用 Spring Batch 之前,可能還需要對輸入檔案進行額外的轉換。

因此,使用 Spring Integration 及其眾多适配器執行批次工作會更強大。例如,您可以使用檔案輸入通道适配器來監控檔案系統中的目錄,並在輸入檔案到達時立即啟動批次工作。此外,您可以建立 Spring Integration 流程,使用多個不同的适配器,僅通過配置即可輕鬆地從多個來源同時擷取批次工作的資料。使用 Spring Integration 實作所有這些情境都很容易,因為它允許 JobLauncher 的解耦、事件驅動執行。

Spring Batch Integration 提供了 JobLaunchingMessageHandler 類別,您可以使用它來啟動批次工作。JobLaunchingMessageHandler 的輸入由 Spring Integration 訊息提供,該訊息的有效負載類型為 JobLaunchRequest。此類別是圍繞要啟動的 Job 和啟動批次工作所需的 JobParameters 的包裝器。

下圖顯示了啟動批次工作所需的典型 Spring Integration 訊息流程。EIP (企業整合模式) 網站提供了訊息圖示及其描述的完整概述。

Launch Batch Job
圖 1. 啟動批次工作

將檔案轉換為 JobLaunchRequest

以下範例將檔案轉換為 JobLaunchRequest

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

JobExecution 回應

當批次工作正在執行時,將會傳回 JobExecution 實例。您可以使用此實例來判斷執行的狀態。如果可以成功建立 JobExecution,則始終會傳回它,無論實際執行是否成功。

關於如何傳回 JobExecution 實例的確切行為取決於提供的 TaskExecutor。如果使用 synchronous (單執行緒) TaskExecutor 實作,則僅在工作完成才會傳回 JobExecution 回應。當使用 asynchronous TaskExecutor 時,JobExecution 實例會立即傳回。然後,您可以取得 JobExecution 實例的 id (使用 JobExecution.getJobId()) 並使用 JobExplorer 查詢 JobRepository 以取得工作的更新狀態。如需更多資訊,請參閱 查詢儲存庫

Spring Batch Integration 配置

考慮這樣一種情況:有人需要建立檔案 inbound-channel-adapter 以監聽所提供目錄中的 CSV 檔案,將它們交給轉換器 (FileMessageToJobRequest),通過工作啟動閘道啟動工作,並使用 logging-channel-adapter 記錄 JobExecution 的輸出。

  • Java

  • XML

以下範例展示了如何在 Java 中配置常見案例

Java 配置
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

以下範例展示了如何在 XML 中配置常見案例

XML 配置
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

範例 ItemReader 配置

現在我們正在輪詢檔案並啟動工作,我們需要配置 Spring Batch ItemReader (例如) 以使用在工作參數「input.file.name」定義的位置找到的檔案,如下面的 Bean 配置所示

  • Java

  • XML

以下 Java 範例顯示了必要的 Bean 配置

Java 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

以下 XML 範例顯示了必要的 Bean 配置

XML 配置
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

先前範例中主要的重點是將 #{jobParameters['input.file.name']} 的值注入為 Resource 屬性值,並將 ItemReader Bean 設定為具有 Step scope。將 Bean 設定為具有 Step scope 可以利用延遲綁定支援,從而允許存取 jobParameters 變數。