Overview
在本指南中,您可以了解如何使用 Java Reactive Streams 驱动程序执行事务。事务允许您运行一系列操作,这些操作在所有数据更改成功后才会应用。如果事务中的任何操作失败,驱动程序会取消事务并在所有数据更改变得可见之前将其丢弃。
在MongoDB中,事务在逻辑会话中运行。 会话是您打算按顺序运行的一组相关读取或写入操作。 通过会话,您可以为一群组操作启用因果一致性,并运行ACID事务。 MongoDBACID 一致性保证ACID 事务操作中涉及的数据保持一致,即使操作遇到意外错误。
使用Java Reactive Streams驱动程序时,您可以从 MongoClient实例创建一个新会话,并将其类型定义为 ClientSession。我们建议您将客户端重复用于多个会话和事务,而不是每次都实例化一个新客户端。
警告
仅应将 ClientSession 与创建它的 MongoClient(或关联的 MongoDatabase 或 MongoCollection)一起使用。将 ClientSession 与其他 MongoClient 一起使用会导致操作错误。
样本数据
本指南中的示例使用Atlas示例数据集中的sample_restaurants.restaurants和sample_mflix.movies集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅入门。
重要
项目 Reactor 库
本指南使用 Project Reactor 库来使用Java Reactive Streams驱动程序方法返回的 Publisher 实例。要学习;了解有关 Project Reactor 库及其使用方法的更多信息,请参阅 Reactor 文档中的入门。要进一步学习;了解如何使用本指南中的 Project Reactor 库方法,请参阅“将数据写入MongoDB”指南。
因果一致性(Causal Consistency)
MongoDB在某些客户端会话中实现因果一致性。因果一致性模型ACID 一致性保证在分布式系统中,会话中的操作按因果顺序运行。客户端观察到的结果与因果关系或操作之间的依赖关系一致。示例,如果您执行一系列操作,其中一个操作在逻辑上依赖于另一个操作的结果,则任何后续读取都会反映这种依赖关系。
为了保证因果一致性,客户端端会话必须满足以下要求:
启动会话时,驱动程序必须启用因果一致性选项。该选项默认启用。
操作必须在单个线程的单个会话中运行。否则,会话或线程必须相互传达optime和集群时间值。 要查看传达这些值的两个会话的示例,请参阅MongoDB Server手册中的因果一致性示例。
您必须使用
MAJORITY读关注(read concern)。您必须使用
MAJORITY写关注(write concern)。这是默认的写关注(write concern)值。
下表描述了因果一致会话提供的ACID 一致性保证:
保证 | 说明 |
|---|---|
读取写入操作 | 读取操作会反映之前写入操作的结果。 |
单调读取 | 读取操作不会返回反映比先前读取操作更早的数据状态的结果。 |
单调写入 | 如果写入操作必须先于其他写入操作,则服务器会先运行此写入操作。 示例,如果调用 |
读取后写入 | 如果写入操作必须在其他读取操作之后执行,服务器会先执行读取操作。 示例,如果您调用 |
提示
要学习;了解有关本节中提到的概念的更多信息,请参阅以下MongoDB Server手册条目:
事务方法
在MongoClient实例上使用startSession()方法创建ClientSession 。 然后,您可以使用ClientSession提供的方法修改会话状态。 下表详细介绍了可用于管理ACID 事务的方法:
方法 | 说明 |
|---|---|
| 在此会话上启动使用给定选项配置的新ACID 事务。 如果会话已存在正在进行的ACID 事务,则会引发异常。 要学习;了解有关此方法的更多信息,请参阅MongoDB Server手册中的 startTransaction() 页面。 |
| 结束此会话的ACID 事务事务。 如果会话没有活动ACID 事务或者ACID 事务已提交或结束,则会引发异常。 要学习;了解有关此方法的更多信息,请参阅MongoDB Server手册中的 abortTransaction() 页面。 |
| 提交此会话的ACID 事务事务。 如果会话没有活动ACID 事务或ACID 事务已结束,则会引发异常。 要学习;了解有关此方法的更多信息,请参阅MongoDB Server手册中的 commitTransaction() 页面。 |
事务示例
以下示例演示了如何创建会话、创建ACID 事务以及在一个ACID 事务中将文档插入到多个集合中。 该代码执行以下步骤:
使用
startSession()方法从客户端创建会话使用
startTransaction()方法启动ACID 事务将文档插入
restaurants和movies集合使用
commitTransaction()方法提交ACID 事务
MongoClient mongoClient = MongoClients.create(settings); MongoDatabase restaurantsDatabase = mongoClient.getDatabase("sample_restaurants"); MongoCollection<Document> restaurants = restaurantsDatabase.getCollection("restaurants"); MongoDatabase moviesDatabase = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> movies = moviesDatabase.getCollection("movies"); Mono.from(mongoClient.startSession()) .flatMap(session -> { // Begins the transaction session.startTransaction(); // Inserts documents in the given order return Mono.from(restaurants.insertOne(session, new Document("name", "Reactive Streams Pizza").append("cuisine", "Pizza"))) .then(Mono.from(movies.insertOne(session, new Document("title", "Java: Into the Streams").append("type", "Movie")))) // Commits the transaction .flatMap(result -> Mono.from(session.commitTransaction()) .thenReturn(result)) .onErrorResume(error -> Mono.from(session.abortTransaction()).then(Mono.error(error))) .doFinally(signalType -> session.close()); }) // Closes the client after the transaction completes .doFinally(signalType -> mongoClient.close()) // Prints the results of the transaction .subscribe( result -> System.out.println("Transaction succeeded"), error -> System.err.println("Transaction failed: " + error) );
注意
不支持并行操作
Java Reactive Streams驱动程序不支持在单个事务中运行并行操作。
如果您使用的是MongoDB Server v8.0 或更高版本,则可以使用批量写入操作在单个ACID 事务中对多个命名空间执行写入操作。要学习;了解更多信息,请参阅批量写入操作指南的客户端批量写入部分。
更多信息
要了解有关本指南中提到的概念的更多信息,请参阅服务器手册中的以下页面:
API 文档
要进一步了解本指南所讨论的任何类型或方法,请参阅以下 API 文档: