Spring Cloud Stream 整合
任務本身可能很有用,但將任務整合到更大的生態系統中,使其可用於更複雜的處理和協調。本節介紹 Spring Cloud Task 與 Spring Cloud Stream 的整合選項。
從 Spring Cloud Stream 啟動任務
您可以從串流啟動任務。若要執行此操作,請建立一個接收器,以監聽包含 TaskLaunchRequest
作為其有效負載的訊息。TaskLaunchRequest
包含
-
uri
:要執行的任務工件。 -
applicationName
:與任務相關聯的名稱。如果未設定 applicationName,則TaskLaunchRequest
會產生一個由以下內容組成的任務名稱:Task-<UUID>
。 -
commandLineArguments
:包含任務命令列引數的清單。 -
environmentProperties
:包含任務要使用的環境變數的地圖。 -
deploymentProperties
:包含部署器用來部署任務的屬性的地圖。
如果有效負載的類型不同,則接收器會擲回例外。 |
例如,可以建立一個串流,其中包含一個處理器,該處理器從 HTTP 來源接收資料,並建立一個包含 TaskLaunchRequest
的 GenericMessage
,並將訊息傳送到其輸出通道。然後,任務接收器將從其輸入通道接收訊息,然後啟動任務。
若要建立 taskSink,您只需要建立一個包含 EnableTaskLauncher
註解的 Spring Boot 應用程式,如下列範例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 專案的 samples 模組包含範例 Sink 和 Processor。若要將這些範例安裝到您的本機 Maven 儲存庫,請從 spring-cloud-task-samples
目錄執行 Maven 建置,並將 skipInstall
屬性設定為 false
,如下列範例所示
mvn clean install
maven.remoteRepositories.springRepo.url 屬性必須設定為 Spring Boot Uber-jar 所在的遠端儲存庫的位置。如果未設定,則沒有遠端儲存庫,因此它僅依賴本機儲存庫。 |
Spring Cloud Data Flow
若要在 Spring Cloud Data Flow 中建立串流,您必須先註冊我們建立的 Task Sink 應用程式。在下列範例中,我們使用 Spring Cloud Data Flow Shell 註冊 Processor 和 Sink 範例應用程式
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
下列範例顯示如何從 Spring Cloud Data Flow Shell 建立串流
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
Spring Cloud Task 事件
當任務透過 Spring Cloud Stream 通道執行時,Spring Cloud Task 提供透過 Spring Cloud Stream 通道發出事件的功能。任務監聽器用於在名為 task-events
的訊息通道上發布 TaskExecution
。此功能自動裝配到任何具有 spring-cloud-stream
、spring-cloud-stream-<binder>
以及類別路徑上已定義任務的任務中。
若要停用事件發出監聽器,請將 spring.cloud.task.events.enabled 屬性設定為 false 。 |
透過定義適當的類別路徑,下列任務會在 task-events
通道上將 TaskExecution
作為事件發出(在任務的開始和結束時)
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
也需要在類別路徑上實作 Binder。 |
範例任務事件應用程式可以在 Spring Cloud Task Project 的 samples 模組中找到,此處。 |
Spring Batch 事件
透過任務執行 Spring Batch 工作時,可以將 Spring Cloud Task 配置為根據 Spring Batch 中可用的 Spring Batch 監聽器發出資訊訊息。具體來說,以下 Spring Batch 監聽器會自動配置到每個批次工作中,並在透過 Spring Cloud Task 執行時,在相關聯的 Spring Cloud Stream 通道上發出訊息
-
JobExecutionListener
監聽job-execution-events
-
StepExecutionListener
監聽step-execution-events
-
ChunkListener
監聽chunk-events
-
ItemReadListener
監聽item-read-events
-
ItemProcessListener
監聽item-process-events
-
ItemWriteListener
監聽item-write-events
-
SkipListener
監聽skip-events
當內容中存在適當的 Bean(Job
和 TaskLifecycleListener
)時,這些監聽器會自動配置到任何 AbstractJob
中。監聽這些事件的配置方式與繫結到任何其他 Spring Cloud Stream 通道的方式相同。我們的任務(執行批次工作的任務)充當 Source
,而監聽應用程式充當 Processor
或 Sink
。
一個範例可能是讓應用程式監聽 job-execution-events
通道,以取得工作的開始和停止。若要配置監聽應用程式,您會將輸入配置為 job-execution-events
,如下所示
spring.cloud.stream.bindings.input.destination=job-execution-events
也需要在類別路徑上實作 Binder。 |
範例批次事件應用程式可以在 Spring Cloud Task Project 的 samples 模組中找到,此處。 |
將批次事件傳送到不同的通道
Spring Cloud Task 為批次事件提供的選項之一是能夠變更特定監聽器可以發出訊息的通道。若要執行此操作,請使用下列配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例如,如果 StepExecutionListener
需要將其訊息發出到另一個名為 my-step-execution-events
的通道,而不是預設的 step-execution-events
,您可以新增下列配置
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
停用批次事件
若要停用所有批次事件的監聽器功能,請使用下列配置
spring.cloud.task.batch.events.enabled=false
若要停用特定批次事件,請使用下列配置
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
下列清單顯示您可以停用的個別監聽器
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
批次事件的發出順序
依預設,批次事件具有 Ordered.LOWEST_PRECEDENCE
。若要變更此值(例如,變更為 5),請使用下列配置
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5