批次

本節將更詳細地介紹 Spring Cloud Task 與 Spring Batch 的整合。本節涵蓋了追蹤任務執行與其執行的工作之間的關聯,以及透過 Spring Cloud Deployer 進行遠端分割。

將工作執行與其執行的任務相關聯

Spring Boot 提供了在 Spring Boot Uber-jar 內執行批次工作的機制。Spring Boot 對此功能的支援讓開發人員可以在該執行中執行多個批次工作。Spring Cloud Task 提供了將工作執行與任務執行相關聯的能力,以便可以追溯彼此。

Spring Cloud Task 透過使用 TaskBatchExecutionListener 來實現此功能。預設情況下,此監聽器會在任何同時配置了 Spring Batch 工作 (透過在上下文中定義 Job 類型的 bean) 和類路徑上具有 spring-cloud-task-batch jar 的上下文中自動配置。監聽器會被注入到所有符合這些條件的工作中。

覆寫 TaskBatchExecutionListener

為了防止監聽器被注入到目前上下文中的任何批次工作中,您可以使用標準的 Spring Boot 機制來停用自動配置。

若要僅將監聽器注入到上下文中的特定工作,請覆寫 batchTaskExecutionListenerBeanPostProcessor 並提供工作 bean ID 的清單,如下例所示

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
	TaskBatchExecutionListenerBeanPostProcessor postProcessor =
		new TaskBatchExecutionListenerBeanPostProcessor();

	postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

	return postProcessor;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到範例批次應用程式,此處

遠端分割

Spring Cloud Deployer 提供了在大多數雲端基礎架構上啟動基於 Spring Boot 的應用程式的機制。DeployerPartitionHandlerDeployerStepExecutionHandler 將啟動 worker 步驟執行的任務委派給 Spring Cloud Deployer。

若要配置 DeployerStepExecutionHandler,您必須提供一個 Resource,表示要執行的 Spring Boot Uber-jar、一個 TaskLauncherHandler 和一個 JobExplorer。您可以配置任何環境屬性,以及一次執行的 worker 最大數量、輪詢結果的間隔 (預設為 10 秒) 和逾時時間 (預設為 -1 或無逾時)。以下範例顯示了如何配置此 PartitionHandler

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
		JobExplorer jobExplorer) throws Exception {

	MavenProperties mavenProperties = new MavenProperties();
	mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
		new MavenProperties.RemoteRepository(repository))));

 	Resource resource =
		MavenResource.parse(String.format("%s:%s:%s",
				"io.spring.cloud",
				"partitioned-batch-job",
				"1.1.0.RELEASE"), mavenProperties);

	DeployerPartitionHandler partitionHandler =
		new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

	List<String> commandLineArgs = new ArrayList<>(3);
	commandLineArgs.add("--spring.profiles.active=worker");
	commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
	commandLineArgs.add("--spring.batch.initializer.enabled=false");

	partitionHandler.setCommandLineArgsProvider(
		new PassThroughCommandLineArgsProvider(commandLineArgs));
	partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
	partitionHandler.setMaxWorkers(2);
	partitionHandler.setApplicationName("PartitionedBatchJobTask");

	return partitionHandler;
}
當將環境變數傳遞給分割區時,每個分割區可能位於具有不同環境設定的不同機器上。因此,您應該僅傳遞那些必要的環境變數。

請注意,在上面的範例中,我們已將 worker 的最大數量設定為 2。設定 worker 的最大數量建立了應一次執行的分割區的最大數量。

要執行的 Resource 預期是一個 Spring Boot Uber-jar,其中 DeployerStepExecutionHandler 配置為目前上下文中的 CommandLineRunner。前面範例中列舉的儲存庫應該是 Spring Boot Uber-jar 所在的遠端儲存庫。管理員和 worker 都應具有對同一個資料儲存區的能見度,該資料儲存區正被用作工作儲存庫和任務儲存庫。一旦底層基礎架構啟動了 Spring Boot jar,並且 Spring Boot 啟動了 DeployerStepExecutionHandler,步驟處理常式就會執行請求的 Step。以下範例顯示了如何配置 DeployerStepExecutionHandler

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
	DeployerStepExecutionHandler handler =
		new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

	return handler;
}
您可以在 Spring Cloud Task 專案的 samples 模組中找到範例遠端分割應用程式,此處

非同步啟動遠端批次分割區

預設情況下,批次分割區會循序啟動。但是,在某些情況下,這可能會影響效能,因為每次啟動都會阻塞,直到資源 (例如:在 Kubernetes 中佈建 pod) 被佈建。在這些情況下,您可以為 DeployerPartitionHandler 提供 ThreadPoolTaskExecutor。這將根據 ThreadPoolTaskExecutor 的配置啟動遠端批次分割區。例如

	@Bean
	public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(4);
		executor.setThreadNamePrefix("default_task_executor_thread");
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.initialize();
		return executor;
	}

	@Bean
	public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
		TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
		Resource resource = this.resourceLoader
			.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

		DeployerPartitionHandler partitionHandler =
			new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
				"workerStep", taskRepository, executor);
	...
	}
我們需要關閉上下文,因為使用 ThreadPoolTaskExecutor 會留下一個活動的執行緒,因此應用程式不會終止。為了適當地關閉應用程式,我們需要將 spring.cloud.task.closecontextEnabled 屬性設定為 true

關於為 Kubernetes 平台開發批次分割應用程式的注意事項

  • 在 Kubernetes 平台上部署分割應用程式時,您必須為 Spring Cloud Kubernetes Deployer 使用以下依賴項

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
    </dependency>
  • 任務應用程式及其分割區的應用程式名稱需要遵循以下 regex 模式:[a-z0-9]([-a-z0-9]*[a-z0-9])。否則,將會擲回例外。

批次資訊訊息

Spring Cloud Task 提供了批次工作發出資訊訊息的功能。“Spring Batch 事件”章節詳細介紹了此功能。

批次工作結束代碼

先前所述,Spring Cloud Task 應用程式支援記錄任務執行的結束代碼的功能。但是,在您在任務中執行 Spring Batch 工作的情況下,無論批次工作執行如何完成,當使用預設的 Batch/Boot 行為時,任務的結果始終為零。請記住,任務是一個 boot 應用程式,並且從任務傳回的結束代碼與 boot 應用程式相同。若要覆寫此行為,並允許任務在批次工作傳回 BatchStatusFAILED 時傳回非零的結束代碼,請將 spring.cloud.task.batch.fail-on-job-failure 設定為 true。然後,結束代碼可以是 1 (預設值) 或基於指定的 ExitCodeGenerator)

此功能使用新的 ApplicationRunner,取代 Spring Boot 提供的 ApplicationRunner。預設情況下,它配置為相同的順序。但是,如果您想自訂 ApplicationRunner 運行的順序,您可以透過設定 spring.cloud.task.batch.applicationRunnerOrder 屬性來設定其順序。若要讓您的任務根據批次工作執行的結果傳回結束代碼,您需要編寫自己的 CommandLineRunner