Overview
在本指南中,您可以学习;了解如何使用PyMongo执行批量操作。批量操作通过在单个方法中执行多个写入操作来减少对服务器的调用次数。
Collection 和 MongoClient 类均提供 bulk_write() 方法。在 Collection实例上调用 bulk_write() 时,您可以对单个集合执行多个写入操作。在 MongoClient实例上调用 bulk_write() 时,您可以跨多个命名空间执行批量写入。在MongoDB中,命名空间由数据库名称和集合名称组成,格式为 <database>.<collection>。
重要
要对 MongoClient实例执行批量操作,请确保您的应用程序满足以下要求:
使用PyMongo v4.9 或更高版本
连接到MongoDB Server v8.0 或更高版本
样本数据
本指南中的示例使用Atlas示例数据集中的 sample_restaurants.restaurants 和 sample_mflix.movies 集合。要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅PyMongo入门教程。
定义写入操作
对于要执行的每个写入操作,请创建以下操作类之一的实例:
InsertOneUpdateOneUpdateManyReplaceOneDeleteOneDeleteMany
然后,将这些实例的列表传递给bulk_write()方法。
重要
确保将写入操作类导入到应用程序文件中,如以下代码所示:
from pymongo import InsertOne, UpdateOne, UpdateMany, ReplaceOne, DeleteOne, DeleteMany
以下部分介绍如何创建上述类的实例,您可以使用这些实例执行集合和客户端端批量操作。
插入操作
要执行插入操作,请创建 InsertOne 的实例并指定要插入的文档。将以下关键字参数传递给 InsertOne 构造函数:
namespace:要在其中插入文档的命名空间空间。如果对单个集合执行批量操作,则此参数是可选的。document:要插入的文档。
以下示例创建了一个InsertOne实例:
operation = InsertOne( namespace="sample_restaurants.restaurants", document={ "name": "Mongo's Deli", "cuisine": "Sandwiches", "borough": "Manhattan", "restaurant_id": "1234" } )
您还可以通过将自定义类的实例传递给构造函数来创建 InsertOne 的实例。如果您使用类型检查工具,这将提供额外的类型安全性。您传递的实例必须从 TypedDict 类继承。
注意
Python 3.7 及更早版本中的 TypedDict
TypedDict 类位于 typing 模块中,该模块仅在Python 3.8 及更高版本中可用。要在早期版本的Python中使用 TypedDict 类,请安装typing_extensions包。
以下示例使用自定义类构造一个 InsertOne实例,以提高类型安全性:
class Restaurant (TypedDict): name: str cuisine: str borough: str restaurant_id: str operation = pymongo.InsertOne(Restaurant( name="Mongo's Deli", cuisine="Sandwiches", borough="Manhattan", restaurant_id="1234"))
要插入多个文档,请为每个文档创建一个InsertOne实例。
注意
_id字段必须是唯一的
在MongoDB集合中,每个文档都必须包含具有唯一值的 _id字段。
如果为 _id字段指定值,则必须确保该值在集合中是唯一的。如果不指定值,驱动程序会自动为该字段生成唯一的 ObjectId 值。
我们建议让驱动程序自动生成 _id 值以确保唯一性。重复的 _id 值违反了唯一索引约束,导致驱动程序返回错误。
更新操作
要更新文档,请创建UpdateOne的实例并传入以下参数:
namespace:执行更新的命名空间空间。如果对单个集合执行批量操作,则此参数是可选的。filter:查询过滤,指定用于匹配集合中的文档的条件。update:要执行的更新。有关更新操作的更多信息,请参阅MongoDB Server手册中的字段更新操作指南。
UpdateOne 更新与查询筛选器匹配的第一个文档。
以下示例创建了一个UpdateOne实例:
operation = UpdateOne( namespace="sample_restaurants.restaurants", filter={ "name": "Mongo's Deli" }, update={ "$set": { "cuisine": "Sandwiches and Salads" }} )
要更新多个文档,请创建UpdateMany的实例并传入相同的参数。 UpdateMany更新与查询过滤匹配的所有文档。
以下示例创建了一个UpdateMany实例:
operation = UpdateMany( namespace="sample_restaurants.restaurants", filter={ "name": "Mongo's Deli" }, update={ "$set": { "cuisine": "Sandwiches and Salads" }} )
替换操作
替换操作会删除指定文档的所有字段和值,然后替换为新的字段和值。要执行替换操作,请创建 ReplaceOne 的实例并传入以下参数:
namespace:要在其中执行替换操作的命名空间。如果对单个集合执行批量操作,则此参数是可选的。filter:查询过滤,指定用于匹配要替换的文档的条件。replacement:包含要存储在匹配文档中的新字段和值的文档。
以下示例创建了一个ReplaceOne实例:
operation = ReplaceOne( namespace="sample_restaurants.restaurants", filter={ "restaurant_id": "1234" }, replacement={ "name": "Mongo's Pizza", "cuisine": "Pizza", "borough": "Brooklyn", "restaurant_id": "5678" } )
您还可以通过将自定义类的实例传递给构造函数来创建 ReplaceOne 的实例。如果您使用类型检查工具,这将提供额外的类型安全性。您传递的实例必须从 TypedDict 类继承。
注意
Python 3.7 及更早版本中的 TypedDict
TypedDict 类位于 typing 模块中,该模块仅在Python 3.8 及更高版本中可用。要在早期版本的Python中使用 TypedDict 类,请安装typing_extensions包。
以下示例使用自定义类构造了一个 ReplaceOne实例,以提高类型安全性:
class Restaurant (TypedDict): name: str cuisine: str borough: str restaurant_id: str operation = pymongo.ReplaceOne( { "restaurant_id": "1234" }, Restaurant(name="Mongo's Pizza", cuisine="Pizza", borough="Brooklyn", restaurant_id="5678") )
要替换多个文档,必须为每个文档创建一个ReplaceOne实例。
删除操作
要删除文档,请创建 DeleteOne 的实例并传入以下参数:
namespace:要删除文档的命名空间空间。如果对单个集合执行批量操作,则此参数是可选的。filter:查询过滤,指定用于匹配要删除的文档的条件。
DeleteOne 仅删除与查询过滤匹配的第一个文档。
以下示例创建了一个DeleteOne实例:
operation = DeleteOne( namespace="sample_restaurants.restaurants", filter={ "restaurant_id": "5678" } )
要删除多个文档,请创建 DeleteMany实例并传入命名空间和查询过滤,指定要删除的文档。DeleteMany 会删除与查询过滤匹配的所有文档。
以下示例创建了一个DeleteMany实例:
operation = DeleteMany( namespace="sample_restaurants.restaurants", filter={ "name": "Mongo's Deli" } )
调用 bulk_write() 方法
为要执行的每个操作定义类实例后,将这些实例的列表传递给 bulk_write() 方法。在 Collection实例上调用 bulk_write() 方法以写入单个集合,或在 MongoClient实例上调用 方法以写入多个命名空间。
如果对 Collection 调用的任何写入操作失败, PyMongo将引发 BulkWriteError 并且不会执行任何进一步的操作。BulkWriteError 提供了一个 details 属性,其中包括失败的操作以及有关异常的详细信息。
如果对 MongoClient 调用的任何写入操作失败, PyMongo将引发 ClientBulkWriteException 并且不会执行任何进一步的操作。ClientBulkWriteException 提供了 error 属性,其中包括有关异常的信息。
注意
当PyMongo运行批量操作时,它会使用运行该操作的集合或客户端的 write_concern。您还可以在使用 MongoClient.bulk_write() 方法时为操作设置设立写关注(write concern)。无论执行顺序如何,驱动程序在尝试所有操作后都会报告所有写关注(write concern)错误。
要学习;了解有关写入关注的更多信息,请参阅MongoDB Server手册中的写关注。
集合批量写入示例
以下示例通过在 Collection实例上使用 bulk_write() 方法对 restaurants集合执行多个写入操作。选择Synchronous或Asynchronous标签页以查看相应的代码:
operations = [ InsertOne( document={ "name": "Mongo's Deli", "cuisine": "Sandwiches", "borough": "Manhattan", "restaurant_id": "1234" } ), InsertOne( document={ "name": "Mongo's Deli", "cuisine": "Sandwiches", "borough": "Brooklyn", "restaurant_id": "5678" } ), UpdateMany( filter={ "name": "Mongo's Deli" }, update={ "$set": { "cuisine": "Sandwiches and Salads" }} ), DeleteOne( filter={ "restaurant_id": "1234" } ) ] results = restaurants.bulk_write(operations) print(results)
BulkWriteResult({'writeErrors': [], 'writeConcernErrors': [], 'nInserted': 2, 'nUpserted': 0, 'nMatched': 2, 'nModified': 2, 'nRemoved': 1, 'upserted': []}, acknowledged=True)
operations = [ InsertOne( document={ "name": "Mongo's Deli", "cuisine": "Sandwiches", "borough": "Manhattan", "restaurant_id": "1234" } ), InsertOne( document={ "name": "Mongo's Deli", "cuisine": "Sandwiches", "borough": "Brooklyn", "restaurant_id": "5678" } ), UpdateMany( filter={ "name": "Mongo's Deli" }, update={ "$set": { "cuisine": "Sandwiches and Salads" }} ), DeleteOne( filter={ "restaurant_id": "1234" } ) ] results = await restaurants.bulk_write(operations) print(results)
BulkWriteResult({'writeErrors': [], 'writeConcernErrors': [], 'nInserted': 2, 'nUpserted': 0, 'nMatched': 2, 'nModified': 2, 'nRemoved': 1, 'upserted': []}, acknowledged=True)
客户端批量写入示例
以下示例通过在 MongoClient实例上使用 bulk_write() 方法,对 sample_restaurants.restaurants 和 sample_mflix.movies 命名空间执行多个写入操作。选择Synchronous或Asynchronous标签页以查看相应的代码:
operations = [ InsertOne( namespace="sample_mflix.movies", document={ "title": "Minari", "runtime": 217, "genres": ["Drama", "Comedy"] } ), UpdateOne( namespace="sample_mflix.movies", filter={ "title": "Minari" }, update={ "$set": { "runtime": 117 }} ), DeleteMany( namespace="sample_restaurants.restaurants", filter={ "cuisine": "French" } ) ] results = client.bulk_write(operations) print(results)
ClientBulkWriteResult({'anySuccessful': True, 'error': None, 'writeErrors': [], 'writeConcernErrors': [], 'nInserted': 1, 'nUpserted': 0, 'nMatched': 1, 'nModified': 1, 'nDeleted': 344, 'insertResults': {}, 'updateResults': {}, 'deleteResults': {}}, acknowledged=True, verbose=False)
operations = [ InsertOne( namespace="sample_mflix.movies", document={ "title": "Minari", "runtime": 217, "genres": ["Drama", "Comedy"] } ), UpdateOne( namespace="sample_mflix.movies", filter={ "title": "Minari" }, update={ "$set": { "runtime": 117 }} ), DeleteMany( namespace="sample_restaurants.restaurants", filter={ "cuisine": "French" } ) ] results = await client.bulk_write(operations) print(results)
ClientBulkWriteResult({'anySuccessful': True, 'error': None, 'writeErrors': [], 'writeConcernErrors': [], 'nInserted': 1, 'nUpserted': 0, 'nMatched': 1, 'nModified': 1, 'nDeleted': 344, 'insertResults': {}, 'updateResults': {}, 'deleteResults': {}}, acknowledged=True, verbose=False)
自定义批量写入操作
bulk_write() 方法可以选择接受其他参数,这些参数表示可用于配置批量写入操作的选项。
集合批量写入选项
下表描述了可以传递给 Collection.bulk_write() 方法的选项:
属性 | 说明 |
|---|---|
| 如果 |
| |
|
|
| 要附加到操作的注释。 有关更多信息,请参阅 MongoDB Server 手册中的删除命令字段指南。 |
| 参数名称和值的映射。 值必须是常量或不引用文档字段的闭合表达式。 有关更多信息,请参阅 MongoDB Server 手册中的let 语句。 |
以下示例调用前面的集合批量写入示例中的 bulk_write() 方法,但将 ordered 选项设置为 False。选择Synchronous或Asynchronous标签页以查看相应的代码:
results = restaurants.bulk_write(operations, ordered=False)
results = await restaurants.bulk_write(operations, ordered=False)
如果无序批量写入中的任何写入操作失败, PyMongo仅在尝试所有操作后才会报告错误。
注意
无序批量操作不保证执行顺序。 为了优化运行时间,顺序可以与您列出的方式不同。
客户端批量写入选项
下表描述了可以传递给 MongoClient.bulk_write() 方法的选项:
属性 | 说明 |
|---|---|
|
|
| 如果 |
| 指定操作是否返回每个成功操作的详细结果。 |
| |
| 要附加到操作的注释。 有关更多信息,请参阅 MongoDB Server 手册中的删除命令字段指南。 |
| 参数名称和值的映射。 值必须是常量或不引用文档字段的闭合表达式。 有关更多信息,请参阅 MongoDB Server 手册中的let 语句。 |
|
以下示例调用前面客户端批量写入示例中的 bulk_write() 方法,但将 verbose_results 选项设置为 True。选择Synchronous或Asynchronous标签页以查看相应的代码:
results = client.bulk_write(operations, verbose_results=True)
ClientBulkWriteResult({'anySuccessful': True, 'error': None, 'writeErrors': [], 'writeConcernErrors': [], 'nInserted': 1, 'nUpserted': 0, 'nMatched': 1, 'nModified': 1, 'nDeleted': 344, 'insertResults': {0: InsertOneResult(ObjectId('...'), acknowledged=True)}, 'updateResults': {1: UpdateResult({'ok': 1.0, 'idx': 1, 'n': 1, 'nModified': 1}, acknowledged=True)}, 'deleteResults': {2: DeleteResult({'ok': 1.0, 'idx': 2, 'n': 344}, acknowledged=True)}}, acknowledged=True, verbose=True)
results = await client.bulk_write(operations, verbose_results=True)
ClientBulkWriteResult({'anySuccessful': True, 'error': None, 'writeErrors': [], 'writeConcernErrors': [], 'nInserted': 1, 'nUpserted': 0, 'nMatched': 1, 'nModified': 1, 'nDeleted': 344, 'insertResults': {0: InsertOneResult(ObjectId('...'), acknowledged=True)}, 'updateResults': {1: UpdateResult({'ok': 1.0, 'idx': 1, 'n': 1, 'nModified': 1}, acknowledged=True)}, 'deleteResults': {2: DeleteResult({'ok': 1.0, 'idx': 2, 'n': 344}, acknowledged=True)}}, acknowledged=True, verbose=True)
Return Values
本节介绍以下批量操作方法的返回值:
集合批量写入返回值
Collection.bulk_write()方法返回一个BulkWriteResult对象。 BulkWriteResult对象包含以下属性:
属性 | 说明 |
|---|---|
| 指示服务器是否确认了写入操作。 |
| 服务器返回的原始批量API结果。 |
| 已删除的文档数量(如有)。 |
| 插入的文档数量(如有)。 |
| 与更新匹配的文档数量(如果适用)。 |
| 已修改文档的数量(如有)。 |
| 已更新或插入的文档数量(如有)。 |
| 操作索引到已更新或已插入文档的 |
客户端批量写入返回值
MongoClient.bulk_write()方法返回一个ClientBulkWriteResult对象。 ClientBulkWriteResult对象包含以下属性:
属性 | 说明 |
|---|---|
| 指示服务器是否确认了写入操作。 |
| 服务器返回的原始批量API结果。 |
| 任何成功删除操作及其结果的映射。 |
| 已删除的文档数量(如有)。 |
| 指示返回的结果是否详细。 |
| 任何成功插入操作及其结果的映射。 |
| 插入的文档数量(如有)。 |
| 与更新匹配的文档数量(如果适用)。 |
| 已修改文档的数量(如有)。 |
| 任何成功的更新操作及其结果的映射。 |
| 已更新或插入的文档数量(如有)。 |
故障排除
客户端类型注解
如果没有为 MongoClient对象添加类型注解,类型检查器可能会显示类似于以下内容的错误:
from pymongo import MongoClient client = MongoClient() # error: Need type annotation for "client"
解决方案是将 MongoClient对象注释为 client: MongoClient 或 client: MongoClient[Dict[str, Any]]。
不兼容类型
如果您指定 MongoClient 作为类型提示,但不包含文档、键和值的数据类型,则类型检查器可能会显示类似于以下内容的错误:
error: Dict entry 0 has incompatible type "str": "int"; expected "Mapping[str, Any]": "int"
解决方案是将以下类型提示添加到 MongoClient对象:
client: MongoClient[Dict[str, Any]]
更多信息
要了解如何执行单个写入操作,请参阅以下指南:
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: