使用 R2DBC 進行資料存取

R2DBC ("Reactive Relational Database Connectivity",反應式關聯式資料庫連線能力) 是一個社群驅動的規格制定工作,旨在標準化使用反應式模式存取 SQL 資料庫的方式。

套件階層

Spring Framework 的 R2DBC 抽象化框架包含兩個不同的套件

  • coreorg.springframework.r2dbc.core 套件包含 DatabaseClient 類別以及各種相關類別。請參閱 使用 R2DBC 核心類別來控制基本 R2DBC 處理和錯誤處理

  • connectionorg.springframework.r2dbc.connection 套件包含一個實用類別,用於輕鬆存取 ConnectionFactory,以及各種簡單的 ConnectionFactory 實作,您可以使用它們來測試和執行未修改的 R2DBC。請參閱 控制資料庫連線

使用 R2DBC 核心類別來控制基本 R2DBC 處理和錯誤處理

本節介紹如何使用 R2DBC 核心類別來控制基本 R2DBC 處理,包括錯誤處理。它包含以下主題

使用 DatabaseClient

DatabaseClient 是 R2DBC 核心套件中的中心類別。它處理資源的建立和釋放,這有助於避免常見錯誤,例如忘記關閉連線。它執行核心 R2DBC 工作流程的基本任務(例如語句建立和執行),讓應用程式程式碼提供 SQL 並提取結果。DatabaseClient 類別

  • 執行 SQL 查詢

  • 更新語句和預存程序呼叫

  • Result 實例執行迭代

  • 捕獲 R2DBC 例外,並將它們轉換為 org.springframework.dao 套件中定義的通用、更具資訊性的例外階層。(請參閱 一致的例外階層。)

用戶端具有使用反應式類型的函數式、流暢 API,用於宣告式組合。

當您為程式碼使用 DatabaseClient 時,您只需要實作 java.util.function 介面,為它們提供明確定義的契約。給定 DatabaseClient 類別提供的 ConnectionFunction 回呼會建立 Publisher。對於提取 Row 結果的對映函數也是如此。

您可以在 DAO 實作中使用 DatabaseClient,方法是透過使用 ConnectionFactory 參考直接實例化,或者您可以在 Spring IoC 容器中配置它,並將其作為 Bean 參考提供給 DAO。

建立 DatabaseClient 物件的最簡單方法是透過靜態工廠方法,如下所示

  • Java

  • Kotlin

DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
ConnectionFactory 應始終配置為 Spring IoC 容器中的 Bean。

前面的方法使用預設設定建立 DatabaseClient

您也可以從 DatabaseClient.builder() 取得 Builder 實例。您可以透過呼叫以下方法來自訂用戶端

  • ….bindMarkers(…):提供特定的 BindMarkersFactory 來配置具名參數到資料庫綁定標記的轉換。

  • ….executeFunction(…):設定 ExecuteFunction 如何執行 Statement 物件。

  • ….namedParameters(false):停用具名參數擴展。預設為啟用。

Dialect 由 BindMarkersFactoryResolverConnectionFactory 解析,通常是透過檢查 ConnectionFactoryMetadata
您可以透過 META-INF/spring.factories 註冊一個實作 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 的類別,讓 Spring 自動探索您的 BindMarkersFactoryBindMarkersFactoryResolver 使用 Spring 的 SpringFactoriesLoader 從類路徑中探索綁定標記提供者實作。

目前支援的資料庫有

  • H2

  • MariaDB

  • Microsoft SQL Server

  • MySQL

  • Postgres

此類別發出的所有 SQL 都記錄在 DEBUG 層級,類別對應於用戶端實例的完整限定類別名稱(通常為 DefaultDatabaseClient)。此外,每次執行都會在反應式序列中註冊一個檢查點,以協助偵錯。

以下章節提供了一些 DatabaseClient 用法範例。這些範例並未詳盡列出 DatabaseClient 公開的所有功能。請參閱隨附的 javadoc 以了解更多資訊。

執行語句

DatabaseClient 提供執行語句的基本功能。以下範例顯示了建立新表格所需包含的內容,以實現最小但功能完整的程式碼

  • Java

  • Kotlin

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .await()

DatabaseClient 旨在方便、流暢地使用。它在執行規範的每個階段都公開了中間方法、延續方法和終端方法。前面的範例使用 then() 來傳回完成 Publisher,該 Publisher 在查詢(或查詢,如果 SQL 查詢包含多個語句)完成後立即完成。

execute(…) 接受 SQL 查詢字串或查詢 Supplier<String>,以將實際查詢建立延遲到執行時。

查詢 (SELECT)

SQL 查詢可以透過 Row 物件或受影響的列數傳回值。DatabaseClient 可以傳回更新的列數或列本身,具體取決於發出的查詢。

以下查詢從表格中取得 idname 欄位

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person")
        .fetch().awaitSingle()

以下查詢使用綁定變數

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitSingle()

您可能已經注意到上面範例中使用了 fetch()fetch() 是一個延續運算子,可讓您指定要取用多少資料。

呼叫 first() 會從結果傳回第一列,並捨棄其餘列。您可以使用以下運算子取用資料

  • first() 傳回整個結果的第一列。它的 Kotlin Coroutine 變體命名為 awaitSingle() 用於不可為 Null 的傳回值,如果值是可選的,則命名為 awaitSingleOrNull()

  • one() 傳回正好一個結果,如果結果包含更多列,則會失敗。使用 Kotlin Coroutines,awaitOne() 用於正好一個值,如果值可能為 null,則使用 awaitOneOrNull()

  • all() 傳回結果的所有列。使用 Kotlin Coroutines 時,請使用 flow()

  • rowsUpdated() 傳回受影響的列數 (INSERT/UPDATE/DELETE 計數)。它的 Kotlin Coroutine 變體命名為 awaitRowsUpdated()

在不指定進一步對映詳細資訊的情況下,查詢會將表格結果傳回為 Map,其鍵是不區分大小寫的欄位名稱,對應到其欄位值。

您可以透過提供 Function<Row, T> 來控制結果對映,該函數針對每個 Row 呼叫,以便它可以傳回任意值(單數值、集合和 Map 以及物件)。

以下範例提取 name 欄位並發出其值

  • Java

  • Kotlin

Flux<String> names = client.sql("SELECT name FROM person")
        .map(row -> row.get("name", String.class))
        .all();
val names = client.sql("SELECT name FROM person")
        .map{ row: Row -> row.get("name", String.class) }
        .flow()

或者,有一個捷徑可以對映到單個值

	Flux<String> names = client.sql("SELECT name FROM person")
			.mapValue(String.class)
			.all();

或者,您可以對映到具有 Bean 屬性或記錄元件的結果物件

	// assuming a name property on Person
	Flux<Person> persons = client.sql("SELECT name FROM person")
			.mapProperties(Person.class)
			.all();
null 呢?

關聯式資料庫結果可以包含 null 值。Reactive Streams 規範禁止發出 null 值。該要求要求在提取器函數中正確處理 null。雖然您可以從 Row 取得 null 值,但您不得發出 null 值。您必須將任何 null 值包裝在物件中(例如,單數值的 Optional),以確保提取器函數永遠不會直接傳回 null 值。

使用 DatabaseClient 更新 (INSERTUPDATEDELETE)

修改語句的唯一區別在於,這些語句通常不傳回表格資料,因此您可以使用 rowsUpdated() 來取用結果。

以下範例顯示了一個 UPDATE 語句,該語句傳回更新的列數

  • Java

  • Kotlin

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitRowsUpdated()

將值繫結到查詢

典型的應用程式需要參數化 SQL 語句,以根據某些輸入來選取或更新列。這些通常是受 WHERE 子句約束的 SELECT 語句,或是接受輸入參數的 INSERTUPDATE 語句。如果參數未正確逸出,則參數化語句會帶來 SQL 注入的風險。DatabaseClient 利用 R2DBC 的 bind API 來消除查詢參數的 SQL 注入風險。您可以透過 execute(…) 運算子提供參數化 SQL 語句,並將參數繫結到實際的 Statement。然後,您的 R2DBC 驅動程式會使用預備語句和參數替換來執行語句。

參數繫結支援兩種繫結策略

  • 依索引,使用從零開始的參數索引。

  • 依名稱,使用佔位符名稱。

以下範例顯示了查詢的參數繫結

    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bind("id", "joe")
	    	.bind("name", "Joe")
			.bind("age", 34);

或者,您可以傳入名稱和值的 Map

	Map<String, Object> params = new LinkedHashMap<>();
	params.put("id", "joe");
	params.put("name", "Joe");
	params.put("age", 34);
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindValues(params);

或者,您可以傳入具有 Bean 屬性或記錄元件的參數物件

	// assuming id, name, age properties on Person
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindProperties(new Person("joe", "Joe", 34);

或者,您可以使用位置參數將值繫結到語句。索引從零開始。

    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bind(0, "joe")
	    	.bind(1, "Joe")
			.bind(2, 34);

如果您的應用程式繫結到許多參數,則可以使用單次呼叫來實現相同的目的

    List<?> values = List.of("joe", "Joe", 34);
    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bindValues(values);
R2DBC 原生綁定標記

R2DBC 使用資料庫原生綁定標記,這些標記取決於實際的資料庫供應商。例如,Postgres 使用索引標記,例如 $1$2$n。另一個範例是 SQL Server,它使用以 @ 為字首的具名綁定標記。

這與 JDBC 不同,JDBC 需要 ? 作為綁定標記。在 JDBC 中,實際的驅動程式會將 ? 綁定標記轉換為資料庫原生標記,作為其語句執行的一部分。

Spring Framework 的 R2DBC 支援可讓您使用原生綁定標記或具有 :name 語法的具名綁定標記。

具名參數支援利用 BindMarkersFactory 實例在查詢執行時將具名參數擴展為原生綁定標記,這讓您在各種資料庫供應商之間具有一定程度的查詢可移植性。

查詢預處理器將具名的 Collection 參數展開為一系列綁定標記,以消除基於引數數量動態建立查詢的需求。巢狀物件陣列會展開,以允許使用(例如)選取列表。

考慮以下查詢

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))

先前的查詢可以參數化並按如下方式執行

  • Java

  • Kotlin

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann",  50});

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples)
選取列表的用法取決於供應商。

以下範例顯示了使用 IN 述詞的更簡單變體

  • Java

  • Kotlin

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", arrayOf(35, 50))
R2DBC 本身不支援類似 Collection 的值。儘管如此,展開上面範例中給定的 List 適用於 Spring R2DBC 支援中的具名參數,例如,用於 IN 子句,如上所示。但是,插入或更新陣列類型欄位(例如,在 Postgres 中)需要基礎 R2DBC 驅動程式支援的陣列類型:通常是 Java 陣列,例如,String[] 以更新 text[] 欄位。請勿傳遞 Collection<String> 或類似物件作為陣列參數。

語句篩選器

有時您需要在實際 Statement 執行之前微調其選項。為此,請向 DatabaseClient 註冊一個 Statement 篩選器 (StatementFilterFunction),以攔截和修改執行中的語句,如下列範例所示

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
	    .bind("name", …)
	    .bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
		.bind("name", …)
		.bind("state", …)

DatabaseClient 也公開了一個簡化的 filter(…) 多載,它接受 Function<Statement, Statement>

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter(statement -> s.returnGeneratedValues("id"));

client.sql("SELECT id, name, state FROM table")
	    .filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter { statement -> s.returnGeneratedValues("id") }

client.sql("SELECT id, name, state FROM table")
	    .filter { statement -> s.fetchSize(25) }

StatementFilterFunction 實作允許篩選 Statement 和篩選 Result 物件。

DatabaseClient 最佳實務

DatabaseClient 類別的實例在配置後是執行緒安全的。這很重要,因為這表示您可以配置 DatabaseClient 的單個實例,然後安全地將此共用參考注入到多個 DAO(或儲存庫)中。DatabaseClient 是有狀態的,因為它維護對 ConnectionFactory 的參考,但此狀態不是會話狀態。

使用 DatabaseClient 類別的常見做法是在 Spring 配置檔案中配置 ConnectionFactory,然後將該共用的 ConnectionFactory Bean 相依性注入到您的 DAO 類別中。DatabaseClientConnectionFactory 的 setter 中建立。這會產生類似以下的 DAO

  • Java

  • Kotlin

public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory);
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {

	private val databaseClient = DatabaseClient.create(connectionFactory)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}

另一種顯式組態的替代方案是使用組件掃描和註解支援來進行依賴注入。在這種情況下,您可以使用 @Component 註解類別(使其成為組件掃描的候選者),並使用 @Autowired 註解 ConnectionFactory setter 方法。以下範例展示了如何做到這一點

  • Java

  • Kotlin

@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	@Autowired (2)
	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory); (3)
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 使用 @Component 註解類別。
2 使用 @Autowired 註解 ConnectionFactory setter 方法。
3 使用 ConnectionFactory 建立新的 DatabaseClient
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)

	private val databaseClient = DatabaseClient(connectionFactory) (3)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 使用 @Component 註解類別。
2 ConnectionFactory 的建構子注入。
3 使用 ConnectionFactory 建立新的 DatabaseClient

無論您選擇使用(或不使用)上述哪種範本初始化樣式,都很少需要每次要執行 SQL 時都建立新的 DatabaseClient 類別實例。一旦配置完成,DatabaseClient 實例就是執行緒安全的。如果您的應用程式存取多個資料庫,您可能需要多個 DatabaseClient 實例,這需要多個 ConnectionFactory,並因此需要多個配置不同的 DatabaseClient 實例。

檢索自動產生的金鑰

當將列插入定義自動遞增或身分識別欄位的資料表時,INSERT 陳述式可能會產生金鑰。若要完全控制要產生的欄位名稱,只需註冊一個 StatementFilterFunction,為所需的欄位請求產生的金鑰即可。

  • Java

  • Kotlin

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter(statement -> s.returnGeneratedValues("id"))
		.map(row -> row.get("id", Integer.class))
		.first();

// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { statement -> s.returnGeneratedValues("id") }
		.map { row -> row.get("id", Integer.class) }
		.awaitOne()

// generatedId emits the generated key once the INSERT statement has finished

控制資料庫連線

本節涵蓋

使用 ConnectionFactory

Spring 透過 ConnectionFactory 取得資料庫的 R2DBC 連線。ConnectionFactory 是 R2DBC 規範的一部分,並且是驅動程式的常見進入點。它讓容器或框架可以向應用程式程式碼隱藏連線池和交易管理問題。作為開發人員,您不需要了解如何連線到資料庫的詳細資訊。那是管理員的責任,由他們設定 ConnectionFactory。在您開發和測試程式碼時,您很可能同時扮演這兩個角色,但您不一定需要知道生產資料來源是如何配置的。

當您使用 Spring 的 R2DBC 層時,您可以使用第三方提供的連線池實作來配置您自己的連線池。一個流行的實作是 R2DBC Pool (r2dbc-pool)。Spring 發行版中的實作僅用於測試目的,不提供池化功能。

要配置 ConnectionFactory

  1. 使用 ConnectionFactory 取得連線,就像您通常取得 R2DBC ConnectionFactory 一樣。

  2. 提供 R2DBC URL(請參閱您的驅動程式文件以取得正確的值)。

以下範例展示如何配置 ConnectionFactory

  • Java

  • Kotlin

ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");

使用 ConnectionFactoryUtils

ConnectionFactoryUtils 類別是一個方便且功能強大的輔助類別,它提供了 static 方法來從 ConnectionFactory 取得連線,並在必要時關閉連線。

它支援訂閱者 Context 綁定的連線,例如 R2dbcTransactionManager

使用 SingleConnectionFactory

SingleConnectionFactory 類別是 DelegatingConnectionFactory 介面的實作,它封裝了一個單一的 Connection,該連線在使用後不會關閉。

如果任何客戶端程式碼在假設池化連線的情況下呼叫 close(例如在使用持久性工具時),您應該將 suppressClose 屬性設定為 true。此設定會傳回一個抑制關閉的代理,該代理封裝了物理連線。請注意,您不能再將其轉換為原生 Connection 或類似物件。

SingleConnectionFactory 主要是一個測試類別,並且可能用於特定需求,例如,如果您的 R2DBC 驅動程式允許這種用法,則可以使用管線化。與池化的 ConnectionFactory 相反,它始終重複使用相同的連線,避免過度建立物理連線。

使用 TransactionAwareConnectionFactoryProxy

TransactionAwareConnectionFactoryProxy 是目標 ConnectionFactory 的代理。該代理封裝了目標 ConnectionFactory,以增加對 Spring 管理的交易的感知。

如果您使用的 R2DBC 客戶端未以其他方式與 Spring 的 R2DBC 支援整合,則需要使用此類別。在這種情況下,您仍然可以使用此客戶端,同時讓此客戶端參與 Spring 管理的交易。通常最好將 R2DBC 客戶端與對 ConnectionFactoryUtils 的適當存取整合,以進行資源管理。

有關更多詳細資訊,請參閱 TransactionAwareConnectionFactoryProxy javadoc。

使用 R2dbcTransactionManager

R2dbcTransactionManager 類別是單一 R2DBC ConnectionFactoryReactiveTransactionManager 實作。它將來自指定 ConnectionFactory 的 R2DBC Connection 綁定到訂閱者 Context,可能允許每個 ConnectionFactory 一個訂閱者 Connection

應用程式程式碼需要透過 ConnectionFactoryUtils.getConnection(ConnectionFactory) 檢索 R2DBC Connection,而不是 R2DBC 的標準 ConnectionFactory.create()。所有框架類別(例如 DatabaseClient)都隱式地使用此策略。如果未使用交易管理器,則查找策略的行為與 ConnectionFactory.create() 完全相同,因此可以在任何情況下使用。