R2DBC 支援
Spring Integration 提供了通道配接器,可透過使用反應式存取資料庫 (經由 R2DBC 驅動程式) 來接收和傳送訊息。
您需要將此相依性包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-r2dbc</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:6.3.5"
R2DBC 輸入通道配接器
R2dbcMessageSource
是一個可輪詢的 MessageSource
實作,其基於 R2dbcEntityOperations
,並根據 expectSingleResult
選項,產生以 Flux
或 Mono
作為 Payload 的訊息,用於從資料庫提取的資料。要 SELECT
的查詢可以是靜態提供的,也可以基於每次 receive()
呼叫時評估的 SpEL 表達式。 R2dbcMessageSource.SelectCreator
作為評估內容的根物件存在,允許使用 StatementMapper.SelectSpec
Fluent API。預設情況下,此通道配接器將 select 中的記錄對應到 LinkedCaseInsensitiveMap
實例中。可以透過提供 payloadType
選項來自訂,該選項在底層由基於 this.r2dbcEntityOperations.getConverter()
的 EntityRowMapper
使用。 updateSql
是可選的,用於在資料庫中標記已讀取的記錄,以便從後續輪詢中跳過。可以為 UPDATE
操作提供一個 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>
,以根據 SELECT
結果中的記錄將值繫結到 UPDATE
中。
此通道配接器的典型設定可能如下所示
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
"SELECT * FROM person WHERE name='Name'");
r2dbcMessageSource.setPayloadType(Person.class);
r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
r2dbcMessageSource.setBindFunction(
(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
return r2dbcMessageSource;
}
使用 Java DSL,此通道配接器的設定如下所示
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlow
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
(selectCreator) ->
selectCreator.createSelect("person")
.withProjection("*")
.withCriteria(Criteria.where("id").is(1)))
.expectSingleResult(true)
.payloadType(Person.class)
.updateSql("UPDATE Person SET id='2' where id = :id")
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)))
.handle((p, h) -> p)
.channel(MessageChannels.flux())
.get();
}
R2DBC 輸出通道配接器
R2dbcMessageHandler
是一個 ReactiveMessageHandler
實作,用於使用提供的 R2dbcEntityOperations
在資料庫中執行 INSERT
(預設)、UPDATE
或 DELETE
查詢。 R2dbcMessageHandler.Type
可以靜態設定,也可以透過針對請求訊息的 SpEL 表達式進行設定。要執行的查詢可以基於 tableName
、values
和 criteria
表達式選項,或者(如果未提供 tableName
)整個訊息 Payload 被視為 org.springframework.data.relational.core.mapping.Table
實體以針對其執行 SQL。套件 org.springframework.data.relational.core.query
已註冊為 SpEL 評估內容的匯入,以便直接存取用於 UPDATE
和 DELETE
查詢的 Criteria
Fluent API。 valuesExpression
用於 INSERT
和 UPDATE
中,並且必須評估為 Map
以取得欄位-值對,以便針對請求訊息在目標表格中執行變更。
此通道配接器的典型設定可能如下所示
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
messageHandler.setCriteriaExpression(
EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
return messageHandler;
}
使用 Java DSL,此通道配接器的設定如下所示
.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
.values("{age:36}"))