工作階段 & 交易

從 3.6 版開始,MongoDB 支援工作階段的概念。工作階段的使用啟用 MongoDB 的 因果一致性 模型,該模型保證以尊重其因果關係的順序執行操作。這些分為 ServerSession 實例和 ClientSession 實例。在本節中,當我們談到工作階段時,我們指的是 ClientSession

用戶端工作階段內的操作不會與工作階段外的操作隔離。

MongoOperationsReactiveMongoOperations 都提供閘道方法,用於將 ClientSession 繫結到操作。MongoCollectionMongoDatabase 使用實作 MongoDB 集合和資料庫介面的工作階段代理物件,因此您無需在每次呼叫時都新增工作階段。這表示對 MongoCollection#find() 的潛在呼叫會委派給 MongoCollection#find(ClientSession)

諸如 (Reactive)MongoOperations#getCollection 等方法傳回原生 MongoDB Java Driver 閘道物件(例如 MongoCollection),這些物件本身提供 ClientSession 的專用方法。這些方法並非工作階段代理。當直接與 MongoCollectionMongoDatabase 互動時,而非透過 MongoOperations 上的 #execute 回呼之一時,您應在需要時提供 ClientSession

ClientSession 支援

以下範例顯示工作階段的用法

  • 命令式

  • 反應式

ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();

ClientSession session = client.startSession(sessionOptions); (1)

template.withSession(() -> session)
    .execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        Person durzo = action.findOne(query, Person.class);  (2)

        Person azoth = new Person("Kylar Stern");
        azoth.setMaster(durzo);

        action.insert(azoth);                                (3)

        return azoth;
    });

session.close()                                              (4)
1 從伺服器取得新的工作階段。
2 如同之前一樣使用 MongoOperation 方法。ClientSession 會自動套用。
3 請務必關閉 ClientSession
4 關閉工作階段。
當處理 DBRef 實例,特別是延遲載入的實例時,務必在載入所有資料之前不要關閉 ClientSession。否則,延遲擷取會失敗。
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();

Publisher<ClientSession> session = client.startSession(sessionOptions); (1)

template.withSession(session)
.execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        return action.findOne(query, Person.class)
            .flatMap(durzo -> {

                Person azoth = new Person("Kylar Stern");
                azoth.setMaster(durzo);

                return action.insert(azoth);                            (2)
            });
    }, ClientSession::close)                                            (3)
    .subscribe();                                                       (4)
1 取得用於新工作階段擷取的 Publisher
2 如同之前一樣使用 ReactiveMongoOperation 方法。ClientSession 會自動取得並套用。
3 請務必關閉 ClientSession
4 在您訂閱之前,不會發生任何事情。如需詳細資訊,請參閱Project Reactor 參考指南

透過使用提供實際工作階段的 Publisher,您可以將工作階段取得延遲到實際訂閱的時間點。不過,您仍然需要在完成時關閉工作階段,以免以過時的工作階段污染伺服器。當您不再需要工作階段時,請使用 execute 上的 doFinally 鉤子來呼叫 ClientSession#close()。如果您偏好更精確地控制工作階段本身,您可以透過驅動程式取得 ClientSession,並透過 Supplier 提供它。

ClientSession 的反應式使用僅限於範本 API 用法。目前沒有與反應式儲存庫的工作階段整合。

MongoDB 交易

從第 4 版開始,MongoDB 支援交易。交易建立在工作階段之上,因此需要作用中的 ClientSession

除非您在應用程式內容中指定 MongoTransactionManager,否則交易支援會停用。您可以使用 setSessionSynchronization(ALWAYS) 來參與進行中的非原生 MongoDB 交易。

若要取得對交易的完整程式化控制,您可能會想要使用 MongoOperations 上的工作階段回呼。

以下範例顯示程式化交易控制

程式化交易
  • 命令式

  • 反應式

ClientSession session = client.startSession(options);                   (1)

template.withSession(session)
    .execute(action -> {

        session.startTransaction();                                     (2)

        try {

            Step step = // ...;
            action.insert(step);

            process(step);

            action.update(Step.class).apply(Update.set("state", // ...

            session.commitTransaction();                                (3)

        } catch (RuntimeException e) {
            session.abortTransaction();                                 (4)
        }
    }, ClientSession::close)                                            (5)
1 取得新的 ClientSession
2 啟動交易。
3 如果一切運作如預期,請提交變更。
4 發生錯誤,因此回復所有內容。
5 完成時不要忘記關閉工作階段。

先前的範例讓您可以完全控制交易行為,同時在回呼中使用工作階段範圍的 MongoOperations 實例,以確保工作階段傳遞到每個伺服器呼叫。為了避免使用此方法帶來的一些額外負荷,您可以使用 TransactionTemplate 來消除手動交易流程的一些雜訊。

Mono<DeleteResult> result = Mono
    .from(client.startSession())                                                             (1)

    .flatMap(session -> {
        session.startTransaction();                                                          (2)

        return Mono.from(collection.deleteMany(session, ...))                                (3)

            .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))   (4)

            .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))     (5)

            .doFinally(signal -> session.close());                                           (6)
      });
1 首先,我們顯然需要啟動工作階段。
2 一旦我們手邊有 ClientSession,請啟動交易。
3 透過將 ClientSession 傳遞到操作中,在交易內操作。
4 如果操作異常完成,我們需要停止交易並保留錯誤。
5 或者,當然,在成功的情況下提交變更。仍然保留操作結果。
6 最後,我們需要確保關閉工作階段。

上述操作的罪魁禍首是保留主要流程 DeleteResult,而不是透過 commitTransaction()abortTransaction() 發布的交易結果,這導致相當複雜的設定。

除非您在應用程式內容中指定 ReactiveMongoTransactionManager,否則交易支援會停用。您可以使用 setSessionSynchronization(ALWAYS) 來參與進行中的非原生 MongoDB 交易。

使用 TransactionTemplate / TransactionalOperator 的交易

Spring Data MongoDB 交易同時支援 TransactionTemplateTransactionalOperator

使用 TransactionTemplate / TransactionalOperator 的交易
  • 命令式

  • 反應式

template.setSessionSynchronization(ALWAYS);                                     (1)

// ...

TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);         (2)

txTemplate.execute(new TransactionCallbackWithoutResult() {

    @Override
    protected void doInTransactionWithoutResult(TransactionStatus status) {     (3)

        Step step = // ...;
        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    }
});
1 在範本 API 組態期間啟用交易同步。
2 使用提供的 PlatformTransactionManager 建立 TransactionTemplate
3 在回呼中,ClientSession 和交易已註冊。
在執行階段變更 MongoTemplate 的狀態(如同您可能認為在先前清單的項目 1 中可能發生的情況)可能會導致執行緒和可見性問題。
template.setSessionSynchronization(ALWAYS);                                          (1)

// ...

TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
                                   new DefaultTransactionDefinition());              (2)


Step step = // ...;
template.insert(step);

Mono<Void> process(step)
    .then(template.update(Step.class).apply(Update.set("state", …))
    .as(rxtx::transactional)                                                         (3)
    .then();
1 啟用交易同步以參與交易。
2 使用提供的 ReactiveTransactionManager 建立 TransactionalOperator
3 TransactionalOperator.transactional(…) 為所有上游操作提供交易管理。

使用 MongoTransactionManager & ReactiveMongoTransactionManager 的交易

MongoTransactionManager / ReactiveMongoTransactionManager 是通往眾所周知的 Spring 交易支援的閘道。它讓應用程式可以使用 Spring 的受管理交易功能MongoTransactionManagerClientSession 繫結到執行緒,而 ReactiveMongoTransactionManager 則使用 ReactorContext 來完成此操作。MongoTemplate 偵測到工作階段,並據此對與交易相關聯的這些資源進行操作。MongoTemplate 也可以參與其他進行中的交易。以下範例顯示如何使用 MongoTransactionManager 建立和使用交易

使用 MongoTransactionManager / ReactiveMongoTransactionManager 的交易
  • 命令式

  • 反應式

@Configuration
static class Config extends AbstractMongoClientConfiguration {

    @Bean
    MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  (1)
        return new MongoTransactionManager(dbFactory);
    }

    // ...
}

@Component
public class StateService {

    @Transactional
    void someBusinessFunction(Step step) {                                        (2)

        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    };
});
1 在應用程式內容中註冊 MongoTransactionManager
2 將方法標記為交易式。
@Transactional(readOnly = true) 建議 MongoTransactionManager 也啟動一個交易,將 ClientSession 新增至傳出的請求。
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {  (1)
        return new ReactiveMongoTransactionManager(factory);
    }

    // ...
}

@Service
public class StateService {

    @Transactional
    Mono<UpdateResult> someBusinessFunction(Step step) {                                  (2)

        return template.insert(step)
            .then(process(step))
            .then(template.update(Step.class).apply(Update.set("state", …));
    };
});
1 在應用程式內容中註冊 ReactiveMongoTransactionManager
2 將方法標記為交易式。
@Transactional(readOnly = true) 建議 ReactiveMongoTransactionManager 也啟動一個交易,將 ClientSession 新增至傳出的請求。

控制 MongoDB 特定的交易選項

交易式服務方法可能需要特定的交易選項才能執行交易。Spring Data MongoDB 的交易管理員支援評估交易標籤,例如 @Transactional(label = { "mongo:readConcern=available" })

預設情況下,使用 mongo: 前置詞的標籤命名空間由預設組態的 MongoTransactionOptionsResolver 評估。交易標籤由 TransactionAttribute 提供,並且可透過 TransactionTemplateTransactionalOperator 進行程式化交易控制。由於它們的宣告性質,@Transactional(label = …) 提供了一個良好的起點,也可以作為文件。

目前,支援以下選項

最大提交時間

控制伺服器上 commitTransaction 操作的最大執行時間。值的格式與 ISO-8601 持續時間格式一致,如同 Duration.parse(…) 所使用的格式。

用法:mongo:maxCommitTime=PT1S

讀取關注點

設定交易的讀取關注點。

用法:mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE

讀取偏好

設定交易的讀取偏好。

用法:mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST

寫入關注點

設定交易的寫入關注點。

用法:mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY

加入外部交易的巢狀交易不會影響初始交易選項,因為交易已啟動。交易選項僅在啟動新交易時套用。

交易內部的特殊行為

在交易內部,MongoDB 伺服器的行為略有不同。

連線設定

MongoDB 驅動程式提供專用的副本集名稱組態選項,可將驅動程式變成自動偵測模式。此選項有助於識別主要副本集節點和交易期間的命令路由。

請務必將 replicaSet 新增至 MongoDB URI。如需更多詳細資訊,請參閱連線字串選項

集合操作

MongoDB 支援在交易內進行集合操作,例如集合建立。這也會影響首次使用時發生的即時集合建立。因此,請務必將所有必要的結構就緒。

暫時性錯誤

MongoDB 可以將特殊標籤新增至交易操作期間引發的錯誤。這些標籤可能表示暫時性失敗,這些失敗可能僅透過重試操作即可消失。我們強烈建議將 Spring Retry 用於這些目的。儘管如此,人們可能會覆寫 MongoTransactionManager#doCommit(MongoTransactionObject) 以實作 重試提交操作 行為,如 MongoDB 參考手冊中所概述。

計數

MongoDB count 對集合統計資料進行操作,這可能無法反映交易內的實際情況。當在多文件交易內部發出 count 命令時,伺服器會回應錯誤 50851。一旦 MongoTemplate 偵測到作用中交易,所有公開的 count() 方法都會轉換並委派給使用 $match$count 運算子的聚合框架,從而保留 Query 設定,例如 collation

當在聚合計數輔助程式內部使用地理位置命令時,限制適用。以下運算子無法使用,必須替換為不同的運算子

  • $where$expr

  • $near → 具有 $center$geoWithin

  • $nearSphere → 具有 $centerSphere$geoWithin

使用 Criteria.near(…)Criteria.nearSphere(…) 的查詢必須重寫為 Criteria.within(…)Criteria.withinSphere(…)。同樣適用於儲存庫查詢方法中必須變更為 withinnear 查詢關鍵字。另請參閱 MongoDB JIRA 票證 DRIVERS-518 以取得進一步參考。

以下程式碼片段顯示工作階段繫結的閉包內部的 count 用法

session.startTransaction();

template.withSession(session)
    .execute(action -> {
        action.count(query(where("state").is("active")), Step.class)
        ...

上面的程式碼片段在以下命令中實現

db.collection.aggregate(
   [
      { $match: { state: "active" } },
      { $count: "totalEntityCount" }
   ]
)

而非

db.collection.find( { state: "active" } ).count()