版本 3.0.4
© 2009-2022 VMware, Inc. 保留所有權利。
您可以為了個人使用以及散佈給他人而複製本文件,前提是您不得針對此類副本收取任何費用,且進一步前提是每個副本都包含本著作權聲明,無論是以印刷或電子方式散佈。
序言
1. 關於本文件
Spring Cloud Task 參考指南提供 html、pdf 和 epub 格式。最新副本可在 docs.spring.io/spring-cloud-task/docs/current-SNAPSHOT/reference/html/ 取得。
您可以為了個人使用以及散佈給他人而複製本文件,前提是您不得針對此類副本收取任何費用,且進一步前提是每個副本都包含本著作權聲明,無論是以印刷或電子方式散佈。
2. 取得協助
在使用 Spring Cloud Task 時遇到問題嗎?我們很樂意協助您!
-
提出問題。我們會監控 stackoverflow.com 上標記為
spring-cloud-task
的問題。 -
回報 Spring Cloud Task 的錯誤,請至 github.com/spring-cloud/spring-cloud-task/issues。
Spring Cloud Task 的所有內容(包括文件)都是開放原始碼。如果您發現文件有問題,或者只是想改進它們,請參與貢獻。 |
3. 初始步驟
如果您剛開始使用 Spring Cloud Task 或一般的 'Spring',我們建議您閱讀開始入門章節。
若要從頭開始,請閱讀以下章節
若要依照教學課程進行,請閱讀開發您的第一個 Spring Cloud Task 應用程式
若要執行您的範例,請閱讀執行範例
開始入門
如果您剛開始使用 Spring Cloud Task,您應該閱讀本節。在這裡,我們回答基本的「是什麼?」、「如何?」和「為什麼?」問題。我們先簡要介紹 Spring Cloud Task。然後,我們將建置一個 Spring Cloud Task 應用程式,並在過程中討論一些核心原則。
4. Spring Cloud Task 簡介
Spring Cloud Task 讓建立生命週期短暫的微服務變得容易。它提供的功能可讓生命週期短暫的 JVM 處理程序在生產環境中按需執行。
5. 系統需求
您需要安裝 Java (Java 17 或更高版本)。若要建置,您也需要安裝 Maven。
5.1. 資料庫需求
Spring Cloud Task 使用關聯式資料庫來儲存已執行任務的結果。雖然您可以開始在沒有資料庫的情況下開發任務 (任務的狀態會記錄為任務儲存庫更新的一部分),但在生產環境中,您會想要使用受支援的資料庫。Spring Cloud Task 目前支援以下資料庫
-
DB2
-
H2
-
HSQLDB
-
MySql
-
Oracle
-
Postgres
-
SqlServer
6. 開發您的第一個 Spring Cloud Task 應用程式
一個好的起點是簡單的「Hello, World!」應用程式,因此我們建立 Spring Cloud Task 等效的應用程式,以突顯框架的功能。大多數 IDE 都對 Apache Maven 有良好的支援,因此我們將其用作此專案的建置工具。
spring.io 網站包含許多使用 Spring Boot 的 「Getting Started 」指南。如果您需要解決特定問題,請先查看那裡。您可以前往 Spring Initializr 並建立新專案,以縮短以下步驟。這樣做會自動產生新的專案結構,以便您可以立即開始編碼。我們建議您試用 Spring Initializr 以熟悉它。 |
6.1. 使用 Spring Initializr 建立 Spring Task 專案
現在我們可以建立並測試一個將 Hello, World!
列印到主控台的應用程式。
若要執行此操作
-
請造訪 Spring Initialzr 網站。
-
建立新的 Maven 專案,Group 名稱設為
io.spring.demo
,Artifact 名稱設為helloworld
。 -
在 Dependencies 文字方塊中,輸入
task
,然後選取Cloud Task
相依性。 -
在 Dependencies 文字方塊中,輸入
jdbc
,然後選取JDBC
相依性。 -
在 Dependencies 文字方塊中,輸入
h2
,然後選取H2
。(或您最愛的資料庫) -
按一下 Generate Project 按鈕
-
-
解壓縮 helloworld.zip 檔案,並將專案匯入您最愛的 IDE。
6.2. 編寫程式碼
若要完成我們的應用程式,我們需要使用以下內容更新產生的 HelloworldApplication
,使其啟動 Task (任務)。
package io.spring.Helloworld;
@SpringBootApplication
@EnableTask
public class HelloworldApplication {
@Bean
public ApplicationRunner applicationRunner() {
return new HelloWorldApplicationRunner();
}
public static void main(String[] args) {
SpringApplication.run(HelloworldApplication.class, args);
}
public static class HelloWorldApplicationRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("Hello, World!");
}
}
}
雖然它看起來很小,但實際上發生了很多事情。如需更多關於 Spring Boot 特定的資訊,請參閱 Spring Boot 參考文件。
現在我們可以開啟 src/main/resources
中的 application.properties
檔案。我們需要在 application.properties
中組態兩個屬性
-
application.name
:設定應用程式名稱 (會轉譯為任務名稱) -
logging.level
:將 Spring Cloud Task 的記錄設定為DEBUG
,以便檢視正在發生的情況。
以下範例顯示如何同時執行這兩者
logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld
6.2.1. Task Auto Configuration (任務自動組態)
當包含 Spring Cloud Task Starter 相依性時,Task (任務) 會自動組態所有 bean 以引導其功能。此組態的一部分會註冊 TaskRepository
及其基礎架構以供使用。
在我們的示範中,TaskRepository
使用內嵌的 H2 資料庫來記錄任務的結果。此 H2 內嵌資料庫並非生產環境的實用解決方案,因為 H2 DB 在任務結束後就會消失。但是,為了快速入門體驗,我們可以在範例中使用它,並將儲存庫中更新的內容回顯到記錄中。在組態章節 (本文件稍後部分),我們將涵蓋如何自訂 Spring Cloud Task 提供的元件組態。
當我們的範例應用程式執行時,Spring Boot 會啟動我們的 HelloWorldCommandLineRunner
,並將「Hello, World!」訊息輸出到標準輸出。TaskLifecycleListener
會記錄任務的開始和任務在儲存庫中的結束。
6.2.2. main 方法
main 方法作為任何 Java 應用程式的進入點。我們的 main 方法委派給 Spring Boot 的 SpringApplication 類別。
6.2.3. ApplicationRunner
Spring 包含許多引導應用程式邏輯的方法。Spring Boot 提供了一種方便的方法,可透過其 *Runner
介面 (CommandLineRunner
或 ApplicationRunner
) 以有組織的方式執行此操作。行為良好的任務可以使用這兩個 Runner (執行器) 之一來引導任何邏輯。
任務的生命週期從 *Runner#run
方法執行之前,到所有方法完成之後才算完成。Spring Boot 允許應用程式使用多個 *Runner
實作,Spring Cloud Task 也是如此。
從 CommandLineRunner 或 ApplicationRunner 以外的機制 (例如,使用 InitializingBean#afterPropertiesSet ) 引導的任何處理都不會由 Spring Cloud Task 記錄。 |
6.3. 執行範例
此時,我們的應用程式應該可以運作了。由於此應用程式是以 Spring Boot 為基礎,我們可以從命令列執行它,方法是從應用程式的根目錄使用 $ mvn spring-boot:run
,如下列範例所示 (及其輸出)
$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.3.RELEASE)
2018-07-23 17:44:34.426 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472 INFO 1978 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2018-07-23 17:44:35.410 INFO 1978 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [ main] o.s.c.t.c.SimpleTaskConfiguration : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [ main] o.s.c.t.c.DefaultTaskConfigurer : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [ main] o.s.c.t.r.s.TaskRepositoryInitializer : Initializing task schema for h2 database
2018-07-23 17:44:35.525 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558 INFO 1978 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733 INFO 1978 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738 INFO 1978 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772 INFO 1978 --- [ main] i.s.d.helloworld.HelloworldApplication : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [ main] o.s.c.t.r.support.SimpleTaskRepository : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}
前面的輸出中有三行是我們這裡感興趣的
-
SimpleTaskRepository
記錄了TaskRepository
中項目的建立。 -
CommandLineRunner
的執行,由「Hello, World!」輸出示範。 -
SimpleTaskRepository
記錄了TaskRepository
中任務的完成。
簡單的任務應用程式可以在 Spring Cloud Task 專案的範例模組中找到 這裡。 |
功能
本節更詳細地介紹 Spring Cloud Task,包括如何使用它、如何組態它以及適當的擴充點。
7. Spring Cloud Task 的生命週期
在大多數情況下,現代雲端環境是圍繞預期不會結束的處理程序執行而設計的。如果它們確實結束,通常會重新啟動它們。雖然大多數平台確實有一些方法可以執行在結束時不會重新啟動的處理程序,但該執行的結果通常不會以可消費的方式維護。Spring Cloud Task 提供在環境中執行生命週期短暫的處理程序並記錄結果的功能。這樣做可以透過訊息整合任務,圍繞生命週期短暫的處理程序以及較長時間執行的服務建立微服務架構。
雖然此功能在雲端環境中很有用,但在傳統部署模型中也可能出現相同的問題。當使用排程器 (例如 cron) 執行 Spring Boot 應用程式時,能夠在應用程式完成後監控其結果可能會很有用。
Spring Cloud Task 採用 Spring Boot 應用程式可以有開始和結束並且仍然成功的做法。批次應用程式是一個範例,說明預期會結束 (且通常生命週期短暫) 的處理程序如何有幫助。
Spring Cloud Task 記錄給定任務的生命週期事件。大多數長時間執行的處理程序 (以大多數 Web 應用程式為代表) 不會儲存其生命週期事件。Spring Cloud Task 核心的任務會儲存。
生命週期包含單個任務執行。這是組態為任務的 Spring Boot 應用程式的實體執行 (也就是說,它具有 Sprint Cloud Task 相依性)。
在任務開始時,在執行任何 CommandLineRunner
或 ApplicationRunner
實作之前,會在 TaskRepository
中建立一個項目,以記錄開始事件。此事件是透過 Spring Framework 觸發的 SmartLifecycle#start
觸發的。這向系統指示所有 bean 都已準備好使用,並且在執行 Spring Boot 提供的任何 CommandLineRunner
或 ApplicationRunner
實作之前發生。
任務的記錄只會在成功引導 ApplicationContext 時發生。如果 Context (上下文) 完全無法引導,則不會記錄任務的執行。 |
在完成 Spring Boot 的所有 *Runner#run
呼叫或 ApplicationContext
失敗 (由 ApplicationFailedEvent
指示) 時,任務執行會在儲存庫中更新結果。
如果應用程式需要在任務完成時 (已呼叫所有 *Runner#run 方法且任務儲存庫已更新) 關閉 ApplicationContext ,請將屬性 spring.cloud.task.closecontextEnabled 設定為 true。 |
7.1. TaskExecution (任務執行)
儲存在 TaskRepository
中的資訊在 TaskExecution
類別中建模,並包含以下資訊
欄位 | 描述 |
---|---|
|
任務執行的唯一 ID。 |
|
從 |
|
任務的名稱,由組態的 |
|
任務開始的時間,由 |
|
任務完成的時間,由 |
|
結束時可用的任何資訊。這可以透過 |
|
如果例外狀況是任務結束的原因 (由 |
|
字串命令列引數的 |
7.2. 映射結束代碼
當任務完成時,它會嘗試將結束代碼傳回作業系統。如果我們查看我們的原始範例,我們可以發現我們沒有控制應用程式的那個方面。因此,如果擲回例外狀況,JVM 會傳回一個代碼,該代碼可能對您在偵錯中沒有任何用處。
因此,Spring Boot 提供了一個介面 ExitCodeExceptionMapper
,可讓您將未捕獲的例外狀況映射到結束代碼。這樣做可讓您在結束代碼層級指示出錯的地方。此外,透過以這種方式映射結束代碼,Spring Cloud Task 會記錄傳回的結束代碼。
如果任務以 SIG-INT 或 SIG-TERM 終止,則結束代碼為零,除非程式碼中另有指定。
當任務正在執行時,結束代碼在儲存庫中儲存為 null。任務完成後,會根據本節稍早描述的準則儲存適當的結束代碼。 |
8. 組態
Spring Cloud Task 提供隨即可用的組態,如 DefaultTaskConfigurer
和 SimpleTaskConfiguration
類別中所定義。本節將逐步介紹預設值,以及如何根據您的需求自訂 Spring Cloud Task。
8.1. DataSource (資料來源)
Spring Cloud Task 使用資料來源來儲存任務執行的結果。依預設,我們提供 H2 的記憶體內實例,以提供簡單的引導開發方法。但是,在生產環境中,您可能想要組態自己的 DataSource
。
如果您的應用程式僅使用單個 DataSource
,並且該資料來源同時充當您的業務結構描述和任務儲存庫,您只需提供任何 DataSource
即可 (最簡單的方法是透過 Spring Boot 的組態慣例)。Spring Cloud Task 會自動將此 DataSource
用於儲存庫。
如果您的應用程式使用多個 DataSource
,您需要使用適當的 DataSource
組態任務儲存庫。此自訂可以透過 TaskConfigurer
的實作來完成。
8.2. 資料表前綴
TaskRepository
的一個可修改屬性是任務資料表的資料表前綴。依預設,它們都以 TASK_
作為前綴。TASK_EXECUTION
和 TASK_EXECUTION_PARAMS
是兩個範例。但是,可能有理由修改此前綴。如果結構描述名稱需要前置於資料表名稱,或者如果同一結構描述中需要多個任務資料表集,則必須變更資料表前綴。您可以透過將 spring.cloud.task.tablePrefix
設定為您需要的前綴來執行此操作,如下所示
spring.cloud.task.tablePrefix=yourPrefix
透過使用 spring.cloud.task.tablePrefix
,使用者會承擔建立任務資料表的責任,這些資料表既符合任務資料表結構描述的準則,又符合使用者業務需求所需的修改。建立您自己的 Task DDL 時,您可以利用 Spring Cloud Task Schema DDL 作為指南,如 這裡 所示。
8.3. 啟用/停用資料表初始化
如果您要建立任務資料表,並且不希望 Spring Cloud Task 在任務啟動時建立它們,請將 spring.cloud.task.initialize-enabled
屬性設定為 false
,如下所示
spring.cloud.task.initialize-enabled=false
它預設為 true
。
屬性 spring.cloud.task.initialize.enable 已被取代。 |
8.4. 外部產生的任務 ID
在某些情況下,您可能想要允許任務請求時間與基礎架構實際啟動任務之間的時間差。Spring Cloud Task 可讓您在請求任務時建立 TaskExecution
。然後,將產生的 TaskExecution
的執行 ID 傳遞給任務,以便它可以透過任務的生命週期更新 TaskExecution
。
可以透過呼叫 TaskRepository
實作上的 createTaskExecution
方法來建立 TaskExecution
,該實作參考保存 TaskExecution
物件的資料儲存區。
為了組態您的 Task (任務) 以使用產生的 TaskExecutionId
,請新增以下屬性
spring.cloud.task.executionid=yourtaskId
8.5. 外部任務 ID
Spring Cloud Task 可讓您為每個 TaskExecution
儲存外部任務 ID。為了組態您的 Task (任務) 以使用產生的 TaskExecutionId
,請新增以下屬性
spring.cloud.task.external-execution-id=<externalTaskId>
8.6. 父任務 ID
Spring Cloud Task 可讓您為每個 TaskExecution
儲存父任務 ID。一個範例是執行另一個或多個任務的任務,並且您想要記錄哪個任務啟動了每個子任務。為了組態您的 Task (任務) 以設定父 TaskExecutionId
,請在子任務上新增以下屬性
spring.cloud.task.parent-execution-id=<parentExecutionTaskId>
8.7. TaskConfigurer (任務組態器)
TaskConfigurer
是一個策略介面,可讓您自訂 Spring Cloud Task 元件的組態方式。依預設,我們提供 DefaultTaskConfigurer
,它提供邏輯預設值:基於 Map
的記憶體內元件 (如果未提供 DataSource
,則對開發很有用) 和基於 JDBC 的元件 (如果 DataSource
可用,則很有用)。
TaskConfigurer
可讓您組態三個主要元件
元件 | 描述 | 預設值 (由 DefaultTaskConfigurer 提供) |
---|---|---|
|
要使用的 |
|
|
要使用的 |
|
|
執行任務更新時要使用的交易管理器。 |
如果使用 |
您可以透過建立 TaskConfigurer
介面的自訂實作來自訂前面表格中描述的任何元件。通常,擴充 DefaultTaskConfigurer
(如果找不到 TaskConfigurer
,則會提供它) 並覆寫所需的 getter 就足夠了。但是,可能需要從頭開始實作您自己的。
除非使用者使用 getter 方法來提供要公開為 Spring Bean 的實作,否則不應直接使用 TaskConfigurer 中的 getter 方法。 |
8.8. Task Execution Listener (任務執行監聽器)
TaskExecutionListener
可讓您為任務生命週期期間發生的特定事件註冊監聽器。若要執行此操作,請建立一個實作 TaskExecutionListener
介面的類別。實作 TaskExecutionListener
介面的類別會收到以下事件的通知
-
onTaskStartup
:在將TaskExecution
儲存到TaskRepository
之前。 -
onTaskEnd
:在更新TaskRepository
中的TaskExecution
項目並標記任務的最終狀態之前。 -
onTaskFailed
:在擲回任務未處理的例外狀況時,在呼叫onTaskEnd
方法之前。
Spring Cloud Task 也可讓您使用以下方法註解將 TaskExecution
監聽器新增至 bean 中的方法
-
@BeforeTask
:在將TaskExecution
儲存到TaskRepository
之前 -
@AfterTask
:在更新TaskRepository
中的TaskExecution
項目並標記任務的最終狀態之前。 -
@FailedTask
:在擲回任務未處理的例外狀況時,在呼叫@AfterTask
方法之前。
以下範例顯示正在使用的三個註解
public class MyBean {
@BeforeTask
public void methodA(TaskExecution taskExecution) {
}
@AfterTask
public void methodB(TaskExecution taskExecution) {
}
@FailedTask
public void methodC(TaskExecution taskExecution, Throwable throwable) {
}
}
在鏈中比 TaskLifecycleListener 更早插入 ApplicationListener 可能會導致非預期的效果。 |
8.8.1. Task Execution Listener (任務執行監聽器) 擲回的例外狀況
如果 TaskExecutionListener
事件處理常式擲回例外狀況,則該事件處理常式的所有監聽器處理都會停止。例如,如果三個 onTaskStartup
監聽器已啟動,且第一個 onTaskStartup
事件處理常式擲回例外狀況,則不會呼叫其他兩個 onTaskStartup
方法。但是,會呼叫 TaskExecutionListeners
的其他事件處理常式 (onTaskEnd
和 onTaskFailed
)。
當 TaskExecutionListener
事件處理常式擲回例外狀況時,傳回的結束代碼是 ExitCodeEvent 回報的結束代碼。如果未發出 ExitCodeEvent
,則會評估擲回的 Exception (例外狀況),以查看它是否為 ExitCodeGenerator 類型。如果是,則從 ExitCodeGenerator
傳回結束代碼。否則,傳回 1
。
在 onTaskStartup
方法中擲回例外狀況的情況下,應用程式的結束代碼將為 1
。如果在 onTaskEnd
或 onTaskFailed
方法中擲回例外狀況,則應用程式的結束代碼將是使用上面列舉的規則建立的結束代碼。
在 onTaskStartup 、onTaskEnd 或 onTaskFailed 中擲回例外狀況的情況下,您無法使用 ExitCodeExceptionMapper 覆寫應用程式的結束代碼。 |
8.8.2. 結束訊息
您可以使用 TaskExecutionListener
以程式設計方式設定任務的結束訊息。這是透過設定 TaskExecution
的 exitMessage
來完成的,然後該訊息會傳遞到 TaskExecutionListener
。以下範例顯示使用 @AfterTask
ExecutionListener
註解的方法
@AfterTask
public void afterMe(TaskExecution taskExecution) {
taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}
可以在任何監聽器事件 (onTaskStartup
、onTaskFailed
和 onTaskEnd
) 中設定 ExitMessage
。三個監聽器的優先順序順序如下
-
onTaskEnd
-
onTaskFailed
-
onTaskStartup
例如,如果您為 onTaskStartup
和 onTaskFailed
監聽器設定了 exitMessage
,且任務在沒有失敗的情況下結束,則來自 onTaskStartup
的 exitMessage
會儲存在儲存庫中。 否則,如果發生失敗,則會儲存來自 onTaskFailed
的 exitMessage
。 此外,如果您使用 onTaskEnd
監聽器設定 exitMessage
,則來自 onTaskEnd
的 exitMessage
將取代來自 onTaskStartup
和 onTaskFailed
的結束訊息。
8.9. 限制 Spring Cloud Task 實例
Spring Cloud Task 讓您建立在同一時間只能執行一個具有給定任務名稱的任務。 為此,您需要建立任務名稱,並為每個任務執行設定 spring.cloud.task.single-instance-enabled=true
。 當第一個任務執行正在執行時,任何其他時間您嘗試執行具有相同任務名稱和 `spring.cloud.task.single-instance-enabled=true` 的任務時,該任務將會失敗,並顯示以下錯誤訊息:Task with name "application" is already running.
spring.cloud.task.single-instance-enabled
的預設值為 false
。 以下範例顯示如何將 spring.cloud.task.single-instance-enabled
設定為 true
spring.cloud.task.single-instance-enabled=true 或 false
若要使用此功能,您必須將以下 Spring Integration 依賴項新增至您的應用程式
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
如果任務因為啟用此功能且另一個具有相同任務名稱的任務正在執行而失敗,則應用程式的結束代碼將為 1。 |
8.9.1. Spring AOT 和原生編譯的單一實例用法
若要在建立原生編譯應用程式時使用 Spring Cloud Task 的單一實例功能,您需要在建置時啟用此功能。 為此,請新增 process-aot 執行並將 spring.cloud.task.single-step-instance-enabled=true
設定為 JVM 引數,如下所示
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.cloud.task.single-instance-enabled=true
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
8.10. 為 ApplicationRunner 和 CommandLineRunner 啟用觀察
若要為 ApplicationRunner
或 CommandLineRunner
啟用任務觀察,請將 spring.cloud.task.observation.enabled
設定為 true。
具有啟用觀察功能的範例任務應用程式,使用 SimpleMeterRegistry
,可以在這裡找到。
8.11. 停用 Spring Cloud Task 自動配置
在 Spring Cloud Task 不應為實作自動配置的情況下,您可以停用 Task 的自動配置。 這可以透過將以下註解新增至您的 Task 應用程式來完成
@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})
您也可以透過將 spring.cloud.task.autoconfiguration.enabled
屬性設定為 false
來停用 Task 自動配置。
8.12. 關閉 Context
如果應用程式需要在任務完成時(所有 *Runner#run
方法都已呼叫且任務儲存庫已更新)關閉 ApplicationContext
,請將屬性 spring.cloud.task.closecontextEnabled
設定為 true
。
另一個關閉 context 的情況是當 Task Execution 完成但應用程式未終止時。 在這些情況下,context 會保持開啟,因為已配置執行緒(例如:如果您正在使用 TaskExecutor)。 在這些情況下,在啟動任務時將 spring.cloud.task.closecontextEnabled
屬性設定為 true
。 這將在任務完成後關閉應用程式的 context。 因此允許應用程式終止。
8.13. 啟用任務指標
Spring Cloud Task 與 Micrometer 整合,並為其執行的 Task 建立觀察。 若要啟用 Task 可觀察性整合,您必須將 spring-boot-starter-actuator
、您偏好的註冊表實作(如果您想要發布指標)和 micrometer-tracing(如果您想要發布追蹤資料)新增至您的任務應用程式。 以下是啟用任務可觀察性和使用 Influx 指標的 Maven 依賴項範例
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-influx</artifactId>
<scope>runtime</scope>
</dependency>
8.14. Spring Task 和 Spring Cloud Task 屬性
術語 task
是業界常用的詞彙。 其中一個範例是 Spring Boot 提供了 spring.task
屬性,而 Spring Cloud Task 提供了 spring.cloud.task
屬性。 這在過去造成了一些混淆,認為這兩組屬性直接相關。 然而,它們代表 Spring 生態系統中提供的 2 組不同功能。
-
spring.task
指的是設定ThreadPoolTaskScheduler
的屬性。 -
spring.cloud.task
指的是設定 Spring Cloud Task 功能的屬性。
Batch
本節將更詳細地介紹 Spring Cloud Task 與 Spring Batch 的整合。 本節涵蓋追蹤 Job Execution 與執行它的 Task 之間的關聯,以及透過 Spring Cloud Deployer 進行遠端分割。
9. 將 Job Execution 與執行它的 Task 建立關聯
Spring Boot 提供了在 über-jar 中執行批次 Job 的設施。 Spring Boot 對此功能的支援讓開發人員可以在該執行中執行多個批次 Job。 Spring Cloud Task 提供了將 Job 的執行(Job Execution)與 Task 的執行建立關聯的能力,以便可以追溯到彼此。
Spring Cloud Task 透過使用 TaskBatchExecutionListener
來實現此功能。 預設情況下,此監聽器會在任何同時配置了 Spring Batch Job(透過在 context 中定義 Job
類型的 bean)和 classpath 上具有 spring-cloud-task-batch
jar 的 context 中自動配置。 監聽器會注入到滿足這些條件的所有 Job 中。
9.1. 覆寫 TaskBatchExecutionListener
若要防止監聽器注入到目前 context 中的任何批次 Job 中,您可以使用標準 Spring Boot 機制停用自動配置。
若要僅將監聽器注入到 context 中的特定 Job 中,請覆寫 batchTaskExecutionListenerBeanPostProcessor
並提供 Job bean ID 的清單,如下例所示
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
您可以在 Spring Cloud Task Project 的範例模組中找到範例批次應用程式,這裡。 |
10. 遠端分割
Spring Cloud Deployer 提供了在大多數雲端基礎架構上啟動基於 Spring Boot 的應用程式的設施。 DeployerPartitionHandler
和 DeployerStepExecutionHandler
將 worker 步驟執行的啟動委派給 Spring Cloud Deployer。
若要配置 DeployerStepExecutionHandler
,您必須提供一個 Resource
,代表要執行的 Spring Boot über-jar、一個 TaskLauncherHandler
和一個 JobExplorer
。 您可以配置任何環境屬性,以及要同時執行的 worker 最大數量、輪詢結果的間隔(預設為 10 秒)和逾時時間(預設為 -1 或無逾時)。 以下範例顯示如何配置此 PartitionHandler
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
當將環境變數傳遞給分割區時,每個分割區可能位於具有不同環境設定的不同機器上。 因此,您應該僅傳遞所需的環境變數。 |
請注意,在上面的範例中,我們已將 worker 的最大數量設定為 2。 設定 worker 的最大數量會建立應同時執行的分割區的最大數量。
預期要執行的 Resource
是一個 Spring Boot über-jar,其中 DeployerStepExecutionHandler
配置為目前 context 中的 CommandLineRunner
。 前面範例中列舉的儲存庫應該是 über-jar 所在的遠端儲存庫。 管理器和 worker 都應該能夠看到與 Job 儲存庫和任務儲存庫相同的資料儲存區。 一旦底層基礎架構啟動了 Spring Boot jar 且 Spring Boot 啟動了 DeployerStepExecutionHandler
,步驟處理常式就會執行請求的 Step
。 以下範例顯示如何配置 DeployerStepExecutionHandler
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
您可以在 Spring Cloud Task 專案的範例模組中找到範例遠端分割應用程式,這裡。 |
10.1. 非同步啟動遠端批次分割區
預設情況下,批次分割區會循序啟動。 但是,在某些情況下,這可能會影響效能,因為每次啟動都會阻塞,直到資源(例如:在 Kubernetes 中佈建 pod)被佈建。 在這些情況下,您可以為 DeployerPartitionHandler
提供 ThreadPoolTaskExecutor
。 這將根據 ThreadPoolTaskExecutor
的配置啟動遠端批次分割區。 例如
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
我們需要關閉 context,因為使用 ThreadPoolTaskExecutor 會留下一個活動的執行緒,因此應用程式將不會終止。 為了適當地關閉應用程式,我們需要將 spring.cloud.task.closecontextEnabled 屬性設定為 true 。 |
10.2. Kubernetes 平台上開發批次分割應用程式的注意事項
-
在 Kubernetes 平台上部署分割應用程式時,您必須為 Spring Cloud Kubernetes Deployer 使用以下依賴項
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId> </dependency>
-
任務應用程式及其分割區的應用程式名稱需要遵循以下 regex 模式:
[a-z0-9]([-a-z0-9]*[a-z0-9])
。 否則,將會拋出例外。
11. 批次資訊訊息
Spring Cloud Task 提供了批次 Job 發出資訊訊息的能力。“Spring Batch 事件”章節詳細介紹了此功能。
12. 批次 Job 結束代碼
如先前所述,Spring Cloud Task 應用程式支援記錄任務執行的結束代碼的能力。 但是,在您在任務中執行 Spring Batch Job 的情況下,無論 Batch Job Execution 如何完成,當使用預設的 Batch/Boot 行為時,任務的結果始終為零。 請記住,任務是一個 boot 應用程式,並且從任務傳回的結束代碼與 boot 應用程式相同。 若要在批次 Job 傳回 FAILED
的 BatchStatus 時,覆寫此行為並允許任務傳回零以外的結束代碼,請將 spring.cloud.task.batch.fail-on-job-failure
設定為 true
。 然後,結束代碼可以是 1(預設值)或基於指定的 ExitCodeGenerator
)
此功能使用新的 ApplicationRunner
來取代 Spring Boot 提供的那個。 預設情況下,它配置為相同的順序。 但是,如果您想要自訂 ApplicationRunner
執行的順序,您可以透過設定 spring.cloud.task.batch.applicationRunnerOrder
屬性來設定其順序。 若要讓您的任務根據批次 Job Execution 的結果傳回結束代碼,您需要編寫自己的 CommandLineRunner
。
單一步驟批次 Job Starter
本節將介紹如何使用 Spring Cloud Task 中包含的 starter 開發具有單個 Step
的 Spring Batch Job
。 此 starter 讓您可以使用配置來定義 ItemReader
、ItemWriter
或完整的單一步驟 Spring Batch Job
。 有關 Spring Batch 及其功能的更多資訊,請參閱Spring Batch 文件。
若要取得 Maven 的 starter,請將以下內容新增至您的建置
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
<version>2.3.0</version>
</dependency>
若要取得 Gradle 的 starter,請將以下內容新增至您的建置
compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"
13. 定義 Job
您可以使用 starter 來定義最少一個 ItemReader
或 ItemWriter
,或最多一個完整的 Job
。 在本節中,我們定義了配置 Job
需要定義哪些屬性。
13.1. 屬性
首先,starter 提供了一組屬性,讓您可以配置具有一個 Step 的 Job 的基本設定
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
Job 的名稱。 |
|
|
|
Step 的名稱。 |
|
|
|
每個交易要處理的項目數。 |
配置上述屬性後,您就有了一個具有單個、基於 chunk 的 step 的 Job。 這個基於 chunk 的 step 會讀取、處理和寫入 Map<String, Object>
實例作為項目。 但是,step 尚未執行任何操作。 您需要配置 ItemReader
、可選的 ItemProcessor
和 ItemWriter
以使其執行某些操作。 若要配置其中一個,您可以使用屬性並配置已提供自動配置的選項之一,或者您可以使用標準 Spring 配置機制配置您自己的。
如果您配置自己的,則輸入和輸出類型必須與 step 中的其他類型相符。 此 starter 中的 ItemReader 實作和 ItemWriter 實作都使用 Map<String, Object> 作為輸入和輸出項目。 |
14. ItemReader 實作的自動配置
此 starter 為四種不同的 ItemReader
實作提供自動配置:AmqpItemReader
、FlatFileItemReader
、JdbcCursorItemReader
和 KafkaItemReader
。 在本節中,我們概述如何使用提供的自動配置來配置每個實作。
14.1. AmqpItemReader
您可以使用 AmqpItemReader
從 AMQP 的佇列或主題讀取。 此 ItemReader
實作的自動配置取決於兩組配置。 第一個是 AmqpTemplate
的配置。 您可以自行配置,或使用 Spring Boot 提供的自動配置。 請參閱Spring Boot AMQP 文件。 配置 AmqpTemplate
後,您可以透過設定以下屬性來啟用批次功能以支援它
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
如果為 |
|
|
|
指示是否應註冊 |
有關更多資訊,請參閱AmqpItemReader
文件。
14.2. FlatFileItemReader
FlatFileItemReader
讓您可以從平面檔案(例如 CSV 和其他檔案格式)讀取。 若要從檔案讀取,您可以透過正常的 Spring 配置(LineTokenizer
、RecordSeparatorPolicy
、FieldSetMapper
、LineMapper
或 SkippedLinesCallback
)自行提供一些元件。 您也可以使用以下屬性來配置 reader
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
決定是否應為重新啟動儲存狀態。 |
|
|
|
用於在 |
|
|
|
要從檔案讀取的最大項目數。 |
|
|
0 |
已讀取的項目數。 用於重新啟動。 |
|
|
空列表 |
指示檔案中註解行(要忽略的行)的字串列表。 |
|
|
|
要讀取的資源。 |
|
|
|
如果設定為 |
|
|
|
讀取檔案時要使用的編碼。 |
|
|
0 |
指示在檔案開頭要跳過的行數。 |
|
|
|
指示檔案是否為分隔檔案(CSV 和其他格式)。 此屬性和 |
|
|
|
如果讀取分隔檔案,則指示要剖析的分隔符號。 |
|
|
|
用於確定用於引號值的字元。 |
|
|
空列表 |
索引列表,用於確定記錄中要包含在項目中的欄位。 |
|
|
|
指示檔案的記錄是否按欄號剖析。 此屬性和 |
|
|
空列表 |
欄範圍列表,用於剖析固定寬度記錄。 請參閱Range 文件。 |
|
|
|
從記錄剖析的每個欄位的名稱列表。 這些名稱是此 |
|
|
|
如果設定為 |
14.3. JdbcCursorItemReader
JdbcCursorItemReader
針對關係資料庫執行查詢,並迭代結果 cursor (ResultSet
) 以提供結果項目。 此自動配置讓您可以提供 PreparedStatementSetter
、RowMapper
或兩者。 您也可以使用以下屬性來配置 JdbcCursorItemReader
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
決定是否應為重新啟動儲存狀態。 |
|
|
|
用於在 |
|
|
|
要從檔案讀取的最大項目數。 |
|
|
0 |
已讀取的項目數。 用於重新啟動。 |
|
|
給驅動程式的提示,指示每次呼叫資料庫系統時要擷取多少筆記錄。 為了獲得最佳效能,您通常希望將其設定為與 chunk size 相符。 |
|
|
|
要從資料庫讀取的最大項目數。 |
|
|
|
查詢逾時的毫秒數。 |
|
|
|
|
決定 reader 在處理時是否應忽略 SQL 警告。 |
|
|
|
指示是否應在每次讀取後驗證 cursor 的位置,以驗證 |
|
|
|
指示驅動程式是否支援 cursor 的絕對定位。 |
|
|
|
指示連線是否與其他處理共用(因此是交易的一部分)。 |
|
|
|
要從中讀取的 SQL 查詢。 |
您也可以使用以下屬性,專門為 reader 指定 JDBC DataSource:.JdbcCursorItemReader
屬性
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
決定是否應啟用 |
|
|
|
資料庫的 JDBC URL。 |
|
|
|
資料庫的登入使用者名稱。 |
|
|
|
資料庫的登入密碼。 |
|
|
|
JDBC 驅動程式的完整限定名稱。 |
如果未指定 jdbccursoritemreader_datasource ,則 JDBCCursorItemReader 將使用預設的 DataSource 。 |
14.4. KafkaItemReader
從 Kafka 主題擷取資料分割非常有用,而這正是 KafkaItemReader
可以做到的。 若要配置 KafkaItemReader
,需要兩個配置部分。 首先,需要使用 Spring Boot 的 Kafka 自動配置來配置 Kafka(請參閱Spring Boot Kafka 文件)。 配置 Spring Boot 中的 Kafka 屬性後,您可以透過設定以下屬性來配置 KafkaItemReader
本身
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
用於在 |
|
|
|
要從中讀取的主題名稱。 |
|
|
空列表 |
要從中讀取的分割區索引列表。 |
|
|
30 |
|
|
|
|
決定是否應為重新啟動儲存狀態。 |
14.5. 原生編譯
單一步驟批次處理的優點是,當您使用 JVM 時,它可以讓您動態選擇在執行階段要使用的 reader 和 writer bean。 但是,當您使用原生編譯時,您必須在建置時而不是執行階段確定 reader 和 writer。 以下範例展示了如何做到這一點
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
<configuration>
<jvmArguments>
-Dspring.batch.job.flatfileitemreader.name=fooReader
-Dspring.batch.job.flatfileitemwriter.name=fooWriter
</jvmArguments>
</configuration>
</execution>
</executions>
</plugin>
15. ItemProcessor 配置
如果 ApplicationContext
中有一個可用的 ItemProcessor
,則單一步驟批次 Job 自動配置會接受它。 如果找到正確類型的(ItemProcessor<Map<String, Object>, Map<String, Object>>
),它將自動注入到 step 中。
16. ItemWriter 實作的自動配置
此 starter 為與支援的 ItemReader
實作相符的 ItemWriter
實作提供自動配置:AmqpItemWriter
、FlatFileItemWriter
、JdbcItemWriter
和 KafkaItemWriter
。 本節介紹如何使用自動配置來配置支援的 ItemWriter
。
16.1. AmqpItemWriter
若要寫入 RabbitMQ 佇列,您需要兩組配置。 首先,您需要一個 AmqpTemplate
。 取得此物件的最簡單方法是使用 Spring Boot 的 RabbitMQ 自動配置。 請參閱Spring Boot AMQP 文件。
配置 AmqpTemplate
後,您可以透過設定以下屬性來配置 AmqpItemWriter
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
如果為 |
|
|
|
指示是否應註冊 |
16.2. FlatFileItemWriter
若要將檔案作為 step 的輸出寫入,您可以配置 FlatFileItemWriter
。 自動配置接受已明確配置的元件(例如 LineAggregator
、FieldExtractor
、FlatFileHeaderCallback
或 FlatFileFooterCallback
)以及已透過設定以下指定屬性配置的元件
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
要讀取的資源。 |
|
|
|
指示輸出檔案是否為分隔檔案。 如果為 |
|
|
|
指示輸出檔案是否為格式化檔案。 如果為 |
|
|
|
用於為格式化檔案產生輸出的格式。 格式化是透過使用 |
|
|
|
產生檔案時要使用的 |
|
|
0 |
記錄的最大長度。 如果為 0,則大小為無界限。 |
|
|
0 |
記錄的最小長度。 |
|
|
|
用於分隔分隔檔案中欄位的 |
|
|
|
寫入檔案時要使用的編碼。 |
|
|
|
指示是否應在刷新時將檔案強制同步到磁碟。 |
|
|
|
從記錄剖析的每個欄位的名稱列表。 這些名稱是此 |
|
|
|
指示如果找到輸出檔案,是否應附加到檔案。 |
|
|
|
在輸出檔案中使用什麼 |
|
|
|
用於在 |
|
|
|
決定是否應為重新啟動儲存狀態。 |
|
|
|
如果設定為 |
|
|
|
如果設定為 |
|
|
|
指示 reader 是否為交易式佇列(指示讀取的項目在失敗時會傳回佇列)。 |
16.3. JdbcBatchItemWriter
若要將 step 的輸出寫入關係資料庫,此 starter 提供了自動配置 JdbcBatchItemWriter
的功能。 自動配置讓您可以透過設定以下屬性來提供您自己的 ItemPreparedStatementSetter
或 ItemSqlParameterSourceProvider
和配置選項
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
用於在 |
|
|
|
用於插入每個項目的 SQL。 |
|
|
|
是否驗證每個插入都會導致至少更新一筆記錄。 |
您也可以使用以下屬性,專門為 writer 指定 JDBC DataSource:.JdbcBatchItemWriter
屬性
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
決定是否應啟用 |
|
|
|
資料庫的 JDBC URL。 |
|
|
|
資料庫的登入使用者名稱。 |
|
|
|
資料庫的登入密碼。 |
|
|
|
JDBC 驅動程式的完整限定名稱。 |
如果未指定 jdbcbatchitemwriter_datasource ,則 JdbcBatchItemWriter 將使用預設的 DataSource 。 |
16.4. KafkaItemWriter
若要將 step 輸出寫入 Kafka 主題,您需要 KafkaItemWriter
。 此 starter 透過使用來自兩個位置的設施,為 KafkaItemWriter
提供自動配置。 第一個是 Spring Boot 的 Kafka 自動配置。 (請參閱Spring Boot Kafka 文件。)其次,此 starter 讓您可以在 writer 上配置兩個屬性。
屬性 | 類型 | 預設值 | 描述 |
---|---|---|---|
|
|
|
要寫入的 Kafka 主題。 |
|
|
|
傳遞給 writer 的項目是否都要作為刪除事件傳送到主題。 |
有關 KafkaItemWriter
的配置選項的更多資訊,請參閱KafkaItemWiter
文件。
16.5. Spring AOT
當將 Spring AOT 與單一步驟批次 Starter 一起使用時,您必須在編譯時設定 reader 和 writer 名稱屬性(除非您為 reader 和/或 writer 建立 bean)。 若要執行此操作,您必須在 boot maven 外掛程式或 gradle 外掛程式中,將您希望使用的 reader 和 writer 的名稱包含為引數或環境變數。 例如,如果您希望在 Maven 中啟用 FlatFileItemReader
和 FlatFileItemWriter
,它看起來會像這樣
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<id>process-aot</id>
<goals>
<goal>process-aot</goal>
</goals>
</execution>
</executions>
<configuration>
<arguments>
<argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
<argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
</arguments>
</configuration>
</plugin>
Spring Cloud Stream 整合
單獨的任務可能很有用,但將任務整合到更大的生態系統中,使其可用於更複雜的處理和協調。 本節介紹 Spring Cloud Task 與 Spring Cloud Stream 的整合選項。
17. 從 Spring Cloud Stream 啟動任務
您可以從 stream 啟動任務。 若要執行此操作,請建立一個 sink,以監聽將 TaskLaunchRequest
作為其 payload 的訊息。 TaskLaunchRequest
包含
-
uri
:要執行的任務工件的 URI。 -
applicationName
:與任務相關聯的名稱。 如果未設定 applicationName,則TaskLaunchRequest
會產生一個由以下內容組成的任務名稱:Task-<UUID>
。 -
commandLineArguments
:包含任務命令列引數的列表。 -
environmentProperties
:包含任務要使用的環境變數的地圖。 -
deploymentProperties
:包含 deployer 用於部署任務的屬性的地圖。
如果 payload 的類型不同,則 sink 會拋出例外。 |
例如,可以建立一個 stream,其中包含一個處理器,該處理器從 HTTP 來源接收資料,並建立一個包含 TaskLaunchRequest
的 GenericMessage
,並將訊息傳送到其輸出通道。 然後,任務 sink 將從其輸入通道接收訊息,然後啟動任務。
若要建立 taskSink,您只需要建立一個包含 EnableTaskLauncher
註解的 Spring Boot 應用程式,如下例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 專案的範例模組包含一個範例 Sink 和 Processor。 若要將這些範例安裝到您的本機 Maven 儲存庫,請從 spring-cloud-task-samples
目錄執行 Maven 建置,並將 skipInstall
屬性設定為 false
,如下例所示
mvn clean install
maven.remoteRepositories.springRepo.url 屬性必須設定為 über-jar 所在的遠端儲存庫的位置。 如果未設定,則沒有遠端儲存庫,因此它僅依賴本機儲存庫。 |
17.1. Spring Cloud Data Flow
若要在 Spring Cloud Data Flow 中建立 stream,您必須先註冊我們建立的 Task Sink 應用程式。 在以下範例中,我們透過使用 Spring Cloud Data Flow shell 來註冊 Processor 和 Sink 範例應用程式
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下範例顯示如何從 Spring Cloud Data Flow shell 建立 stream
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
18. Spring Cloud Task 事件
Spring Cloud Task 提供了當任務透過 Spring Cloud Stream 通道執行時,透過 Spring Cloud Stream 通道發射事件的功能。任務監聽器用於在名為 task-events
的訊息通道上發布 TaskExecution
。此功能會自動裝配到任何具有 spring-cloud-stream
、spring-cloud-stream-<binder>
,並且在其類別路徑上定義任務的任務中。
若要停用事件發射監聽器,請將 spring.cloud.task.events.enabled 屬性設定為 false 。 |
在定義適當的類別路徑後,以下任務會在 task-events
通道上發射 TaskExecution
作為事件(在任務開始和結束時)。
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
也需要在類別路徑上定義 binder 實作。 |
範例任務事件應用程式可以在 Spring Cloud Task Project 的 samples 模組中找到,這裡。 |
18.1. 停用特定任務事件
若要停用任務事件,您可以將 spring.cloud.task.events.enabled
屬性設定為 false
。
19. Spring Batch 事件
當透過任務執行 Spring Batch job 時,可以將 Spring Cloud Task 設定為根據 Spring Batch 中可用的 Spring Batch 監聽器發射資訊訊息。具體來說,以下 Spring Batch 監聽器會自動設定到每個 batch job 中,並在透過 Spring Cloud Task 執行時,在相關聯的 Spring Cloud Stream 通道上發射訊息。
-
JobExecutionListener
監聽job-execution-events
-
StepExecutionListener
監聽step-execution-events
-
ChunkListener
監聽chunk-events
-
ItemReadListener
監聽item-read-events
-
ItemProcessListener
監聽item-process-events
-
ItemWriteListener
監聽item-write-events
-
SkipListener
監聽skip-events
當 context 中存在適當的 beans (Job
和 TaskLifecycleListener
) 時,這些監聽器會自動設定到任何 AbstractJob
中。監聽這些事件的設定方式與繫結到任何其他 Spring Cloud Stream 通道的方式相同。我們的任務(執行 batch job 的任務)作為 Source
,而監聽應用程式則作為 Processor
或 Sink
。
一個範例可能是讓應用程式監聽 job-execution-events
通道,以獲取 job 的開始和停止事件。若要設定監聽應用程式,您需要將輸入設定為 job-execution-events
,如下所示:
spring.cloud.stream.bindings.input.destination=job-execution-events
也需要在類別路徑上定義 binder 實作。 |
範例 batch 事件應用程式可以在 Spring Cloud Task Project 的 samples 模組中找到,這裡。 |
19.1. 將 Batch 事件發送到不同的通道
Spring Cloud Task 為 batch 事件提供的選項之一是能夠更改特定監聽器可以發射其訊息的通道。若要執行此操作,請使用以下設定:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例如,如果 StepExecutionListener
需要將其訊息發射到另一個名為 my-step-execution-events
的通道,而不是預設的 step-execution-events
,您可以新增以下設定:
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
19.2. 停用 Batch 事件
若要停用所有 batch 事件的監聽器功能,請使用以下設定:
spring.cloud.task.batch.events.enabled=false
若要停用特定 batch 事件,請使用以下設定:
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表顯示您可以停用的個別監聽器:
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
19.3. Batch 事件的發射順序
預設情況下,batch 事件具有 Ordered.LOWEST_PRECEDENCE
。若要變更此值(例如,變更為 5),請使用以下設定:
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5
附錄
20. 任務儲存庫架構
本附錄提供任務儲存庫中使用之資料庫架構的 ERD。

20.1. 表格資訊
儲存任務執行資訊。
欄位名稱 | 必要 | 類型 | 欄位長度 | 註解 |
---|---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
Spring Cloud Task Framework 在應用程式啟動時建立從 |
START_TIME |
FALSE |
DATETIME(6) |
X |
Spring Cloud Task Framework 在應用程式啟動時建立此值。 |
END_TIME |
FALSE |
DATETIME(6) |
X |
Spring Cloud Task Framework 在應用程式結束時建立此值。 |
TASK_NAME |
FALSE |
VARCHAR |
100 |
Spring Cloud Task Framework 在應用程式啟動時會將此值設定為 "Application",除非使用者使用 |
EXIT_CODE |
FALSE |
INTEGER |
X |
遵循 Spring Boot 預設值,除非使用者如此處所述覆寫。 |
EXIT_MESSAGE |
FALSE |
VARCHAR |
2500 |
使用者定義,如此處所述。 |
ERROR_MESSAGE |
FALSE |
VARCHAR |
2500 |
Spring Cloud Task Framework 在應用程式結束時建立此值。 |
LAST_UPDATED |
TRUE |
TIMESTAMP |
X |
Spring Cloud Task Framework 在應用程式啟動時建立此值。或者,如果記錄是在任務外部建立的,則必須在記錄建立時填入該值。 |
EXTERNAL_EXECUTION_ID |
FALSE |
VARCHAR |
250 |
如果設定了 |
PARENT_TASK_EXECUTION_ID |
FALSE |
BIGINT |
X |
如果設定了 |
儲存用於任務執行的參數
欄位名稱 | 必要 | 類型 | 欄位長度 |
---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
TASK_PARAM |
FALSE |
VARCHAR |
2500 |
用於將任務執行連結到 batch 執行。
欄位名稱 | 必要 | 類型 | 欄位長度 |
---|---|---|---|
TASK_EXECUTION_ID |
TRUE |
BIGINT |
X |
JOB_EXECUTION_ID |
TRUE |
BIGINT |
X |
用於此處討論的 single-instance-enabled
功能。
欄位名稱 | 必要 | 類型 | 欄位長度 | 註解 |
---|---|---|---|---|
LOCK_KEY |
TRUE |
CHAR |
36 |
此鎖定的 UUID |
REGION |
TRUE |
VARCHAR |
100 |
使用者可以使用此欄位建立鎖定群組。 |
CLIENT_ID |
TRUE |
CHAR |
36 |
包含要鎖定之應用程式名稱的任務執行 ID。 |
CREATED_DATE |
TRUE |
DATETIME |
X |
建立條目的日期 |
用於設定每個資料庫類型表格的 DDL 可以在此處找到。 |
20.2. SQL Server
預設情況下,Spring Cloud Task 使用序列表格來決定 TASK_EXECUTION
表格的 TASK_EXECUTION_ID
。但是,當同時使用 SQL Server 啟動多個任務時,可能會導致 TASK_SEQ
表格上發生死鎖。解決方案是刪除 TASK_EXECUTION_SEQ
表格,並使用相同的名稱建立序列。例如:
DROP TABLE TASK_SEQ;
CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
START WITH 1
INCREMENT BY 1;
將 START WITH 設定為高於目前執行 ID 的值。 |
21. 建置此文件
此專案使用 Maven 來產生此文件。若要自行產生,請執行以下命令:$ mvn clean install -DskipTests -P docs
。
22. 可觀察性 metadata
22.1. 可觀察性 - 指標
以下您可以找到此專案宣告的所有指標列表。
22.1.1. 任務活動中
圍繞任務執行建立的指標。
指標名稱 spring.cloud.task
(由慣例類別 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定義)。類型 timer
。
指標名稱 spring.cloud.task.active
(由慣例類別 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定義)。類型 long task timer
。
在啟動 Observation 後新增的 KeyValues 可能會從 *.active 指標中遺失。 |
Micrometer 內部使用 nanoseconds 作為基本單位。但是,每個後端決定實際的基本單位。(例如:Prometheus 使用秒) |
封閉類別的完整限定名稱 org.springframework.cloud.task.listener.TaskExecutionObservation
。
所有標籤都必須以 spring.cloud.task 前綴開頭! |
名稱 |
描述 |
|
CF 雲端的應用程式 ID。 |
|
CF 雲端的應用程式名稱。 |
|
CF 雲端的應用程式版本。 |
|
CF 雲端的執行個體索引。 |
|
CF 雲端的組織名稱。 |
|
CF 雲端的空間 ID。 |
|
CF 雲端的空間名稱。 |
|
任務執行 ID。 |
|
任務結束代碼。 |
|
任務的外部執行 ID。 |
|
任務名稱量測。 |
|
任務父執行 ID。 |
|
任務狀態。可以是成功或失敗。 |
22.1.2. 任務執行器觀察
在執行任務執行器時建立的觀察。
指標名稱 spring.cloud.task.runner
(由慣例類別 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定義)。類型 timer
。
指標名稱 spring.cloud.task.runner.active
(由慣例類別 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定義)。類型 long task timer
。
在啟動 Observation 後新增的 KeyValues 可能會從 *.active 指標中遺失。 |
Micrometer 內部使用 nanoseconds 作為基本單位。但是,每個後端決定實際的基本單位。(例如:Prometheus 使用秒) |
封閉類別的完整限定名稱 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
。
所有標籤都必須以 spring.cloud.task 前綴開頭! |
名稱 |
描述 |
|
由 Spring Cloud Task 執行的 bean 名稱。 |
22.2. 可觀察性 - Spans
以下您可以找到此專案宣告的所有 spans 列表。
22.2.1. 任務活動中 Span
圍繞任務執行建立的指標。
Span 名稱 spring.cloud.task
(由慣例類別 org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention
定義)。
封閉類別的完整限定名稱 org.springframework.cloud.task.listener.TaskExecutionObservation
。
所有標籤都必須以 spring.cloud.task 前綴開頭! |
名稱 |
描述 |
|
CF 雲端的應用程式 ID。 |
|
CF 雲端的應用程式名稱。 |
|
CF 雲端的應用程式版本。 |
|
CF 雲端的執行個體索引。 |
|
CF 雲端的組織名稱。 |
|
CF 雲端的空間 ID。 |
|
CF 雲端的空間名稱。 |
|
任務執行 ID。 |
|
任務結束代碼。 |
|
任務的外部執行 ID。 |
|
任務名稱量測。 |
|
任務父執行 ID。 |
|
任務狀態。可以是成功或失敗。 |
22.2.2. 任務執行器觀察 Span
在執行任務執行器時建立的觀察。
Span 名稱 spring.cloud.task.runner
(由慣例類別 org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention
定義)。
封閉類別的完整限定名稱 org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation
。
所有標籤都必須以 spring.cloud.task 前綴開頭! |
名稱 |
描述 |
|
由 Spring Cloud Task 執行的 bean 名稱。 |