MongoDb 支援

版本 2.1 引入了對 MongoDB 的支援:一個「高效能、開放原始碼、面向文件的資料庫」。

您需要將此依賴項包含到您的專案中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.3.5"

若要下載、安裝和執行 MongoDB,請參閱 MongoDB 文件

連線到 MongoDb

阻塞式或反應式?

從 5.3 版開始,Spring Integration 提供對反應式 MongoDB 驅動程式的支援,以在存取 MongoDB 時啟用非阻塞式 I/O。若要啟用反應式支援,請將 MongoDB 反應式串流驅動程式新增至您的依賴項

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"

對於常規同步用戶端,您需要將其各自的驅動程式新增至依賴項中

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"

它們在框架中都是 optional,以提供更好的終端使用者選擇支援。

若要開始與 MongoDB 互動,您首先需要連線到它。Spring Integration 建構在另一個 Spring 專案 Spring Data MongoDB 提供的支援之上。它提供了名為 MongoDatabaseFactoryReactiveMongoDatabaseFactory 的工廠類別,簡化了與 MongoDB Client API 的整合。

Spring Data 預設提供阻塞式 MongoDB 驅動程式,但您可以透過包含上述依賴項來選擇反應式用法。

使用 MongoDatabaseFactory

若要連線到 MongoDB,您可以使用 MongoDatabaseFactory 介面的實作。

以下範例展示如何使用 SimpleMongoClientDatabaseFactory

  • Java

  • XML

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory 接受兩個引數:一個 MongoClient 實例和一個指定資料庫名稱的 String。如果您需要設定屬性,例如 hostport 和其他屬性,您可以透過使用底層 MongoClients 類別提供的其中一個建構子來傳遞這些屬性。有關如何設定 MongoDB 的更多資訊,請參閱 Spring-Data-MongoDB 參考文件。

使用 ReactiveMongoDatabaseFactory

若要使用反應式驅動程式連線到 MongoDB,您可以使用 ReactiveMongoDatabaseFactory 介面的實作。

以下範例展示如何使用 SimpleReactiveMongoDatabaseFactory

  • Java

  • XML

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB 訊息儲存區

企業整合模式 (EIP) 書籍中所述,訊息儲存區 讓您可以持久化訊息。當處理具有緩衝訊息能力的元件 (QueueChannelaggregatorresequencer 和其他元件) 時,如果可靠性是一個考量,這樣做會很有用。在 Spring Integration 中,MessageStore 策略也為 聲明檢查 模式提供了基礎,該模式也在 EIP 中描述。

Spring Integration 的 MongoDB 模組提供了 MongoDbMessageStore,它是 MessageStore 策略 (主要由聲明檢查模式使用) 和 MessageGroupStore 策略 (主要由彙集器和重新排序器模式使用) 的實作。

以下範例設定 MongoDbMessageStore 以使用 QueueChannelaggregator

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

上述範例是一個簡單的 Bean 設定,它期望 MongoDbFactory 作為建構子引數。

MongoDbMessageStore 透過使用 Spring Data Mongo 對應機制,將 Message 擴充為具有所有巢狀屬性的 Mongo 文件。當您需要存取 payloadheaders 以進行稽核或分析時,這非常有用,例如,針對儲存的訊息。

MongoDbMessageStore 使用自訂 MappingMongoConverter 實作將 Message 實例儲存為 MongoDB 文件,並且 Message 的屬性 (payloadheader 值) 有一些限制。

從 5.1.6 版開始,MongoDbMessageStore 可以使用自訂轉換器進行設定,這些轉換器會傳播到內部 MappingMongoConverter 實作中。有關更多資訊,請參閱 MongoDbMessageStore.setCustomConverters(Object…​ customConverters) JavaDocs。

Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore。它實作了 MessageStoreMessageGroupStore 介面。此類別可以接收 MongoTemplate 作為建構子引數,您可以使用它來設定自訂 WriteConcern 等。另一個建構子需要 MappingMongoConverterMongoDbFactory,這讓您可以為 Message 實例及其屬性提供一些自訂轉換。請注意,預設情況下,ConfigurableMongoDbMessageStore 使用標準 Java 序列化來讀取和寫入 Message 實例到 MongoDB 以及從 MongoDB 讀取 (請參閱 MongoDbMessageBytesConverter),並依賴 MongoTemplate 中其他屬性的預設值。它從提供的 MongoDbFactoryMappingMongoConverter 建構 MongoTemplateConfigurableMongoDbMessageStore 儲存的集合預設名稱為 configurableStoreMessages。當訊息包含複雜資料類型時,我們建議使用此實作來建立穩健且靈活的解決方案。

從 6.0.8 版開始,AbstractConfigurableMongoDbMessageStore 提供了一個 setCreateIndexes(boolean) 選項 (預設為 true),可用於停用自動索引建立。以下範例展示如何宣告 Bean 並停用自動索引建立

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB 通道訊息儲存區

4.0 版引入了新的 MongoDbChannelMessageStore。它是針對在 QueueChannel 實例中使用的最佳化 MessageGroupStore。使用 priorityEnabled = true,您可以在 <int:priority-queue> 實例中使用它,以實現持久化訊息的優先順序輪詢。優先順序 MongoDB 文件欄位從 IntegrationMessageHeaderAccessor.PRIORITY (priority) 訊息標頭填入。

此外,所有 MongoDB MessageStore 實例現在都為 MessageGroup 文件提供了一個 sequence 欄位。sequence 值是針對來自相同集合的簡單 sequence 文件執行 $inc 操作的結果,該文件是按需建立的。當訊息儲存在同一毫秒內時,sequence 欄位用於 poll 操作,以提供先進先出 (FIFO) 訊息順序 (在優先順序內,如果已設定)。

我們不建議對優先順序和非優先順序使用相同的 MongoDbChannelMessageStore Bean,因為 priorityEnabled 選項適用於整個儲存區。但是,相同的 collection 可以用於 MongoDbChannelMessageStore 類型,因為從儲存區輪詢訊息已排序並使用索引。若要設定該情境,您可以從另一個訊息儲存區 Bean 擴充一個訊息儲存區 Bean,如下列範例所示
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

使用 AbstractConfigurableMongoDbMessageStore 並停用自動索引建立

從 6.0.8 版開始,AbstractConfigurableMongoDbMessageStore 實作了 setCreateIndex(boolean),可用於停用或啟用 (預設) 自動索引建立。以下範例展示如何宣告 Bean 並停用自動索引建立

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);

    return mongoDbChannelMessageStore;
}

MongoDB 中繼資料儲存區

Spring Integration 4.2 引入了一個新的基於 MongoDB 的 MetadataStore (請參閱 中繼資料儲存區) 實作。您可以使用 MongoDbMetadataStore 來跨應用程式重新啟動維護中繼資料狀態。您可以將這個新的 MetadataStore 實作與配接器一起使用,例如

若要指示這些配接器使用新的 MongoDbMetadataStore,請宣告一個 Bean 名稱為 metadataStore 的 Spring Bean。Feed 輸入通道配接器會自動選取並使用宣告的 MongoDbMetadataStore。以下範例展示如何宣告名稱為 metadataStore 的 Bean

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

MongoDbMetadataStore 也實作了 ConcurrentMetadataStore,使其可以在多個應用程式實例之間可靠地共用,其中只允許一個實例儲存或修改金鑰的值。由於 MongoDB 保證,所有這些操作都是原子性的。

MongoDB 輸入通道配接器

MongoDB 輸入通道配接器是一個輪詢消費者,它從 MongoDB 讀取資料並將其作為 Message Payload 傳送。以下範例展示如何設定 MongoDB 輸入通道配接器

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

如上述設定所示,您可以透過使用 inbound-channel-adapter 元素並為各種屬性提供值來設定 MongoDb 輸入通道配接器,例如

  • query:JSON 查詢 (請參閱 MongoDB 查詢)

  • query-expression:SpEL 運算式,其評估結果為 JSON 查詢字串 (如上面的 query 屬性) 或 o.s.data.mongodb.core.query.Query 的實例。與 query 屬性互斥。

  • entity-class:Payload 物件的類型。如果未提供,則會傳回 com.mongodb.DBObject

  • collection-namecollection-name-expression:識別要使用的 MongoDB 集合的名稱。

  • mongodb-factory:對 o.s.data.mongodb.MongoDbFactory 實例的參考

  • mongo-template:對 o.s.data.mongodb.core.MongoTemplate 實例的參考

  • 其他在所有其他輸入配接器中通用的屬性 (例如 'channel')。

您不能同時設定 mongo-templatemongodb-factory

上述範例相對簡單且靜態,因為它對 query 具有常值,並使用 collection 的預設名稱。有時,您可能需要根據某些條件在執行階段變更這些值。若要這樣做,請使用它們的 -expression 等效項 (query-expressioncollection-name-expression),其中提供的運算式可以是任何有效的 SpEL 運算式。

此外,您可能希望對成功處理的從 MongoDB 讀取的資料執行一些後處理。例如,您可能希望在文件處理後移動或移除文件。您可以透過使用 Spring Integration 2.2 新增的交易同步功能來執行此操作,如下列範例所示

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

以下範例展示了上述範例中參考的 DocumentCleaner

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

您可以透過使用 transactional 元素將您的 Poller 宣告為交易性的。此元素可以參考真實的交易管理器 (例如,如果流程的其他部分調用 JDBC)。如果您沒有「真實」交易,則可以使用 o.s.i.transaction.PseudoTransactionManager 的實例,它是 Spring 的 PlatformTransactionManager 的實作,並在沒有實際交易時啟用 Mongo 配接器的交易同步功能。

這樣做不會使 MongoDB 本身成為交易性的。它允許在成功 (提交) 之前或之後或失敗 (回滾) 之後採取動作的同步。

一旦您的 Poller 是交易性的,您就可以在 transactional 元素上設定 o.s.i.transaction.TransactionSynchronizationFactory 的實例。TransactionSynchronizationFactory 建立 TransactionSynchronization 的實例。為了您的方便,我們公開了一個預設的基於 SpEL 的 TransactionSynchronizationFactory,讓您可以設定 SpEL 運算式,其執行與交易協調 (同步)。支援 commit 前、commit 後和 rollback 後事件的運算式,以及每個事件的通道,評估結果 (如果有的話) 會傳送到該通道。對於每個子元素,您可以指定 expressionchannel 屬性。如果僅存在 channel 屬性,則收到的訊息會作為特定同步情境的一部分傳送到那裡。如果僅存在 expression 屬性,並且運算式的結果是非 Null 值,則會產生一個以結果作為 Payload 的訊息,並傳送到預設通道 (NullChannel) 並顯示在記錄中 (在 DEBUG 層級)。如果您希望評估結果傳送到特定通道,請新增 channel 屬性。如果運算式的結果為 Null 或 void,則不會產生訊息。

有關交易同步的更多資訊,請參閱 交易同步

從 5.5 版開始,可以使用 updateExpression 設定 MongoDbMessageSource,它必須評估為具有 MongoDb update 語法的 Stringorg.springframework.data.mongodb.core.query.Update 實例。它可以作為上述後處理程序的替代方案使用,並且它會修改從集合中擷取的實體,因此它們不會在下一個輪詢週期再次從集合中提取 (假設更新變更了查詢中使用的某些值)。當叢集中使用同一集合的多個 MongoDbMessageSource 實例時,仍然建議使用交易來實現執行隔離和資料一致性。

MongoDB 變更串流輸入通道配接器

從 5.3 版開始,spring-integration-mongodb 模組引入了 MongoDbChangeStreamMessageProducer - 一個反應式 MessageProducerSupport 實作,用於 Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API。此元件預設產生具有 ChangeStreamEventbody 作為 Payload 的訊息 Flux,以及一些與變更串流相關的標頭 (請參閱 MongoHeaders)。建議將此 MongoDbChangeStreamMessageProducerFluxMessageChannel 組合使用,作為按需訂閱和下游事件消耗的 outputChannel

此通道配接器的 Java DSL 設定可能如下所示

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

MongoDbChangeStreamMessageProducer 停止,或下游訂閱取消,或 MongoDb 變更串流產生 OperationType.INVALIDATE 時,Publisher 會完成。可以再次啟動通道配接器,並建立新的來源資料 Publisher,並在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) 中自動訂閱。如果在啟動之間需要從其他位置消耗變更串流事件,則可以針對新選項重新設定此通道配接器。

有關 Spring Data MongoDb 中變更串流支援的更多資訊,請參閱 文件

MongoDB 輸出通道配接器

MongoDB 輸出通道配接器讓您可以將訊息 Payload 寫入 MongoDB 文件儲存區,如下列範例所示

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

如上述設定所示,您可以透過使用 outbound-channel-adapter 元素來設定 MongoDB 輸出通道配接器,並為各種屬性提供值,例如

  • collection-namecollection-name-expression:識別要使用的 MongoDb 集合的名稱。

  • mongo-converter:對 o.s.data.mongodb.core.convert.MongoConverter 實例的參考,它協助將原始 Java 物件轉換為 JSON 文件表示法。

  • mongodb-factory:對 o.s.data.mongodb.MongoDbFactory 實例的參考。

  • mongo-template:對 o.s.data.mongodb.core.MongoTemplate 實例的參考。注意:您不能同時設定 mongo-template 和 mongodb-factory。

  • 其他在所有輸入配接器中通用的屬性 (例如 'channel')。

上述範例相對簡單且靜態,因為它對 collection-name 具有常值。有時,您可能需要根據某些條件在執行階段變更此值。若要這樣做,請使用 collection-name-expression,其中提供的運算式是任何有效的 SpEL 運算式。

MongoDB 輸出閘道

5.0 版引入了 MongoDB 輸出閘道。它允許您透過將訊息傳送到其請求通道來查詢資料庫。然後,閘道將回應傳送到回覆通道。您可以使用訊息 Payload 和標頭來指定查詢和集合名稱,如下列範例所示

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory

    @Autowired
    lateinit var mongoConverter: MongoConverter

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

您可以將以下屬性與 MongoDB 輸出閘道一起使用

  • collection-namecollection-name-expression:識別要使用的 MongoDB 集合的名稱。

  • mongo-converter:對 o.s.data.mongodb.core.convert.MongoConverter 實例的參考,它協助將原始 Java 物件轉換為 JSON 文件表示法。

  • mongodb-factory:對 o.s.data.mongodb.MongoDbFactory 實例的參考。

  • mongo-template:對 o.s.data.mongodb.core.MongoTemplate 實例的參考。注意:您不能同時設定 mongo-templatemongodb-factory

  • entity-class:要傳遞到 MongoTemplate 中的 find(..)findOne(..) 方法的實體類別的完整名稱。如果未提供此屬性,則預設值為 org.bson.Document

  • queryquery-expression:指定 MongoDB 查詢。有關更多查詢範例,請參閱 MongoDB 文件

  • collection-callback:對 org.springframework.data.mongodb.core.CollectionCallback 實例的參考。自 5.0.11 以來,最好是 o.s.i.mongodb.outbound.MessageCollectionCallback 的實例,其中包含請求訊息內容。有關更多資訊,請參閱其 Javadocs。注意:您不能同時擁有 collection-callback 和任何查詢屬性。

作為 queryquery-expression 屬性的替代方案,您可以透過使用 collectionCallback 屬性作為對 MessageCollectionCallback 函數介面實作的參考來指定其他資料庫操作。以下範例指定計數操作

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}

MongoDB 反應式通道配接器

從 5.3 版開始,提供了 ReactiveMongoDbStoringMessageHandlerReactiveMongoDbMessageSource 實作。它們基於 Spring Data 中的 ReactiveMongoOperations,並且需要 org.mongodb:mongodb-driver-reactivestreams 依賴項。

當反應式串流組合涉及整合流程定義時,ReactiveMongoDbStoringMessageHandlerReactiveMessageHandler 的實作,在框架中原生支援。有關更多資訊,請參閱 ReactiveMessageHandler

從設定角度來看,它與許多其他標準通道配接器沒有區別。例如,使用 Java DSL,可以像這樣使用通道配接器

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

在此範例中,我們將透過提供的 ReactiveMongoDatabaseFactory 連線到 MongoDb,並將來自請求訊息的資料儲存到名稱為 data 的預設集合中。實際操作將從內部建立的 ReactiveStreamsConsumer 中的反應式串流組合按需執行。

ReactiveMongoDbMessageSource 是基於提供的 ReactiveMongoDatabaseFactoryReactiveMongoOperations 和 MongoDb 查詢 (或運算式) 的 AbstractMessageSource 實作,根據 expectSingleResult 選項調用 find()findOne() 操作,並使用預期的 entityClass 類型來轉換查詢結果。當產生的訊息 Payload 中的 Publisher (根據 expectSingleResult 選項為 FluxMono) 被訂閱時,會按需執行查詢執行和結果評估。當下游使用分割器和 FluxMessageChannel 時,框架可以自動訂閱此類 Payload (基本上是 flatMap)。否則,目標應用程式有責任訂閱下游端點中輪詢的發佈者。

使用 Java DSL,可以像這樣設定通道配接器

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

從 5.5 版開始,可以使用 updateExpression 設定 ReactiveMongoDbMessageSource。它具有與阻塞式 MongoDbMessageSource 相同的功能。有關更多資訊,請參閱 MongoDB 輸入通道配接器AbstractMongoDbMessageSourceSpec JavaDocs。