Apache Cassandra 支援

Spring Integration 提供通道配接器(從 6.0 版開始),用於針對 Apache Cassandra 叢集執行資料庫操作。它完全基於 Spring Data for Apache Cassandra 專案。

您需要將此依賴項包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-cassandra</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-cassandra:6.3.5"

Cassandra 輸出元件

CassandraMessageHandler 是一個 AbstractReplyProducingMessageHandler 實作,可以在單向(預設)和請求-回覆模式(producesReply 選項)下工作。它預設為非同步(setAsync(false) 可重設),並針對提供的 ReactiveCassandraOperations 執行反應式 INSERTUPDATEDELETESTATEMENT 操作。操作類型可以透過 CassandraMessageHandler.Type 選項設定。ingestQuery 將模式設定為 INSERTquerystatementExpression,或 statementProcessor 將模式設定為 STATEMENT

以下程式碼片段示範此通道配接器或閘道的各種設定

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
    return flow -> flow
            .handle(Cassandra.outboundGateway(cassandraOperations)
                    .query("SELECT * FROM book WHERE author = :author limit :size")
                    .parameter("author", "payload")
                    .parameter("size", m -> m.getHeaders().get("limit")))
            .channel(c -> c.flux("resultChannel"));
}
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
    integrationFlow {
        handle(
            Cassandra.outboundChannelAdapter(cassandraOperations)
                              .statementExpression("T(QueryBuilder).truncate('book').build()")
        ) { async(false) }
    }
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");

    Map<String, Expression> params = new HashMap<>();
    params.put("author", PARSER.parseExpression("payload"));
    params.put("size", PARSER.parseExpression("headers.limit"));

    cassandraMessageHandler.setParameterExpressions(params);

    cassandraMessageHandler.setOutputChannel(resultChannel());
    cassandraMessageHandler.setProducesReply(true);
    return cassandraMessageHandler;
}
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
                                        cassandra-template="cassandraTemplate"
                                        write-options="writeOptions"
                                        auto-startup="false"
                                        async="false"/>

<int-cassandra:outbound-gateway id="outgateway"
                                request-channel="input"
                                cassandra-template="cassandraTemplate"
                                mode="STATEMENT"
                                write-options="writeOptions"
                                query="SELECT * FROM book limit :size"
                                reply-channel="resultChannel"
                                auto-startup="true">
    <int-cassandra:parameter-expression name="author" expression="payload"/>
    <int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>

如果 CassandraMessageHandler 在預設非同步模式下用作閘道,則會產生 Mono<WriteResult>,這會根據提供的 MessageChannel 實作進行處理。對於真正的反應式處理,建議輸出通道設定使用 FluxMessageChannel。在同步模式下,會呼叫 Mono.block() 以取得回覆值。

如果執行 INSERTUPDATEDELETE 操作,則請求訊息 Payload 中應包含一個實體(標記為 org.springframework.data.cassandra.core.mapping.Table)。如果 Payload 是實體列表,則會執行各自的批次操作。

ingestQuery 模式預期 Payload 以要插入的值矩陣形式呈現 - List<List<?>>。例如,如果實體像這樣

@Table("book")
public record Book(@PrimaryKey String isbn,
                   String title,
                   @Indexed String author,
                   int pages,
                   LocalDate saleDate,
                   boolean isInStock) {

}

且通道配接器具有此設定

@Bean
public MessageHandler cassandraMessageHandler3() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
    cassandraMessageHandler.setIngestQuery(cqlIngest);
    cassandraMessageHandler.setAsync(false);
    return cassandraMessageHandler;
}

請求訊息 Payload 必須像這樣轉換

List<List<Object>> ingestBooks =
    payload.stream()
            .map(book ->
                    List.<Object>of(
                            book.isbn(),
                            book.title(),
                            book.author(),
                            book.pages(),
                            book.saleDate(),
                            book.isInStock()))
            .toList();

對於更複雜的使用案例,Payload 可以是 com.datastax.oss.driver.api.core.cql.Statement 的實例。建議使用 com.datastax.oss.driver.api.querybuilder.QueryBuilder API 來建構要針對 Apache Cassandra 執行的各種 Statement。例如,要從 Book 表格中移除所有資料,可以將 Payload 如下的訊息傳送至 CassandraMessageHandlerQueryBuilder.truncate("book").build()。或者,對於基於請求訊息的邏輯,可以為 CassandraMessageHandler 提供 statementExpressionstatementProcessor,以根據該訊息建構 Statement。為了方便起見,com.datastax.oss.driver.api.querybuilder 已註冊為 SpEL 評估內容中的 import,因此目標運算式可以像這樣簡單

statement-expression="T(QueryBuilder).selectFrom("book").all()"

setParameterExpressions(Map<String, Expression> parameterExpressions) 代表可綁定的具名查詢參數,且僅與 setQuery(String query) 選項一起使用。請參閱上述 Java 和 XML 範例。