Overview
本指南向您展示如何使用Scala驱动程序执行批量写入操作,从而在单个数据库调用中对数据进行多项更改。
考虑这样一个场景:您要插入一个文档,更新多个其他文档,然后删除一个文档。 如果使用单独的方法,则每个操作都需要调用自己的数据库。
通过使用批量写入操作,您可以通过更少的数据库调用来执行多个写入操作。 您可以在以下级别执行批量写入操作:
集合批量写入
批量写入操作包含一个或多个写入操作。 要在集合级别执行批量写入操作,请将 Seq 的 WriteModel 文档传递给 MongoCollection.bulkWrite() 方法。 WriteModel 是表示写入操作的模型。
对于要执行的每个写入操作,请创建以下从 WriteModel 继承的类之一的实例:
InsertOneModelUpdateOneModelUpdateManyModelReplaceOneModelDeleteOneModelDeleteManyModel
然后,将这些实例的列表传递给bulkWrite()方法。
以下部分介绍如何创建和使用上述类的实例。 执行批量操作部分演示了如何将模型列表传递给bulkWrite()方法以执行批量操作。
样本数据
本部分中的示例使用Atlas示例数据集的 sample_restaurants数据库中的 restaurants集合。要从Scala应用程序访问权限此集合,请创建一个连接到Atlas 集群的MongoClient,并将以下值分配给 database 和 collection 变量:
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
要学习如何创建免费的MongoDB Atlas 集群并加载示例数据集,请参阅MongoDB 入门指南。
插入操作
要执行插入操作,请创建一个InsertOneModel实例并指定要插入的文档。
以下示例创建了一个InsertOneModel实例:
val insertOneModel = InsertOneModel( Document("name" -> "Blue Moon Grill", "borough" -> "Brooklyn", "cuisine" -> "American") )
要插入多个文档,请为每个文档创建一个InsertOneModel实例。
重要
执行批量操作时,InsertOneModel 无法插入集合中已存在的具有 _id 的文档。在这种情况下,驱动程序会抛出 MongoBulkWriteException。
更新操作
要更新文档,请创建UpdateOneModel的实例并传递以下参数:
查询过滤,指定用于匹配集合中文档的条件。
要执行的更新操作。要学习;了解有关更新操作的更多信息,请参阅MongoDB Server手册中的字段更新操作指南。
以下示例创建了一个UpdateOneModel实例:
val updateOneFilter = equal("name", "White Horse Tavern") val updateOneDoc = set("borough", "Queens") val updateOneModel = UpdateOneModel(updateOneFilter, updateOneDoc)
如果多个文档与 UpdateOneModel实例中指定的查询筛选条件匹配,则该操作会更新第一个结果。您可以在 UpdateOptions实例中指定排序,以便在驱动程序执行更新操作之前对匹配的文档应用,如以下代码所示:
val options = UpdateOptions.sort(ascending("name"))
要更新多个文档,请创建 UpdateManyModel 的实例并传递与 UpdateOneModel 相同的参数。UpdateManyModel 类指定与查询过滤匹配的所有文档的更新。
以下示例创建了一个UpdateManyModel实例:
val updateManyFilter = equal("name", "Wendy's") val updateManyDoc = set("cuisine", "Fast food") val updateManyModel = UpdateManyModel(updateManyFilter, updateManyDoc)
替换操作
替换操作会删除指定文档的所有字段和值,并将其替换为您指定的新字段和值。 要执行替换操作,请创建 ReplaceOneModel 的实例并传递以下参数:
查询过滤,指定用于匹配集合中文档的条件
指定要插入的新字段和值的替换文档
以下示例创建了一个ReplaceOneModel实例:
val replaceFilter = equal("name", "Cooper Town Diner") val replaceDoc = Document("name" -> "Smith Town Diner", "borough" -> "Brooklyn", "cuisine" -> "American") val replaceOneModel = ReplaceOneModel(replaceFilter, replaceDoc)
如果多个文档与 ReplaceOneModel实例中指定的查询筛选条件匹配,则该操作将替换第一个结果。您可以在 ReplaceOptions实例中指定排序,以便在驱动程序执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
val options = ReplaceOptions.sort(ascending("name"))
提示
替换多个文档
要替换多个文档,请为每个文档创建一个ReplaceOneModel实例。
删除操作
要删除文档,请创建DeleteOneModel的实例并传递查询过滤,指定要删除的文档。 DeleteOneModel实例提供了仅删除与查询过滤匹配的第一个文档的说明。
以下示例创建了一个DeleteOneModel实例:
val deleteOneModel = DeleteOneModel(equal("name", "Morris Park Bake Shop"))
要删除多个文档,请创建DeleteManyModel实例并传递查询过滤,指定要删除的文档。 DeleteManyModel的实例提供了删除与查询过滤匹配的所有文档的说明。
以下示例创建了一个DeleteManyModel实例:
val deleteManyModel = DeleteManyModel(equal("cuisine", "Experimental"))
执行批量操作
为要执行的每个操作定义模型实例后,将包含模型的 Seq实例传递给 MongoCollection.bulkWrite() 方法。默认下,该方法按照模型列表指定的顺序运行操作。
以下示例使用bulkWrite()方法执行多个写入操作:
val insertOneModel = InsertOneModel( Document("name" -> "Red's Pizza", "borough" -> "Brooklyn", "cuisine" -> "Pizzeria") ) val updateOneModel = UpdateOneModel(equal("name", "Moonlit Tavern"), set("borough", "Queens")) val deleteManyModel = DeleteManyModel(equal("name", "Crepe")) val writes = Seq(insertOneModel, updateOneModel, deleteManyModel) val observable = collection.bulkWrite(writes) observable.subscribe( (result: BulkWriteResult) => println(s"Success: $result"), (error: Throwable) => println(s"Error: ${error.getMessage}"), () => println("Completed") )
Success: AcknowledgedBulkWriteResult{insertedCount=1, matchedCount=1, removedCount=1, modifiedCount=1, upserts=[], inserts=[BulkWriteInsert{index=0, id=BsonObjectId{value=...}}]} Completed
如果任何写入操作失败,则Scala驱动程序将引发 BulkWriteError 并且不会执行任何进一步的操作。BulkWriteError 提供了一个 details 项,其中包括失败的操作以及有关异常的详细信息。
注意
当驱动程序运行批量操作时,它会使用目标集合的写关注(write concern)。无论执行顺序如何,驱动程序在尝试所有操作后都会报告所有写关注(write concern)错误。
自定义批量写入
MongoCollection.bulkWrite() 方法可以选择接受 BulkWriteOptions 参数,该参数指定可用于配置批量写入操作的选项。如果不指定任何选项,驱动程序将使用默认设置执行批量操作。要修改写入操作的行为,请将类实例作为最后一个参数传递给 bulkWrite() 方法。
下表描述了可用于配置BulkWriteOptions实例的 setter 方法:
方法 | 说明 |
|---|---|
| 如果 |
| |
| 设置要附加到操作的注释。 |
| 提供参数名称和值的映射,以便为操作设立顶级变量。值必须是常量或不引用文档字段的闭合表达式。 |
以下代码创建选项并将 ordered 选项设置为 false,以指定无序批量写入。 然后,该示例使用 bulkWrite() 方法执行批量操作:
val options = BulkWriteOptions().ordered(false) val observable = collection.bulkWrite(writes, options)
如果无序批量写入中的任何写入操作失败,则Scala驱动程序仅在尝试所有操作后才会报告错误。
注意
无序批量操作不保证执行顺序。 为了优化运行时间,顺序可以与您列出的方式不同。
返回值
bulkWrite() 方法返回一个包含 BulkWriteResult 的 SingleObservable对象。 您可以通过订阅可观察对象并使用以下方法从 BulkWriteResult实例访问权限信息:
方法 | 说明 |
|---|---|
| 指示服务器是否确认了写入操作。 |
| 已删除的文档数量(如有)。 |
| 插入的文档数量(如有)。 |
| 已插入文档的列表(如果有)。 |
| 与更新匹配的文档数量(如果适用)。 |
| 已修改文档的数量(如有)。 |
| 已更新或插入的文档列表(如有)。 |
客户端批量写入
连接到运行MongoDB Server 8.0 或更高版本的部署时,可以使用 MongoClient.bulkWrite() 方法写入同一集群中的多个数据库和集合。 MongoClient.bulkWrite() 方法在一次调用中执行所有写入。
MongoClient.bulkWrite() 方法采用包含一个或多个 ClientNamespacedWriteModel 实例的 List 来表示不同的写入操作。您可以使用实例方法构造 ClientNamespacedWriteModel 接口的实例。示例,ClientNamespacedInsertOneModel 的实例表示插入一个文档的操作,您可以使用 ClientNamespacedWriteModel.insertOne() 方法创建此模型。
下表描述了模型及其相应的实例方法:
模型 | 实例方法 | 说明 | 参数 |
|---|---|---|---|
|
| 创建一个模型以将文档插入到 |
|
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以替换 |
|
|
| 创建模型以删除 |
|
|
| 创建模型以删除 |
|
以下部分提供了一些示例,说明如何创建模型和使用客户端bulkWrite() 方法。
插入操作
此示例演示如何创建包含插入两个文档的指令的模型。 将一个文档插入到 db.people集合中,将另一文档插入到 db.things集合中。 MongoNamespace实例定义每个写入操作适用的目标数据库和集合。
val personToInsert = ClientNamespacedWriteModel.insertOne( MongoNamespace("db", "people"), Document("name" -> "Julia Smith") ) val thingToInsert = ClientNamespacedWriteModel.insertOne( MongoNamespace("db", "things"), Document("object" -> "washing machine") );
更新操作
以下示例展示如何使用 bulkWrite() 方法更新db.people 和 db.things 集合中的现有文档:
val personUpdate = ClientNamespacedWriteModel.updateOne( MongoNamespace("db", "people"), equal("name", "Freya Polk"), inc("age", 1) ) val thingUpdate = ClientNamespacedWriteModel.updateMany( MongoNamespace("db", "things"), equal("category", "electronic"), set("manufacturer", "Premium Technologies") )
此示例将 people集合中 name 值为 "Freya Polk" 的文档中 age字段的值递增 1。它还将 things集合中 category 值为 "electronic" 的所有文档的 manufacturer字段的值设置为 "Premium Technologies"。
如果多个文档与 ClientNamespacedUpdateOneModel实例中指定的查询筛选条件匹配,则该操作会更新第一个结果。您可以在 ClientUpdateOneOptions 实例中指定排序顺序,以便在驱动程序执行更新操作之前对匹配的文档应用顺序,如以下代码所示:
val options = ClientUpdateOneOptions .clientUpdateOneOptions() .sort(ascending("_id"))
替换操作
以下示例展示了如何创建模型来替换 db.people 和 db.things 集合中的现有文档:
val personReplacement = ClientNamespacedWriteModel.replaceOne( MongoNamespace("db", "people"), equal("_id", 1), Document("name" -> "Frederic Hilbert") ) val thingReplacement = ClientNamespacedWriteModel.replaceOne( MongoNamespace("db", "things"), equal("_id", 1), Document("object" -> "potato") )
前面的代码示例将以下文档替换为新文档:
people集合中_id值为1的文档things集合中_id值为1的文档
如果多个文档与 ClientNamespacedReplaceOneModel实例中指定的查询筛选条件匹配,则该操作将替换第一个结果。您可以在 ClientReplaceOneOptions 实例中指定排序顺序,以便在驱动程序执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
val options = ClientReplaceOneOptions .clientReplaceOneOptions() .sort(ascending("_id"))
执行批量操作
为要执行的每个操作定义 ClientNamespacedWriteModel实例后,将这些实例的列表传递给客户端bulkWrite() 方法。默认下,该方法按照指定的顺序运行操作。
以下示例使用bulkWrite()方法执行多个写入操作:
val peopleNamespace = MongoNamespace("db", "people") val thingsNamespace = MongoNamespace("db", "things") val writeModels = List( ClientNamespacedWriteModel.insertOne( peopleNamespace, Document("name" -> "Corey Kopper") ), ClientNamespacedWriteModel.replaceOne( thingsNamespace, equal("_id", 1), Document("object" -> "potato") ) ) val observable = mongoClient.bulkWrite(writeModels) observable.subscribe( (result: ClientBulkWriteResult) => println(result.toString), (error: Throwable) => println(s"Error: ${error.getMessage}"), () => println("Completed") )
AcknowledgedSummaryClientBulkWriteResult{insertedCount=1, matchedCount=1, ...}
如果任何写入操作失败,驱动程序都会引发 ClientBulkWriteException,并且不会执行任何进一步的单个操作。ClientBulkWriteException 包括可使用 ClientBulkWriteException.getWriteErrors() 方法访问的 BulkWriteError,该方法提供有关故障的信息。
自定义批量写入
您可以将 ClientBulkWriteOptions 的实例传递给 bulkWrite() 方法,以自定义驱动程序执行批量写入操作的方式。
执行顺序
默认情况下,驱动程序会按照您指定的顺序运行批量操作中的各个操作。驱动程序运行这些操作,直到出现错误或成功完成总批量操作。
但是,您可以在创建 ClientBulkWriteOptions实例时将 false 传递给 ordered() 方法,以指示驱动程序以无序方式执行写入操作。如果传递 false,则驱动程序会尝试运行批量写入操作中的所有写入操作,即使其中一个操作会产生错误。
以下代码在 ClientBulkWriteOptions 的实例中将 ordered 选项设置为 false,并执行批量写入操作以插入多个文档:
val namespace = MongoNamespace("db", "people") val options = ClientBulkWriteOptions.clientBulkWriteOptions().ordered(false) val writeModels = List( ClientNamespacedWriteModel.insertOne(namespace, Document("_id" -> 1, "name" -> "Rudra Suraj")), // Causes a duplicate key error ClientNamespacedWriteModel.insertOne(namespace, Document("_id" -> 1, "name" -> "Mario Bianchi")), ClientNamespacedWriteModel.insertOne(namespace, Document("name" -> "Wendy Zhang")) ) val observable = mongoClient.bulkWrite(writeModels, options)
由于写入操作是无序的,因此即使插入具有重复键的文档的写入操作导致错误,驱动程序也会执行所有无错误的操作。
更多信息
要了解如何执行单个写入操作,请参阅以下指南:
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: