R2DBC 支援

Spring Integration 提供了通道配接器,可透過使用反應式存取資料庫 (經由 R2DBC 驅動程式) 來接收和傳送訊息。

您需要將此相依性包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:6.3.5"

R2DBC 輸入通道配接器

R2dbcMessageSource 是一個可輪詢的 MessageSource 實作,其基於 R2dbcEntityOperations,並根據 expectSingleResult 選項,產生以 FluxMono 作為 Payload 的訊息,用於從資料庫提取的資料。要 SELECT 的查詢可以是靜態提供的,也可以基於每次 receive() 呼叫時評估的 SpEL 表達式。 R2dbcMessageSource.SelectCreator 作為評估內容的根物件存在,允許使用 StatementMapper.SelectSpec Fluent API。預設情況下,此通道配接器將 select 中的記錄對應到 LinkedCaseInsensitiveMap 實例中。可以透過提供 payloadType 選項來自訂,該選項在底層由基於 this.r2dbcEntityOperations.getConverter()EntityRowMapper 使用。 updateSql 是可選的,用於在資料庫中標記已讀取的記錄,以便從後續輪詢中跳過。可以為 UPDATE 操作提供一個 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>,以根據 SELECT 結果中的記錄將值繫結到 UPDATE 中。

此通道配接器的典型設定可能如下所示

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

使用 Java DSL,此通道配接器的設定如下所示

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC 輸出通道配接器

R2dbcMessageHandler 是一個 ReactiveMessageHandler 實作,用於使用提供的 R2dbcEntityOperations 在資料庫中執行 INSERT (預設)、UPDATEDELETE 查詢。 R2dbcMessageHandler.Type 可以靜態設定,也可以透過針對請求訊息的 SpEL 表達式進行設定。要執行的查詢可以基於 tableNamevaluescriteria 表達式選項,或者(如果未提供 tableName)整個訊息 Payload 被視為 org.springframework.data.relational.core.mapping.Table 實體以針對其執行 SQL。套件 org.springframework.data.relational.core.query 已註冊為 SpEL 評估內容的匯入,以便直接存取用於 UPDATEDELETE 查詢的 Criteria Fluent API。 valuesExpression 用於 INSERTUPDATE 中,並且必須評估為 Map 以取得欄位-值對,以便針對請求訊息在目標表格中執行變更。

此通道配接器的典型設定可能如下所示

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

使用 Java DSL,此通道配接器的設定如下所示

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))