MongoDb 支援
版本 2.1 引入了對 MongoDB 的支援:一個「高效能、開放原始碼、面向文件的資料庫」。
您需要將此依賴項包含到您的專案中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mongodb</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.3.5"
若要下載、安裝和執行 MongoDB,請參閱 MongoDB 文件。
連線到 MongoDb
阻塞式或反應式?
從 5.3 版開始,Spring Integration 提供對反應式 MongoDB 驅動程式的支援,以在存取 MongoDB 時啟用非阻塞式 I/O。若要啟用反應式支援,請將 MongoDB 反應式串流驅動程式新增至您的依賴項
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
對於常規同步用戶端,您需要將其各自的驅動程式新增至依賴項中
-
Maven
-
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
它們在框架中都是 optional
,以提供更好的終端使用者選擇支援。
若要開始與 MongoDB 互動,您首先需要連線到它。Spring Integration 建構在另一個 Spring 專案 Spring Data MongoDB 提供的支援之上。它提供了名為 MongoDatabaseFactory
和 ReactiveMongoDatabaseFactory
的工廠類別,簡化了與 MongoDB Client API 的整合。
Spring Data 預設提供阻塞式 MongoDB 驅動程式,但您可以透過包含上述依賴項來選擇反應式用法。 |
使用 MongoDatabaseFactory
若要連線到 MongoDB,您可以使用 MongoDatabaseFactory
介面的實作。
以下範例展示如何使用 SimpleMongoClientDatabaseFactory
-
Java
-
XML
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory
接受兩個引數:一個 MongoClient
實例和一個指定資料庫名稱的 String
。如果您需要設定屬性,例如 host
、port
和其他屬性,您可以透過使用底層 MongoClients
類別提供的其中一個建構子來傳遞這些屬性。有關如何設定 MongoDB 的更多資訊,請參閱 Spring-Data-MongoDB 參考文件。
使用 ReactiveMongoDatabaseFactory
若要使用反應式驅動程式連線到 MongoDB,您可以使用 ReactiveMongoDatabaseFactory
介面的實作。
以下範例展示如何使用 SimpleReactiveMongoDatabaseFactory
-
Java
-
XML
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB 訊息儲存區
如企業整合模式 (EIP) 書籍中所述,訊息儲存區 讓您可以持久化訊息。當處理具有緩衝訊息能力的元件 (QueueChannel
、aggregator
、resequencer
和其他元件) 時,如果可靠性是一個考量,這樣做會很有用。在 Spring Integration 中,MessageStore
策略也為 聲明檢查 模式提供了基礎,該模式也在 EIP 中描述。
Spring Integration 的 MongoDB 模組提供了 MongoDbMessageStore
,它是 MessageStore
策略 (主要由聲明檢查模式使用) 和 MessageGroupStore
策略 (主要由彙集器和重新排序器模式使用) 的實作。
以下範例設定 MongoDbMessageStore
以使用 QueueChannel
和 aggregator
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
上述範例是一個簡單的 Bean 設定,它期望 MongoDbFactory
作為建構子引數。
MongoDbMessageStore
透過使用 Spring Data Mongo 對應機制,將 Message
擴充為具有所有巢狀屬性的 Mongo 文件。當您需要存取 payload
或 headers
以進行稽核或分析時,這非常有用,例如,針對儲存的訊息。
MongoDbMessageStore 使用自訂 MappingMongoConverter 實作將 Message 實例儲存為 MongoDB 文件,並且 Message 的屬性 (payload 和 header 值) 有一些限制。 |
從 5.1.6 版開始,MongoDbMessageStore
可以使用自訂轉換器進行設定,這些轉換器會傳播到內部 MappingMongoConverter
實作中。有關更多資訊,請參閱 MongoDbMessageStore.setCustomConverters(Object… customConverters)
JavaDocs。
Spring Integration 3.0 引入了 ConfigurableMongoDbMessageStore
。它實作了 MessageStore
和 MessageGroupStore
介面。此類別可以接收 MongoTemplate
作為建構子引數,您可以使用它來設定自訂 WriteConcern
等。另一個建構子需要 MappingMongoConverter
和 MongoDbFactory
,這讓您可以為 Message
實例及其屬性提供一些自訂轉換。請注意,預設情況下,ConfigurableMongoDbMessageStore
使用標準 Java 序列化來讀取和寫入 Message
實例到 MongoDB 以及從 MongoDB 讀取 (請參閱 MongoDbMessageBytesConverter
),並依賴 MongoTemplate
中其他屬性的預設值。它從提供的 MongoDbFactory
和 MappingMongoConverter
建構 MongoTemplate
。ConfigurableMongoDbMessageStore
儲存的集合預設名稱為 configurableStoreMessages
。當訊息包含複雜資料類型時,我們建議使用此實作來建立穩健且靈活的解決方案。
從 6.0.8 版開始,AbstractConfigurableMongoDbMessageStore
提供了一個 setCreateIndexes(boolean)
選項 (預設為 true
),可用於停用自動索引建立。以下範例展示如何宣告 Bean 並停用自動索引建立
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}
MongoDB 通道訊息儲存區
4.0 版引入了新的 MongoDbChannelMessageStore
。它是針對在 QueueChannel
實例中使用的最佳化 MessageGroupStore
。使用 priorityEnabled = true
,您可以在 <int:priority-queue>
實例中使用它,以實現持久化訊息的優先順序輪詢。優先順序 MongoDB 文件欄位從 IntegrationMessageHeaderAccessor.PRIORITY
(priority
) 訊息標頭填入。
此外,所有 MongoDB MessageStore
實例現在都為 MessageGroup
文件提供了一個 sequence
欄位。sequence
值是針對來自相同集合的簡單 sequence
文件執行 $inc
操作的結果,該文件是按需建立的。當訊息儲存在同一毫秒內時,sequence
欄位用於 poll
操作,以提供先進先出 (FIFO) 訊息順序 (在優先順序內,如果已設定)。
我們不建議對優先順序和非優先順序使用相同的 MongoDbChannelMessageStore Bean,因為 priorityEnabled 選項適用於整個儲存區。但是,相同的 collection 可以用於 MongoDbChannelMessageStore 類型,因為從儲存區輪詢訊息已排序並使用索引。若要設定該情境,您可以從另一個訊息儲存區 Bean 擴充一個訊息儲存區 Bean,如下列範例所示 |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
使用 AbstractConfigurableMongoDbMessageStore 並停用自動索引建立
從 6.0.8 版開始,AbstractConfigurableMongoDbMessageStore
實作了 setCreateIndex(boolean)
,可用於停用或啟用 (預設) 自動索引建立。以下範例展示如何宣告 Bean 並停用自動索引建立
@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);
return mongoDbChannelMessageStore;
}
MongoDB 中繼資料儲存區
Spring Integration 4.2 引入了一個新的基於 MongoDB 的 MetadataStore
(請參閱 中繼資料儲存區) 實作。您可以使用 MongoDbMetadataStore
來跨應用程式重新啟動維護中繼資料狀態。您可以將這個新的 MetadataStore
實作與配接器一起使用,例如
若要指示這些配接器使用新的 MongoDbMetadataStore
,請宣告一個 Bean 名稱為 metadataStore
的 Spring Bean。Feed 輸入通道配接器會自動選取並使用宣告的 MongoDbMetadataStore
。以下範例展示如何宣告名稱為 metadataStore
的 Bean
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
MongoDbMetadataStore
也實作了 ConcurrentMetadataStore
,使其可以在多個應用程式實例之間可靠地共用,其中只允許一個實例儲存或修改金鑰的值。由於 MongoDB 保證,所有這些操作都是原子性的。
MongoDB 輸入通道配接器
MongoDB 輸入通道配接器是一個輪詢消費者,它從 MongoDB 讀取資料並將其作為 Message
Payload 傳送。以下範例展示如何設定 MongoDB 輸入通道配接器
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
如上述設定所示,您可以透過使用 inbound-channel-adapter
元素並為各種屬性提供值來設定 MongoDb 輸入通道配接器,例如
-
query
:JSON 查詢 (請參閱 MongoDB 查詢) -
query-expression
:SpEL 運算式,其評估結果為 JSON 查詢字串 (如上面的query
屬性) 或o.s.data.mongodb.core.query.Query
的實例。與query
屬性互斥。 -
entity-class
:Payload 物件的類型。如果未提供,則會傳回com.mongodb.DBObject
。 -
collection-name
或collection-name-expression
:識別要使用的 MongoDB 集合的名稱。 -
mongodb-factory
:對o.s.data.mongodb.MongoDbFactory
實例的參考 -
mongo-template
:對o.s.data.mongodb.core.MongoTemplate
實例的參考 -
其他在所有其他輸入配接器中通用的屬性 (例如 'channel')。
您不能同時設定 mongo-template 和 mongodb-factory 。 |
上述範例相對簡單且靜態,因為它對 query
具有常值,並使用 collection
的預設名稱。有時,您可能需要根據某些條件在執行階段變更這些值。若要這樣做,請使用它們的 -expression
等效項 (query-expression
和 collection-name-expression
),其中提供的運算式可以是任何有效的 SpEL 運算式。
此外,您可能希望對成功處理的從 MongoDB 讀取的資料執行一些後處理。例如,您可能希望在文件處理後移動或移除文件。您可以透過使用 Spring Integration 2.2 新增的交易同步功能來執行此操作,如下列範例所示
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
以下範例展示了上述範例中參考的 DocumentCleaner
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
您可以透過使用 transactional
元素將您的 Poller 宣告為交易性的。此元素可以參考真實的交易管理器 (例如,如果流程的其他部分調用 JDBC)。如果您沒有「真實」交易,則可以使用 o.s.i.transaction.PseudoTransactionManager
的實例,它是 Spring 的 PlatformTransactionManager
的實作,並在沒有實際交易時啟用 Mongo 配接器的交易同步功能。
這樣做不會使 MongoDB 本身成為交易性的。它允許在成功 (提交) 之前或之後或失敗 (回滾) 之後採取動作的同步。 |
一旦您的 Poller 是交易性的,您就可以在 transactional
元素上設定 o.s.i.transaction.TransactionSynchronizationFactory
的實例。TransactionSynchronizationFactory
建立 TransactionSynchronization
的實例。為了您的方便,我們公開了一個預設的基於 SpEL 的 TransactionSynchronizationFactory
,讓您可以設定 SpEL 運算式,其執行與交易協調 (同步)。支援 commit 前、commit 後和 rollback 後事件的運算式,以及每個事件的通道,評估結果 (如果有的話) 會傳送到該通道。對於每個子元素,您可以指定 expression
和 channel
屬性。如果僅存在 channel
屬性,則收到的訊息會作為特定同步情境的一部分傳送到那裡。如果僅存在 expression
屬性,並且運算式的結果是非 Null 值,則會產生一個以結果作為 Payload 的訊息,並傳送到預設通道 (NullChannel
) 並顯示在記錄中 (在 DEBUG
層級)。如果您希望評估結果傳送到特定通道,請新增 channel
屬性。如果運算式的結果為 Null 或 void,則不會產生訊息。
有關交易同步的更多資訊,請參閱 交易同步。
從 5.5 版開始,可以使用 updateExpression
設定 MongoDbMessageSource
,它必須評估為具有 MongoDb update
語法的 String
或 org.springframework.data.mongodb.core.query.Update
實例。它可以作為上述後處理程序的替代方案使用,並且它會修改從集合中擷取的實體,因此它們不會在下一個輪詢週期再次從集合中提取 (假設更新變更了查詢中使用的某些值)。當叢集中使用同一集合的多個 MongoDbMessageSource
實例時,仍然建議使用交易來實現執行隔離和資料一致性。
MongoDB 變更串流輸入通道配接器
從 5.3 版開始,spring-integration-mongodb
模組引入了 MongoDbChangeStreamMessageProducer
- 一個反應式 MessageProducerSupport
實作,用於 Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)
API。此元件預設產生具有 ChangeStreamEvent
的 body
作為 Payload 的訊息 Flux
,以及一些與變更串流相關的標頭 (請參閱 MongoHeaders
)。建議將此 MongoDbChangeStreamMessageProducer
與 FluxMessageChannel
組合使用,作為按需訂閱和下游事件消耗的 outputChannel
。
此通道配接器的 Java DSL 設定可能如下所示
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
當 MongoDbChangeStreamMessageProducer
停止,或下游訂閱取消,或 MongoDb 變更串流產生 OperationType.INVALIDATE
時,Publisher
會完成。可以再次啟動通道配接器,並建立新的來源資料 Publisher
,並在 MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)
中自動訂閱。如果在啟動之間需要從其他位置消耗變更串流事件,則可以針對新選項重新設定此通道配接器。
有關 Spring Data MongoDb 中變更串流支援的更多資訊,請參閱 文件。
MongoDB 輸出通道配接器
MongoDB 輸出通道配接器讓您可以將訊息 Payload 寫入 MongoDB 文件儲存區,如下列範例所示
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
如上述設定所示,您可以透過使用 outbound-channel-adapter
元素來設定 MongoDB 輸出通道配接器,並為各種屬性提供值,例如
-
collection-name
或collection-name-expression
:識別要使用的 MongoDb 集合的名稱。 -
mongo-converter
:對o.s.data.mongodb.core.convert.MongoConverter
實例的參考,它協助將原始 Java 物件轉換為 JSON 文件表示法。 -
mongodb-factory
:對o.s.data.mongodb.MongoDbFactory
實例的參考。 -
mongo-template
:對o.s.data.mongodb.core.MongoTemplate
實例的參考。注意:您不能同時設定 mongo-template 和 mongodb-factory。 -
其他在所有輸入配接器中通用的屬性 (例如 'channel')。
上述範例相對簡單且靜態,因為它對 collection-name
具有常值。有時,您可能需要根據某些條件在執行階段變更此值。若要這樣做,請使用 collection-name-expression
,其中提供的運算式是任何有效的 SpEL 運算式。
MongoDB 輸出閘道
5.0 版引入了 MongoDB 輸出閘道。它允許您透過將訊息傳送到其請求通道來查詢資料庫。然後,閘道將回應傳送到回覆通道。您可以使用訊息 Payload 和標頭來指定查詢和集合名稱,如下列範例所示
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory;
@Autowired
private MongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory
@Autowired
lateinit var mongoConverter: MongoConverter
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
您可以將以下屬性與 MongoDB 輸出閘道一起使用
-
collection-name
或collection-name-expression
:識別要使用的 MongoDB 集合的名稱。 -
mongo-converter
:對o.s.data.mongodb.core.convert.MongoConverter
實例的參考,它協助將原始 Java 物件轉換為 JSON 文件表示法。 -
mongodb-factory
:對o.s.data.mongodb.MongoDbFactory
實例的參考。 -
mongo-template
:對o.s.data.mongodb.core.MongoTemplate
實例的參考。注意:您不能同時設定mongo-template
和mongodb-factory
。 -
entity-class
:要傳遞到 MongoTemplate 中的find(..)
和findOne(..)
方法的實體類別的完整名稱。如果未提供此屬性,則預設值為org.bson.Document
。 -
query
或query-expression
:指定 MongoDB 查詢。有關更多查詢範例,請參閱 MongoDB 文件。 -
collection-callback
:對org.springframework.data.mongodb.core.CollectionCallback
實例的參考。自 5.0.11 以來,最好是o.s.i.mongodb.outbound.MessageCollectionCallback
的實例,其中包含請求訊息內容。有關更多資訊,請參閱其 Javadocs。注意:您不能同時擁有collection-callback
和任何查詢屬性。
作為 query
和 query-expression
屬性的替代方案,您可以透過使用 collectionCallback
屬性作為對 MessageCollectionCallback
函數介面實作的參考來指定其他資料庫操作。以下範例指定計數操作
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
MongoDB 反應式通道配接器
從 5.3 版開始,提供了 ReactiveMongoDbStoringMessageHandler
和 ReactiveMongoDbMessageSource
實作。它們基於 Spring Data 中的 ReactiveMongoOperations
,並且需要 org.mongodb:mongodb-driver-reactivestreams
依賴項。
當反應式串流組合涉及整合流程定義時,ReactiveMongoDbStoringMessageHandler
是 ReactiveMessageHandler
的實作,在框架中原生支援。有關更多資訊,請參閱 ReactiveMessageHandler。
從設定角度來看,它與許多其他標準通道配接器沒有區別。例如,使用 Java DSL,可以像這樣使用通道配接器
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
在此範例中,我們將透過提供的 ReactiveMongoDatabaseFactory
連線到 MongoDb,並將來自請求訊息的資料儲存到名稱為 data
的預設集合中。實際操作將從內部建立的 ReactiveStreamsConsumer
中的反應式串流組合按需執行。
ReactiveMongoDbMessageSource
是基於提供的 ReactiveMongoDatabaseFactory
或 ReactiveMongoOperations
和 MongoDb 查詢 (或運算式) 的 AbstractMessageSource
實作,根據 expectSingleResult
選項調用 find()
或 findOne()
操作,並使用預期的 entityClass
類型來轉換查詢結果。當產生的訊息 Payload 中的 Publisher
(根據 expectSingleResult
選項為 Flux
或 Mono
) 被訂閱時,會按需執行查詢執行和結果評估。當下游使用分割器和 FluxMessageChannel
時,框架可以自動訂閱此類 Payload (基本上是 flatMap
)。否則,目標應用程式有責任訂閱下游端點中輪詢的發佈者。
使用 Java DSL,可以像這樣設定通道配接器
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
從 5.5 版開始,可以使用 updateExpression
設定 ReactiveMongoDbMessageSource
。它具有與阻塞式 MongoDbMessageSource
相同的功能。有關更多資訊,請參閱 MongoDB 輸入通道配接器 和 AbstractMongoDbMessageSourceSpec
JavaDocs。