資料庫
如同大多數企業應用程式風格,資料庫是批次處理的核心儲存機制。然而,批次處理與其他應用程式風格不同,因為系統必須處理的資料集非常龐大。如果 SQL 語句傳回 100 萬筆資料列,則結果集可能會將所有傳回結果保留在記憶體中,直到所有資料列都已讀取完畢。Spring Batch 針對此問題提供了兩種解決方案
基於游標的 ItemReader
實作
使用資料庫游標通常是大多數批次開發人員的預設方法,因為它是資料庫針對「串流」關聯式資料問題的解決方案。Java ResultSet
類別本質上是操作游標的物件導向機制。ResultSet
維護指向目前資料列的游標。在 ResultSet
上呼叫 next
會將此游標移至下一列。基於游標的 Spring Batch ItemReader
實作會在初始化時開啟游標,並針對每次呼叫 read
將游標向前移動一列,傳回可用於處理的對應物件。然後呼叫 close
方法以確保釋放所有資源。Spring 核心 JdbcTemplate
透過使用回呼模式來解決此問題,以完整對應 ResultSet
中的所有資料列,並在將控制權傳回給方法呼叫者之前關閉。但是,在批次處理中,這必須等到 step 完成。下圖顯示了基於游標的 ItemReader
如何運作的通用圖表。請注意,雖然範例使用 SQL(因為 SQL 廣為人知),但任何技術都可以實作基本方法。

此範例說明了基本模式。給定一個具有三個欄位的 'FOO' 表格:ID
、NAME
和 BAR
,選取 ID 大於 1 但小於 7 的所有資料列。這會將游標的開頭(資料列 1)放在 ID 2 上。此資料列的結果應為完全對應的 Foo
物件。再次呼叫 read()
會將游標移至下一列,即 ID 為 3 的 Foo
。這些讀取結果會在每次 read
後寫出,允許物件被垃圾回收(假設沒有實例變數維護對它們的參考)。
JdbcCursorItemReader
JdbcCursorItemReader
是基於游標技術的 JDBC 實作。它直接與 ResultSet
協作,並需要 SQL 語句針對從 DataSource
取得的連線執行。以下資料庫結構描述用作範例
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
許多人偏好為每個資料列使用網域物件,因此以下範例使用 RowMapper
介面的實作來對應 CustomerCredit
物件
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
由於 JdbcCursorItemReader
與 JdbcTemplate
共用關鍵介面,因此查看如何使用 JdbcTemplate
讀取此資料的範例很有用,以便與 ItemReader
進行比較。就本範例而言,假設 CUSTOMER
資料庫中有 1,000 個資料列。第一個範例使用 JdbcTemplate
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
執行上述程式碼片段後,customerCredits
清單包含 1,000 個 CustomerCredit
物件。在 query 方法中,從 DataSource
取得連線,針對它執行提供的 SQL,並針對 ResultSet
中的每個資料列呼叫 mapRow
方法。將其與以下範例中顯示的 JdbcCursorItemReader
方法進行比較
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
執行上述程式碼片段後,計數器等於 1,000。如果上述程式碼將傳回的 customerCredit
放入清單中,則結果將與 JdbcTemplate
範例完全相同。但是,ItemReader
的最大優勢在於它允許項目被「串流」。read
方法可以呼叫一次,項目可以由 ItemWriter
寫出,然後可以使用 read
取得下一個項目。這允許項目讀取和寫入以「區塊」完成並定期提交,這是高效能批次處理的本質。此外,它很容易設定為注入到 Spring Batch Step
中。
-
Java
-
XML
以下範例顯示如何在 Java 中將 ItemReader
注入到 Step
中
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
以下範例顯示如何在 XML 中將 ItemReader
注入到 Step
中
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
其他屬性
由於在 Java 中開啟游標有許多不同的選項,因此 JdbcCursorItemReader
上有許多可以設定的屬性,如下表所述
ignoreWarnings |
決定是否記錄 SQLWarnings 或導致例外。預設值為 |
fetchSize |
向 JDBC 驅動程式提供提示,說明當 |
maxRows |
設定基礎 |
queryTimeout |
設定驅動程式等待 |
verifyCursorPosition |
由於 |
saveState |
指示是否應將讀取器的狀態儲存在 |
driverSupportsAbsolute |
指示 JDBC 驅動程式是否支援在 |
setUseSharedExtendedConnection |
指示游標使用的連線是否應由所有其他處理程序使用,從而共用相同的交易。如果設定為 |
HibernateCursorItemReader
正如一般的 Spring 使用者在是否使用 ORM 解決方案時做出重要決策一樣,這會影響他們是否使用 JdbcTemplate
或 HibernateTemplate
,Spring Batch 使用者也有相同的選擇。HibernateCursorItemReader
是游標技術的 Hibernate 實作。Hibernate 在批次處理中的使用一直備受爭議。這主要是因為 Hibernate 最初是為支援線上應用程式風格而開發的。但是,這並不表示它不能用於批次處理。解決此問題最簡單的方法是使用 StatelessSession
而不是標準 session。這消除了 Hibernate 使用的所有快取和 dirty checking,這些可能會在批次情境中造成問題。有關 stateless 和一般 hibernate session 之間差異的更多資訊,請參閱您特定 hibernate 版本的說明文件。HibernateCursorItemReader
可讓您宣告 HQL 語句並傳入 SessionFactory
,它將以與 JdbcCursorItemReader
相同的基本方式,在每次呼叫 read 時傳回一個項目。以下範例設定使用與 JDBC 讀取器相同的 'customer credit' 範例
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
此設定的 ItemReader
以與 JdbcCursorItemReader
描述的完全相同的方式傳回 CustomerCredit
物件,前提是已為 Customer
表格正確建立 hibernate 對應檔案。「useStatelessSession」屬性預設為 true,但在此處新增以引起人們注意開啟或關閉它的能力。同樣值得注意的是,可以使用 setFetchSize
屬性設定基礎游標的擷取大小。與 JdbcCursorItemReader
一樣,設定也很簡單。
-
Java
-
XML
以下範例顯示如何在 Java 中注入 Hibernate ItemReader
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
以下範例顯示如何在 XML 中注入 Hibernate ItemReader
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
StoredProcedureItemReader
有時需要使用預存程序取得游標資料。StoredProcedureItemReader
的運作方式與 JdbcCursorItemReader
類似,不同之處在於,它不是執行查詢來取得游標,而是執行傳回游標的預存程序。預存程序可以透過三種不同的方式傳回游標
-
作為傳回的
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。 -
作為 ref-cursor 以 out 參數形式傳回(由 Oracle 和 PostgreSQL 使用)。
-
作為預存函數呼叫的傳回值。
-
Java
-
XML
以下 Java 範例設定使用與先前範例相同的 'customer credit' 範例
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
以下 XML 範例設定使用與先前範例相同的 'customer credit' 範例
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
先前的範例依賴預存程序來提供 ResultSet
作為傳回結果(來自先前的選項 1)。
如果預存程序傳回 ref-cursor
(選項 2),則我們需要提供作為傳回 ref-cursor
的 out 參數的位置。
-
Java
-
XML
以下範例顯示如何在 Java 中使用作為 ref-cursor 的第一個參數
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
以下範例顯示如何在 XML 中使用作為 ref-cursor 的第一個參數
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果游標是從預存函數傳回的(選項 3),則我們需要將屬性 "function" 設定為 true
。它預設為 false
。
-
Java
-
XML
以下範例顯示在 Java 中將屬性設定為 true
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
以下範例顯示在 XML 中將屬性設定為 true
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有這些情況下,我們都需要定義 RowMapper
以及 DataSource
和實際的程序名稱。
如果預存程序或函數接受參數,則必須使用 parameters
屬性宣告和設定它們。以下針對 Oracle 的範例宣告了三個參數。第一個是傳回 ref-cursor 的 out
參數,第二個和第三個是接受 INTEGER
類型值的 in 參數。
-
Java
-
XML
以下範例顯示如何在 Java 中使用參數
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
以下範例顯示如何在 XML 中使用參數
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了參數宣告之外,我們還需要指定 PreparedStatementSetter
實作,以設定呼叫的參數值。這與上述 JdbcCursorItemReader
的運作方式相同。 其他屬性 中列出的所有其他屬性也適用於 StoredProcedureItemReader
。
分頁 ItemReader
實作
使用資料庫游標的替代方案是執行多個查詢,其中每個查詢擷取部分結果。我們將此部分稱為頁面。每個查詢都必須指定起始資料列編號和我們想要在頁面中傳回的資料列數。
JdbcPagingItemReader
分頁 ItemReader
的一種實作是 JdbcPagingItemReader
。JdbcPagingItemReader
需要 PagingQueryProvider
負責提供用於擷取構成頁面的資料列的 SQL 查詢。由於每個資料庫都有自己的提供分頁支援的策略,因此我們需要為每個支援的資料庫類型使用不同的 PagingQueryProvider
。還有 SqlPagingQueryProviderFactoryBean
,它可以自動偵測正在使用的資料庫並判斷適當的 PagingQueryProvider
實作。這簡化了設定,並且是建議的最佳實務。
SqlPagingQueryProviderFactoryBean
要求您指定 select
子句和 from
子句。您也可以提供選用的 where
子句。這些子句和必要的 sortKey
用於建構 SQL 語句。
在 sortKey 上具有唯一索引約束非常重要,以確保在執行之間不會遺失任何資料。 |
在讀取器開啟後,它會以與任何其他 ItemReader
相同的基本方式,在每次呼叫 read
時傳回一個項目。當需要其他資料列時,分頁會在幕後發生。
-
Java
-
XML
以下 Java 範例設定使用與先前顯示的基於游標的 ItemReaders
類似的 'customer credit' 範例
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
以下 XML 範例設定使用與先前顯示的基於游標的 ItemReaders
類似的 'customer credit' 範例
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
此設定的 ItemReader
使用必須指定的 RowMapper
傳回 CustomerCredit
物件。「pageSize」屬性決定每次執行查詢時從資料庫讀取的實體數量。
「parameterValues」屬性可用於指定查詢的參數值 Map
。如果您在 where
子句中使用具名參數,則每個項目的索引鍵應與具名參數的名稱相符。如果您使用傳統的 '?' 佔位符,則每個項目的索引鍵應為佔位符的編號,從 1 開始。
JpaPagingItemReader
分頁 ItemReader
的另一種實作是 JpaPagingItemReader
。JPA 沒有類似於 Hibernate StatelessSession
的概念,因此我們必須使用 JPA 規範提供的其他功能。由於 JPA 支援分頁,因此在將 JPA 用於批次處理時,這是一個自然而然的選擇。在讀取每個頁面後,實體會分離,並且清除持續性內容,以便在處理頁面後可以對實體進行垃圾回收。
JpaPagingItemReader
可讓您宣告 JPQL 語句並傳入 EntityManagerFactory
。然後,它會以與任何其他 ItemReader
相同的基本方式,在每次呼叫 read 時傳回一個項目。當需要其他實體時,分頁會在幕後發生。
-
Java
-
XML
以下 Java 範例設定使用與先前顯示的 JDBC 讀取器相同的 'customer credit' 範例
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
以下 XML 範例設定使用與先前顯示的 JDBC 讀取器相同的 'customer credit' 範例
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
此設定的 ItemReader
以與上述 JdbcPagingItemReader
描述的完全相同的方式傳回 CustomerCredit
物件,前提是 CustomerCredit
物件具有正確的 JPA 註解或 ORM 對應檔案。「pageSize」屬性決定每次查詢執行時從資料庫讀取的實體數量。
資料庫 ItemWriters
雖然平面檔案和 XML 檔案都有特定的 ItemWriter
實例,但在資料庫世界中沒有完全相同的等效項。這是因為交易提供了所有需要的功能。ItemWriter
實作對於檔案是必要的,因為它們必須表現得像交易一樣,追蹤寫入的項目並在適當的時間刷新或清除。資料庫不需要此功能,因為寫入已包含在交易中。使用者可以建立自己的 DAO 來實作 ItemWriter
介面,或使用從為通用處理考量而撰寫的自訂 ItemWriter
中的 DAO。無論哪種方式,它們都應該可以正常運作,沒有任何問題。需要注意的一件事是透過批次處理輸出提供的效能和錯誤處理能力。這在使用 hibernate 作為 ItemWriter
時最常見,但在使用 JDBC 批次模式時可能會遇到相同的問題。批次處理資料庫輸出沒有任何固有的缺陷,前提是我們小心刷新且資料中沒有錯誤。但是,寫入時的任何錯誤都可能會造成混淆,因為無法知道哪個個別項目導致例外,甚至無法知道是否有任何個別項目負責,如下圖所示

如果項目在寫入之前被緩衝,則只有在提交之前刷新緩衝區時才會擲回任何錯誤。例如,假設每個區塊寫入 20 個項目,而第 15 個項目擲回 DataIntegrityViolationException
。就 Step
而言,所有 20 個項目都已成功寫入,因為在實際寫入之前,無法知道是否發生錯誤。一旦呼叫 Session#flush()
,緩衝區就會清空並命中例外。此時,Step
無能為力。交易必須回滾。通常,此例外可能會導致項目被跳過(取決於 skip/retry 策略),然後不會再次寫入。但是,在批次情境中,無法知道哪個項目導致問題。發生失敗時,正在寫入整個緩衝區。解決此問題的唯一方法是在每個項目之後刷新,如下圖所示

這是一個常見的使用案例,尤其是在使用 Hibernate 時,而 ItemWriter
實作的簡單準則是每次呼叫 write()
時都刷新。這樣做可以可靠地跳過項目,而 Spring Batch 會在內部處理錯誤後對 ItemWriter
呼叫的粒度。