工作階段 & 交易
從 3.6 版開始,MongoDB 支援工作階段的概念。工作階段的使用啟用 MongoDB 的 因果一致性 模型,該模型保證以尊重其因果關係的順序執行操作。這些分為 ServerSession
實例和 ClientSession
實例。在本節中,當我們談到工作階段時,我們指的是 ClientSession
。
用戶端工作階段內的操作不會與工作階段外的操作隔離。 |
MongoOperations
和 ReactiveMongoOperations
都提供閘道方法,用於將 ClientSession
繫結到操作。MongoCollection
和 MongoDatabase
使用實作 MongoDB 集合和資料庫介面的工作階段代理物件,因此您無需在每次呼叫時都新增工作階段。這表示對 MongoCollection#find()
的潛在呼叫會委派給 MongoCollection#find(ClientSession)
。
諸如 (Reactive)MongoOperations#getCollection 等方法傳回原生 MongoDB Java Driver 閘道物件(例如 MongoCollection ),這些物件本身提供 ClientSession 的專用方法。這些方法並非工作階段代理。當直接與 MongoCollection 或 MongoDatabase 互動時,而非透過 MongoOperations 上的 #execute 回呼之一時,您應在需要時提供 ClientSession 。 |
ClientSession 支援
以下範例顯示工作階段的用法
-
命令式
-
反應式
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
ClientSession session = client.startSession(sessionOptions); (1)
template.withSession(() -> session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
Person durzo = action.findOne(query, Person.class); (2)
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
action.insert(azoth); (3)
return azoth;
});
session.close() (4)
1 | 從伺服器取得新的工作階段。 |
2 | 如同之前一樣使用 MongoOperation 方法。ClientSession 會自動套用。 |
3 | 請務必關閉 ClientSession 。 |
4 | 關閉工作階段。 |
當處理 DBRef 實例,特別是延遲載入的實例時,務必在載入所有資料之前不要關閉 ClientSession 。否則,延遲擷取會失敗。 |
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
Publisher<ClientSession> session = client.startSession(sessionOptions); (1)
template.withSession(session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
return action.findOne(query, Person.class)
.flatMap(durzo -> {
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
return action.insert(azoth); (2)
});
}, ClientSession::close) (3)
.subscribe(); (4)
1 | 取得用於新工作階段擷取的 Publisher 。 |
2 | 如同之前一樣使用 ReactiveMongoOperation 方法。ClientSession 會自動取得並套用。 |
3 | 請務必關閉 ClientSession 。 |
4 | 在您訂閱之前,不會發生任何事情。如需詳細資訊,請參閱Project Reactor 參考指南。 |
透過使用提供實際工作階段的 Publisher
,您可以將工作階段取得延遲到實際訂閱的時間點。不過,您仍然需要在完成時關閉工作階段,以免以過時的工作階段污染伺服器。當您不再需要工作階段時,請使用 execute
上的 doFinally
鉤子來呼叫 ClientSession#close()
。如果您偏好更精確地控制工作階段本身,您可以透過驅動程式取得 ClientSession
,並透過 Supplier
提供它。
ClientSession 的反應式使用僅限於範本 API 用法。目前沒有與反應式儲存庫的工作階段整合。 |
MongoDB 交易
除非您在應用程式內容中指定 MongoTransactionManager ,否則交易支援會停用。您可以使用 setSessionSynchronization(ALWAYS) 來參與進行中的非原生 MongoDB 交易。 |
若要取得對交易的完整程式化控制,您可能會想要使用 MongoOperations
上的工作階段回呼。
以下範例顯示程式化交易控制
-
命令式
-
反應式
ClientSession session = client.startSession(options); (1)
template.withSession(session)
.execute(action -> {
session.startTransaction(); (2)
try {
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
session.commitTransaction(); (3)
} catch (RuntimeException e) {
session.abortTransaction(); (4)
}
}, ClientSession::close) (5)
1 | 取得新的 ClientSession 。 |
2 | 啟動交易。 |
3 | 如果一切運作如預期,請提交變更。 |
4 | 發生錯誤,因此回復所有內容。 |
5 | 完成時不要忘記關閉工作階段。 |
先前的範例讓您可以完全控制交易行為,同時在回呼中使用工作階段範圍的 MongoOperations
實例,以確保工作階段傳遞到每個伺服器呼叫。為了避免使用此方法帶來的一些額外負荷,您可以使用 TransactionTemplate
來消除手動交易流程的一些雜訊。
Mono<DeleteResult> result = Mono
.from(client.startSession()) (1)
.flatMap(session -> {
session.startTransaction(); (2)
return Mono.from(collection.deleteMany(session, ...)) (3)
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) (4)
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) (5)
.doFinally(signal -> session.close()); (6)
});
1 | 首先,我們顯然需要啟動工作階段。 |
2 | 一旦我們手邊有 ClientSession ,請啟動交易。 |
3 | 透過將 ClientSession 傳遞到操作中,在交易內操作。 |
4 | 如果操作異常完成,我們需要停止交易並保留錯誤。 |
5 | 或者,當然,在成功的情況下提交變更。仍然保留操作結果。 |
6 | 最後,我們需要確保關閉工作階段。 |
上述操作的罪魁禍首是保留主要流程 DeleteResult
,而不是透過 commitTransaction()
或 abortTransaction()
發布的交易結果,這導致相當複雜的設定。
除非您在應用程式內容中指定 ReactiveMongoTransactionManager ,否則交易支援會停用。您可以使用 setSessionSynchronization(ALWAYS) 來參與進行中的非原生 MongoDB 交易。 |
使用 TransactionTemplate / TransactionalOperator 的交易
Spring Data MongoDB 交易同時支援 TransactionTemplate
和 TransactionalOperator
。
TransactionTemplate
/ TransactionalOperator
的交易-
命令式
-
反應式
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); (2)
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { (3)
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
}
});
1 | 在範本 API 組態期間啟用交易同步。 |
2 | 使用提供的 PlatformTransactionManager 建立 TransactionTemplate 。 |
3 | 在回呼中,ClientSession 和交易已註冊。 |
在執行階段變更 MongoTemplate 的狀態(如同您可能認為在先前清單的項目 1 中可能發生的情況)可能會導致執行緒和可見性問題。 |
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition()); (2)
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
.as(rxtx::transactional) (3)
.then();
1 | 啟用交易同步以參與交易。 |
2 | 使用提供的 ReactiveTransactionManager 建立 TransactionalOperator 。 |
3 | TransactionalOperator.transactional(…) 為所有上游操作提供交易管理。 |
使用 MongoTransactionManager & ReactiveMongoTransactionManager 的交易
MongoTransactionManager
/ ReactiveMongoTransactionManager
是通往眾所周知的 Spring 交易支援的閘道。它讓應用程式可以使用 Spring 的受管理交易功能。MongoTransactionManager
將 ClientSession
繫結到執行緒,而 ReactiveMongoTransactionManager
則使用 ReactorContext
來完成此操作。MongoTemplate
偵測到工作階段,並據此對與交易相關聯的這些資源進行操作。MongoTemplate
也可以參與其他進行中的交易。以下範例顯示如何使用 MongoTransactionManager
建立和使用交易
MongoTransactionManager
/ ReactiveMongoTransactionManager
的交易-
命令式
-
反應式
@Configuration
static class Config extends AbstractMongoClientConfiguration {
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { (1)
return new MongoTransactionManager(dbFactory);
}
// ...
}
@Component
public class StateService {
@Transactional
void someBusinessFunction(Step step) { (2)
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});
1 | 在應用程式內容中註冊 MongoTransactionManager 。 |
2 | 將方法標記為交易式。 |
@Transactional(readOnly = true) 建議 MongoTransactionManager 也啟動一個交易,將 ClientSession 新增至傳出的請求。 |
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { (1)
return new ReactiveMongoTransactionManager(factory);
}
// ...
}
@Service
public class StateService {
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step) { (2)
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
};
});
1 | 在應用程式內容中註冊 ReactiveMongoTransactionManager 。 |
2 | 將方法標記為交易式。 |
@Transactional(readOnly = true) 建議 ReactiveMongoTransactionManager 也啟動一個交易,將 ClientSession 新增至傳出的請求。 |
控制 MongoDB 特定的交易選項
交易式服務方法可能需要特定的交易選項才能執行交易。Spring Data MongoDB 的交易管理員支援評估交易標籤,例如 @Transactional(label = { "mongo:readConcern=available" })
。
預設情況下,使用 mongo:
前置詞的標籤命名空間由預設組態的 MongoTransactionOptionsResolver
評估。交易標籤由 TransactionAttribute
提供,並且可透過 TransactionTemplate
和 TransactionalOperator
進行程式化交易控制。由於它們的宣告性質,@Transactional(label = …)
提供了一個良好的起點,也可以作為文件。
目前,支援以下選項
- 最大提交時間
-
控制伺服器上 commitTransaction 操作的最大執行時間。值的格式與 ISO-8601 持續時間格式一致,如同
Duration.parse(…)
所使用的格式。用法:
mongo:maxCommitTime=PT1S
- 讀取關注點
-
設定交易的讀取關注點。
用法:
mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE
- 讀取偏好
-
設定交易的讀取偏好。
用法:
mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST
- 寫入關注點
-
設定交易的寫入關注點。
用法:
mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY
加入外部交易的巢狀交易不會影響初始交易選項,因為交易已啟動。交易選項僅在啟動新交易時套用。 |
交易內部的特殊行為
在交易內部,MongoDB 伺服器的行為略有不同。
連線設定
MongoDB 驅動程式提供專用的副本集名稱組態選項,可將驅動程式變成自動偵測模式。此選項有助於識別主要副本集節點和交易期間的命令路由。
請務必將 replicaSet 新增至 MongoDB URI。如需更多詳細資訊,請參閱連線字串選項。 |
集合操作
MongoDB 不支援在交易內進行集合操作,例如集合建立。這也會影響首次使用時發生的即時集合建立。因此,請務必將所有必要的結構就緒。
暫時性錯誤
MongoDB 可以將特殊標籤新增至交易操作期間引發的錯誤。這些標籤可能表示暫時性失敗,這些失敗可能僅透過重試操作即可消失。我們強烈建議將 Spring Retry 用於這些目的。儘管如此,人們可能會覆寫 MongoTransactionManager#doCommit(MongoTransactionObject)
以實作 重試提交操作 行為,如 MongoDB 參考手冊中所概述。
計數
MongoDB count
對集合統計資料進行操作,這可能無法反映交易內的實際情況。當在多文件交易內部發出 count
命令時,伺服器會回應錯誤 50851。一旦 MongoTemplate
偵測到作用中交易,所有公開的 count()
方法都會轉換並委派給使用 $match
和 $count
運算子的聚合框架,從而保留 Query
設定,例如 collation
。
當在聚合計數輔助程式內部使用地理位置命令時,限制適用。以下運算子無法使用,必須替換為不同的運算子
-
$where
→$expr
-
$near
→ 具有$center
的$geoWithin
-
$nearSphere
→ 具有$centerSphere
的$geoWithin
使用 Criteria.near(…)
和 Criteria.nearSphere(…)
的查詢必須重寫為 Criteria.within(…)
和 Criteria.withinSphere(…)
。同樣適用於儲存庫查詢方法中必須變更為 within
的 near
查詢關鍵字。另請參閱 MongoDB JIRA 票證 DRIVERS-518 以取得進一步參考。
以下程式碼片段顯示工作階段繫結的閉包內部的 count
用法
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
上面的程式碼片段在以下命令中實現
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)
而非
db.collection.find( { state: "active" } ).count()