Overview
在本指南中,您可以学习;了解如何使用批量写入操作在单个数据库调用中执行多个写入操作。
考虑这样一个场景:您要插入一个文档,更新多个其他文档,然后删除一个文档。 如果使用单独的方法,则每个操作都需要调用自己的数据库。
通过使用批量写入操作,您可以通过更少的数据库调用来执行多个写入操作。 您可以在以下级别执行批量写入操作:
样本数据
本指南中的示例使用Atlas示例数据集中的sample_restaurants.restaurants集合。 要学习;了解如何创建免费的MongoDB Atlas 群集并加载示例数据集,请参阅入门教程。
重要
项目 Reactor 库
本指南使用 Project Reactor 库来使用Java Reactive Streams驱动程序方法返回的 Publisher 实例。要学习;了解有关 Project Reactor 库及其使用方法的更多信息,请参阅 Reactor 文档中的入门。要进一步学习;了解如何使用本指南中的 Project Reactor 库方法,请参阅“将数据写入MongoDB”指南。
集合批量写入
批量写入操作包含一个或多个写入操作。 要在集合级别执行批量写入操作,请将 List 的 WriteModel 文档传递给 MongoCollection.bulkWrite() 方法。 WriteModel 是表示写入操作的模型。
对于要执行的每个写入操作,请创建以下从 WriteModel 继承的类之一的实例:
InsertOneModelUpdateOneModelUpdateManyModelReplaceOneModelDeleteOneModelDeleteManyModel
以下部分介绍如何创建和使用上述类的实例。
插入操作
要执行插入操作,请创建InsertOneModel的实例并传入要插入的文档。
以下示例创建了一个InsertOneModel实例:
InsertOneModel<Document> operation = new InsertOneModel<>( new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches"));
要插入多个文档,请为每个文档创建一个InsertOneModel实例。
更新操作
要更新文档,请创建UpdateOneModel的实例并传入以下参数:
查询过滤,指定用于匹配集合中文档的条件。
要执行的更新操作。 有关更新操作的更多信息,请参阅MongoDB Server手册中的字段更新操作符指南。
以下示例创建了一个UpdateOneModel实例:
UpdateOneModel<Document> operation = new UpdateOneModel<>( eq("name", "Mongo's Deli"), set("cuisine", "Sandwiches and Salads"));
如果多个文档与 UpdateOneModel实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在 UpdateOptions实例中指定排序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:
UpdateOptions options = UpdateOptions.sort(Sorts.ascending("_id"));
要更新多个文档,请创建UpdateManyModel的实例并传入相同的参数。 UpdateManyModel更新与查询过滤匹配的所有文档。
以下示例创建了一个UpdateManyModel实例:
UpdateManyModel<Document> operation = new UpdateManyModel<>( eq("name", "Mongo's Deli"), set("cuisine", "Sandwiches and Salads"));
替换操作
替换操作会删除指定文档中除_id字段之外的所有字段和值,并替换为新字段和值。 要执行替换操作,请创建ReplaceOneModel的实例并传入查询过滤以及要存储在匹配文档中的字段和值。
以下示例创建了一个ReplaceOneModel实例:
ReplaceOneModel<Document> operation = new ReplaceOneModel<>( eq("name", "Original Pizza"), new Document("name", "Mongo's Pizza") .append("borough", "Manhattan"));
如果多个文档与 ReplaceOneModel实例中指定的查询过滤匹配,则该操作将替换第一个结果。您可以在 ReplaceOptions实例中指定排序,以便在服务器执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
ReplaceOptions options = ReplaceOptions.sort(Sorts.ascending("_id"));
提示
替换多个文档
要替换多个文档,请为每个文档创建一个ReplaceOneModel实例。
删除操作
要删除文档,请创建DeleteOneModel的实例并传入查询过滤,指定要删除的文档。 DeleteOneModel仅删除与查询过滤匹配的第一个文档。
以下示例创建了一个DeleteOneModel实例:
DeleteOneModel<Document> operation = new DeleteOneModel<>( eq("restaurant_id", "5678"));
要删除多个文档,请创建DeleteManyModel的实例并传入查询过滤,指定要删除的文档。 DeleteManyModel会删除与查询过滤匹配的所有文档。
以下示例创建了一个DeleteManyModel实例:
DeleteManyModel<Document> operation = new DeleteManyModel<>( eq("name", "Mongo's Deli"));
执行批量操作
为要执行的每个操作定义 WriteModel实例后,将这些实例的列表传递给 bulkWrite() 方法。 默认情况下,该方法按照列表中定义的顺序运行操作。
以下示例使用bulkWrite()方法执行多个写入操作:
Publisher<BulkWriteResult> bulkWritePublisher = restaurants.bulkWrite( Arrays.asList(new InsertOneModel<>( new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches") .append("borough", "Manhattan") .append("restaurant_id", "1234")), new InsertOneModel<>(new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches") .append("borough", "Brooklyn") .append("restaurant_id", "5678")), new UpdateManyModel<>(eq("name", "Mongo's Deli"), set("cuisine", "Sandwiches and Salads")), new DeleteOneModel<>(eq("restaurant_id", "1234")))); BulkWriteResult bulkResult = Mono.from(bulkWritePublisher).block(); System.out.println(bulkResult.toString());
AcknowledgedBulkWriteResult{insertedCount=2, matchedCount=2, removedCount=1, modifiedCount=2, upserts=[], inserts=[BulkWriteInsert{index=0, id=BsonObjectId{value=66a7e0a6c08025218b657208}}, BulkWriteInsert{index=1, id=BsonObjectId{value=66a7e0a6c08025218b657209}}]}
如果任何写入操作失败, Java Reactive Streams驱动程序会发出 MongoBulkWriteException 信号,并且不会执行任何进一步的单个操作。MongoBulkWriteException 包括可使用 MongoBulkWriteException.getWriteErrors() 方法访问的 BulkWriteError,其中提供了单个故障的详细信息。
注意
当Java Reactive Streams驱动程序运行批量操作时,它会使用运行该操作的集合的 writeConcern。无论执行顺序如何,驱动程序在尝试所有操作后都会报告所有写关注(write concern)错误。
自定义批量写入
BulkWriteOptions类包含修改bulkWrite()方法行为的方法。 要使用BulkWriteOptions类,请构造该类的新实例,然后调用其一个或多个方法来修改写入操作。 您可以将这些方法调用链接在一起。 要修改写入操作的行为,请将类实例作为最后一个参数传递给bulkWrite()方法。
您可以使用BulkWriteOptions类中的以下方法来修改写入方法。 所有方法都是可选的。
方法 | 说明 |
|---|---|
| |
| |
| |
| |
| 如果设立为 |
以下示例调用上一示例中的bulkWrite()方法,但将ordered选项设置为False :
Publisher<BulkWriteResult> bulkWritePublisher = restaurants.bulkWrite( Arrays.asList(new InsertOneModel<>( new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches") .append("borough", "Manhattan") .append("restaurant_id", "1234")), new InsertOneModel<>(new Document("name", "Mongo's Deli") .append("cuisine", "Sandwiches") .append("borough", "Brooklyn") .append("restaurant_id", "5678")), new UpdateManyModel<>(eq("name", "Mongo's Deli"), set("cuisine", "Sandwiches and Salads")), new DeleteOneModel<>(eq("restaurant_id", "1234"))), new BulkWriteOptions().ordered(false)); BulkWriteResult bulkResult = Mono.from(bulkWritePublisher).block(); System.out.println(bulkResult.toString());
AcknowledgedBulkWriteResult{insertedCount=2, matchedCount=2, removedCount=1, modifiedCount=2, upserts=[], inserts=[BulkWriteInsert{index=0, id=BsonObjectId{value=66a7e03cce430c5854b6caf9}}, BulkWriteInsert{index=1, id=BsonObjectId{value=66a7e03cce430c5854b6cafa}}]}
如果无序批量写入中的任何写入操作失败, Java Reactive Streams驱动程序仅在尝试所有操作后才会报告错误。
注意
无序批量操作不保证执行顺序。 为了优化运行时间,顺序可以与您列出的方式不同。
客户端批量写入
连接到运行MongoDB Server 8.0 或更高版本的部署时,可以使用 MongoClient.bulkWrite() 方法写入同一集群中的多个数据库和集合。 MongoClient.bulkWrite() 方法在一次调用中执行所有写入。
MongoClient.bulkWrite() 方法采用 ClientNamespacedWriteModel 实例列表来表示不同的写入操作。 您可以使用实例方法构造 ClientNamespacedWriteModel 接口的实例。 示例,ClientNamespacedInsertOneModel 的实例表示插入一个文档的操作,您可以使用 ClientNamespacedWriteModel.insertOne() 方法创建此模型。
下表描述了模型及其相应的实例方法。
模型 | 实例方法 | 说明 | 参数 |
|---|---|---|---|
|
| 创建一个模型以将文档插入到 |
|
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以替换 |
|
|
| 创建模型以删除 |
|
|
| 创建模型以删除 |
|
以下部分提供了一些示例,说明如何创建模型和使用客户端bulkWrite() 方法。
插入操作
此示例演示如何创建包含插入两个文档的指令的模型。 将一个文档插入到 db.people集合中,将另一文档插入到 db.things集合中。 MongoNamespace实例定义每个写入操作适用的目标数据库和集合。
ClientNamespacedInsertOneModel personToInsert = ClientNamespacedWriteModel .insertOne( new MongoNamespace("db", "people"), new Document("name", "Julia Smith") ); ClientNamespacedInsertOneModel thingToInsert = ClientNamespacedWriteModel .insertOne( new MongoNamespace("db", "things"), new Document("object", "washing machine") );
更新操作
以下示例展示如何使用 bulkWrite() 方法更新db.people 和 db.things 集合中的现有文档:
ClientNamespacedUpdateOneModel personUpdate = ClientNamespacedWriteModel .updateOne( new MongoNamespace("db", "people"), Filters.eq("name", "Freya Polk"), Updates.inc("age", 1) ); ClientNamespacedUpdateManyModel thingUpdate = ClientNamespacedWriteModel .updateMany( new MongoNamespace("db", "things"), Filters.eq("category", "electronic"), Updates.set("manufacturer", "Premium Technologies") );
此示例将 people集合中 name 值为 "Freya Polk" 的文档中 age字段的值递增 1。它还将 things集合中 category 值为 "electronic" 的所有文档的 manufacturer字段的值设置为 "Premium Technologies"。
如果多个文档与 ClientNamespacedUpdateOneModel实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在ClientUpdateOneOptions实例中指定排序顺序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:
ClientUpdateOneOptions options = ClientUpdateOneOptions .clientUpdateOneOptions() .sort(Sorts.ascending("_id"));
替换操作
以下示例展示了如何创建模型来替换 db.people 和 db.things 集合中的现有文档:
ClientNamespacedReplaceOneModel personReplacement = ClientNamespacedWriteModel .replaceOne( new MongoNamespace("db", "people"), Filters.eq("_id", 1), new Document("name", "Frederic Hilbert") ); ClientNamespacedReplaceOneModel thingReplacement = ClientNamespacedWriteModel .replaceOne( new MongoNamespace("db", "things"), Filters.eq("_id", 1), new Document("object", "potato") );
此示例成功运行后,people集合中 _id 值为 1 的文档将替换为新文档。 things集合中 _id 值为 1 的文档将替换为新文档。
如果多个文档与 ClientNamespacedReplaceOneModel实例中指定的查询过滤匹配,则该操作将替换第一个结果。您可以在ClientReplaceOneOptions实例中指定排序顺序,以便在服务器执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
ClientReplaceOneOptions options = ClientReplaceOneOptions .clientReplaceOneOptions() .sort(Sorts.ascending("_id"));
执行批量操作
为要执行的每个操作定义 ClientNamespacedWriteModel实例后,将这些实例的列表传递给客户端bulkWrite() 方法。 默认情况下,该方法按照指定的顺序运行操作。
以下示例使用bulkWrite()方法执行多个写入操作:
MongoNamespace peopleNamespace = new MongoNamespace("db", "people"); MongoNamespace thingsNamespace = new MongoNamespace("db", "things"); List<ClientNamespacedWriteModel> bulkOperations = Arrays.asList( ClientNamespacedWriteModel .insertOne( peopleNamespace, new Document("name", "Corey Kopper") ), ClientNamespacedWriteModel .replaceOne( thingsNamespace, Filters.eq("_id", 1), new Document("object", "potato") ) ); Publisher<ClientBulkWriteResult> bulkWritePublisher = mongoClient .bulkWrite(bulkOperations); ClientBulkWriteResult clientBulkResult = Mono .from(bulkWritePublisher) .block(); System.out.println(clientBulkResult.toString());
AcknowledgedSummaryClientBulkWriteResult{insertedCount=1, matchedCount=1, ...}
如果任何写入操作失败,驱动程序都会引发 ClientBulkWriteException,并且不会执行任何进一步的单个操作。ClientBulkWriteException 包括可使用 ClientBulkWriteException.getWriteErrors() 方法访问的 BulkWriteError,其中提供了单个故障的详细信息。
自定义批量写入
您可以将 ClientBulkWriteOptions 的实例传递给 bulkWrite() 方法,以自定义驱动程序执行批量写入操作的方式。
执行顺序
默认下,驱动程序会按照您指定的顺序运行批量操作中的各个操作,直到出现错误或操作成功完成。
但是,您可以在创建 ClientBulkWriteOptions实例时将 false 传递给 ordered() 方法,以指示驱动程序以无序方式执行写入操作。使用 unordered 选项时,产生错误的操作不会阻止驱动程序在批量写入操作中运行其他写入操作。
以下代码在 ClientBulkWriteOptions 的实例中将 ordered 选项设置为 false,并执行批量写入操作以插入多个文档。
MongoNamespace namespace = new MongoNamespace("db", "people"); ClientBulkWriteOptions options = ClientBulkWriteOptions .clientBulkWriteOptions() .ordered(false); List<ClientNamespacedWriteModel> bulkOperations = Arrays.asList( ClientNamespacedWriteModel.insertOne( namespace, new Document("_id", 1).append("name", "Rudra Suraj") ), // Causes a duplicate key error ClientNamespacedWriteModel.insertOne( namespace, new Document("_id", 1).append("name", "Mario Bianchi") ), ClientNamespacedWriteModel.insertOne( namespace, new Document("name", "Wendy Zhang") ) ); Publisher<ClientBulkWriteResult> bulkWritePublisher = mongoClient .bulkWrite(bulkOperations, options);
即使插入具有重复键的文档的写入操作会导致错误,也会执行其他操作,因为写入操作是无序的。
更多信息
要了解如何执行单个写入操作,请参阅以下指南:
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: