Spring Cloud Stream 整合

任務本身可能很有用,但將任務整合到更大的生態系統中,使其可用於更複雜的處理和協調。本節介紹 Spring Cloud Task 與 Spring Cloud Stream 的整合選項。

從 Spring Cloud Stream 啟動任務

您可以從串流啟動任務。若要執行此操作,請建立一個接收器,以監聽包含 TaskLaunchRequest 作為其有效負載的訊息。TaskLaunchRequest 包含

  • uri:要執行的任務工件。

  • applicationName:與任務相關聯的名稱。如果未設定 applicationName,則 TaskLaunchRequest 會產生一個由以下內容組成的任務名稱:Task-<UUID>

  • commandLineArguments:包含任務命令列引數的清單。

  • environmentProperties:包含任務要使用的環境變數的地圖。

  • deploymentProperties:包含部署器用來部署任務的屬性的地圖。

如果有效負載的類型不同,則接收器會擲回例外。

例如,可以建立一個串流,其中包含一個處理器,該處理器從 HTTP 來源接收資料,並建立一個包含 TaskLaunchRequestGenericMessage,並將訊息傳送到其輸出通道。然後,任務接收器將從其輸入通道接收訊息,然後啟動任務。

若要建立 taskSink,您只需要建立一個包含 EnableTaskLauncher 註解的 Spring Boot 應用程式,如下列範例所示

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
	public static void main(String[] args) {
		SpringApplication.run(TaskSinkApplication.class, args);
	}
}

Spring Cloud Task 專案的 samples 模組包含範例 Sink 和 Processor。若要將這些範例安裝到您的本機 Maven 儲存庫,請從 spring-cloud-task-samples 目錄執行 Maven 建置,並將 skipInstall 屬性設定為 false,如下列範例所示

mvn clean install

maven.remoteRepositories.springRepo.url 屬性必須設定為 Spring Boot Uber-jar 所在的遠端儲存庫的位置。如果未設定,則沒有遠端儲存庫,因此它僅依賴本機儲存庫。

Spring Cloud Data Flow

若要在 Spring Cloud Data Flow 中建立串流,您必須先註冊我們建立的 Task Sink 應用程式。在下列範例中,我們使用 Spring Cloud Data Flow Shell 註冊 Processor 和 Sink 範例應用程式

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

下列範例顯示如何從 Spring Cloud Data Flow Shell 建立串流

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

Spring Cloud Task 事件

當任務透過 Spring Cloud Stream 通道執行時,Spring Cloud Task 提供透過 Spring Cloud Stream 通道發出事件的功能。任務監聽器用於在名為 task-events 的訊息通道上發布 TaskExecution。此功能自動裝配到任何具有 spring-cloud-streamspring-cloud-stream-<binder> 以及類別路徑上已定義任務的任務中。

若要停用事件發出監聽器,請將 spring.cloud.task.events.enabled 屬性設定為 false

透過定義適當的類別路徑,下列任務會在 task-events 通道上將 TaskExecution 作為事件發出(在任務的開始和結束時)

@SpringBootApplication
public class TaskEventsApplication {

	public static void main(String[] args) {
		SpringApplication.run(TaskEventsApplication.class, args);
	}

	@Configuration
	public static class TaskConfiguration {

		@Bean
		public ApplicationRunner applicationRunner() {
			return new ApplicationRunner() {
				@Override
				public void run(ApplicationArguments args) {
					System.out.println("The ApplicationRunner was executed");
				}
			};
		}
	}
}
也需要在類別路徑上實作 Binder。
範例任務事件應用程式可以在 Spring Cloud Task Project 的 samples 模組中找到,此處

停用特定任務事件

若要停用任務事件,您可以將 spring.cloud.task.events.enabled 屬性設定為 false

Spring Batch 事件

透過任務執行 Spring Batch 工作時,可以將 Spring Cloud Task 配置為根據 Spring Batch 中可用的 Spring Batch 監聽器發出資訊訊息。具體來說,以下 Spring Batch 監聽器會自動配置到每個批次工作中,並在透過 Spring Cloud Task 執行時,在相關聯的 Spring Cloud Stream 通道上發出訊息

  • JobExecutionListener 監聽 job-execution-events

  • StepExecutionListener 監聽 step-execution-events

  • ChunkListener 監聽 chunk-events

  • ItemReadListener 監聽 item-read-events

  • ItemProcessListener 監聽 item-process-events

  • ItemWriteListener 監聽 item-write-events

  • SkipListener 監聽 skip-events

當內容中存在適當的 Bean(JobTaskLifecycleListener)時,這些監聽器會自動配置到任何 AbstractJob 中。監聽這些事件的配置方式與繫結到任何其他 Spring Cloud Stream 通道的方式相同。我們的任務(執行批次工作的任務)充當 Source,而監聽應用程式充當 ProcessorSink

一個範例可能是讓應用程式監聽 job-execution-events 通道,以取得工作的開始和停止。若要配置監聽應用程式,您會將輸入配置為 job-execution-events,如下所示

spring.cloud.stream.bindings.input.destination=job-execution-events

也需要在類別路徑上實作 Binder。
範例批次事件應用程式可以在 Spring Cloud Task Project 的 samples 模組中找到,此處

將批次事件傳送到不同的通道

Spring Cloud Task 為批次事件提供的選項之一是能夠變更特定監聽器可以發出訊息的通道。若要執行此操作,請使用下列配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要將其訊息發出到另一個名為 my-step-execution-events 的通道,而不是預設的 step-execution-events,您可以新增下列配置

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

停用批次事件

若要停用所有批次事件的監聽器功能,請使用下列配置

spring.cloud.task.batch.events.enabled=false

若要停用特定批次事件,請使用下列配置

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

下列清單顯示您可以停用的個別監聽器

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

批次事件的發出順序

依預設,批次事件具有 Ordered.LOWEST_PRECEDENCE。若要變更此值(例如,變更為 5),請使用下列配置

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5