Overview
在本指南中,您可以学习如何使用 PyMongo 驱动程序执行事务。事务允许您运行一系列操作,这些操作在提交事务之前不会更改任何数据。如果事务中的任何操作返回错误,驱动程序就会取消事务,并在所有数据更改变得可见之前将其丢弃。
在MongoDB中,事务在逻辑会话中运行。会话是您打算按顺序运行的一组相关读取或写入操作。会话允许您在符合ACID的ACID 事务中运行操作,该ACID 事务满足原子性、一致性、隔离性和持久性的预期。MongoDBACID 一致性保证ACID 事务操作中涉及的数据保持一致,即使操作遇到意外错误。
使用PyMongo时,您可以从MongoClient实例创建一个新会话,并将其类型定义为ClientSession 。 我们建议您将MongoClient重复用于多个会话和事务,而不是每次都创建一个新客户端。
警告
仅应将 ClientSession 与创建它的 MongoClient(或关联的 MongoDatabase 或 MongoCollection)一起使用。将 ClientSession 与其他 MongoClient 一起使用会导致操作错误。
因果一致性(Causal Consistency)
MongoDB在客户端会话中实现因果一致性。因果一致性模型ACID 一致性保证会话中的操作按因果顺序运行。客户端观察到的结果与因果关系或操作之间的依赖关系一致。示例,如果您执行一系列操作,其中一个操作在逻辑上依赖于另一个操作的结果,则任何后续读取都会反映这种依赖关系。
注意
客户端端会话即使不执行ACID 事务,也可实现因果一致性。
下表描述了因果一致会话提供的ACID 一致性保证:
保证 | 说明 |
|---|---|
读取写入操作 | 读取操作会反映之前写入操作的结果。 |
单调读取 | 读取操作不会返回反映比先前读取操作更早的数据状态的结果。 |
单调写入 | 如果写入操作必须先于其他写入操作,则驱动程序会先运行此写入操作。 示例,如果调用 |
读取后写入 | 如果写入操作必须在其他读取操作之后进行,驱动程序会先运行读取操作。 示例,如果您调用 |
在因果一致会话中, MongoDB仅ACID 一致性保证以下操作之间的因果一致性:
具有
majority读关注(read concern)的读取操作有
majority写关注(write concern)的写入操作
提示
要学习;了解有关本节中提到的概念的更多信息,请参阅以下MongoDB Server手册条目:
样本数据
本指南中的示例使用Atlas示例数据集中的sample_restaurants.restaurants集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅PyMongo入门教程。
方法
使用start_session()方法启动会话后,可以使用返回的ClientSession提供的以下方法管理会话状态:
方法 | 说明 |
|---|---|
| 在该会话上启动使用给定选项配置的新事务。如果会话已存在正在进行的事务,则返回错误。要了解有关该方法的更多信息,请参阅服务器手册中的startTransaction() 页面。 |
| 结束此会话的ACID 事务。如果该会话没有活动ACID 事务或者ACID 事务已提交或结束,则返回错误。要学习;了解有关此方法的更多信息,请参阅服务器手册中的 abortTransaction() 页面。 |
| 提交此会话的活动事务。如果此会话没有活动事务或事务已结束,则返回错误。要了解有关此方法的更多信息,请参阅服务器手册中的 commitTransaction() 页面。 |
| 在此会话上启动事务并运行 |
| 将会话绑定到上下文经理范围内的所有数据库操作。在 |
| 完成此会话。 如果ACID 事务已启动,此方法将中止事务。 如果没有要结束的活动会话,则返回错误。 |
ClientSession还提供检索会话属性和修改可变会话属性的方法。 要学习;了解有关这些方法的更多信息,请参阅API文档。
重要
如果调用 bind(end_session=False),则必须在使用完会话后显式调用 end_session(),以免会话泄漏。以下代码显示了正确和错误的使用模式:
# Incorrect: leaks the session because no reference exists to call # end_session() on once finished with it with client.start_session().bind(end_session=False): ... # Correct: end_session=True by default, so session ends automatically with client.start_session().bind(): ... # Correct: session variable allows explicit cleanup with client.start_session() as s, s.bind(end_session=False): ... s.end_session() # Correct: nested context managers with client.start_session() as s: with s.bind(): ...
例子
以下示例显示如何通过以下步骤创建会话、创建事务以及提交多文档插入操作:
使用
start_session()方法从客户端创建会话。通过调用bind()方法将其绑定到区块中的所有操作。使用
with_transaction()方法启动事务。插入多个文档。由于会话是绑定的,因此您可以在每个操作中省略
session参数。with_transaction()方法运行插入操作并提交ACID 事务。如果任何操作导致错误,with_transaction()将取消该ACID 事务。此方法可确保在区块退出时正确关闭会话。使用
client.close()方法关闭与服务器的连接。
选择 Synchronous 或 Asynchronous标签页,查看相应的代码:
# Establishes a connection to the MongoDB server client = MongoClient("<connection string>") # Defines the database and collection restaurants_db = client["sample_restaurants"] restaurants_collection = restaurants_db["restaurants"] # Function performs the transaction def insert_documents(session): restaurants_collection_with_session = restaurants_collection.with_options( write_concern=WriteConcern("majority"), read_concern=ReadConcern("local") ) # Inserts documents within the transaction restaurants_collection_with_session.insert_one( {"name": "PyMongo Pizza", "cuisine": "Pizza"}, session=session ) restaurants_collection_with_session.insert_one( {"name": "PyMongo Burger", "cuisine": "Burger"}, session=session ) # Starts a client session async with client.start_session() as session: try: # Uses the with_transaction method to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction(insert_documents) print("Transaction succeeded") except (ConnectionFailure, OperationFailure) as e: print(f"Transaction failed: {e}") # Closes the client connection client.close()
# Establishes a connection to the MongoDB server client = AsyncMongoClient("<connection string>") # Defines the database and collection restaurants_db = client["sample_restaurants"] restaurants_collection = restaurants_db["restaurants"] # Function performs the transaction async def insert_documents(session): restaurants_collection_with_session = restaurants_collection.with_options( write_concern=WriteConcern("majority"), read_concern=ReadConcern("local") ) # Inserts documents within the transaction await restaurants_collection_with_session.insert_one( {"name": "PyMongo Pizza", "cuisine": "Pizza"}, session=session ) await restaurants_collection_with_session.insert_one( {"name": "PyMongo Burger", "cuisine": "Burger"}, session=session ) # Starts a client session with client.start_session() as session: try: # Uses the with_transaction method to start a transaction, execute the callback, and commit (or abort on error). await session.with_transaction(insert_documents) print("Transaction succeeded") except (ConnectionFailure, OperationFailure) as e: print(f"Transaction failed: {e}") # Closes the client connection await client.close()
如果需要更好地控制事务,可以使用start_transaction()方法。 您可以将此方法与上一节中描述的commit_transaction()和abort_transaction()方法结合使用,以手动管理ACID 事务生命周期。
注意
不支持并行操作
PyMongo不支持在单个ACID 事务中运行并行操作。
如果您使用的是MongoDB Server v8.0 或更高版本,则可以通过在 MongoClient实例上调用 bulk_write() 方法,在单个ACID 事务中对多个命名空间执行写入操作。有关更多信息,请参阅批量写入操作指南。
更多信息
要学习;了解有关本指南中提到的概念的更多信息,请参阅MongoDB Server手册中的以下页面:
要学习;了解有关ACID compliance的更多信息,请参阅什么是数据库管理系统中的ACID属性? MongoDB网站上的文章。
API 文档
要进一步了解本指南所讨论的任何类型或方法,请参阅以下 API 文档: