对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Docs 菜单

批量写入操作

本指南向您展示如何使用Scala驱动程序执行批量写入操作,从而在单个数据库调用中对数据进行多项更改。

考虑这样一个场景:您要插入一个文档,更新多个其他文档,然后删除一个文档。 如果使用单独的方法,则每个操作都需要调用自己的数据库。

通过使用批量写入操作,您可以通过更少的数据库调用来执行多个写入操作。 您可以在以下级别执行批量写入操作:

  • 集合:您可以使用MongoCollection.bulkWrite() 方法对单个集合执行批量写入。在此方法中,每种写入操作都需要至少一次数据库调用。 示例,MongoCollection.bulkWrite() 将多个更新操作放在一次调用中,但对数据库进行两次单独的调用以执行插入操作和替换操作。

  • 客户端:如果您的应用程序连接到MongoDB Server8.0 或更高版本,则可以使用MongoClient.bulkWrite() 方法对同一集群中的多个集合和数据库执行批量写入操作。此方法在一次数据库调用中执行所有写入。

批量写入操作包含一个或多个写入操作。 要在集合级别执行批量写入操作,请将 SeqWriteModel 文档传递给 MongoCollection.bulkWrite() 方法。 WriteModel 是表示写入操作的模型。

对于要执行的每个写入操作,请创建以下从 WriteModel 继承的类之一的实例:

  • InsertOneModel

  • UpdateOneModel

  • UpdateManyModel

  • ReplaceOneModel

  • DeleteOneModel

  • DeleteManyModel

然后,将这些实例的列表传递给bulkWrite()方法。

以下部分介绍如何创建和使用上述类的实例。 执行批量操作部分演示了如何将模型列表传递给bulkWrite()方法以执行批量操作。

本部分中的示例使用Atlas示例数据集sample_restaurants数据库中的 restaurants集合。要从Scala应用程序访问权限此集合,请创建一个连接到Atlas 集群的MongoClient,并将以下值分配给 databasecollection 变量:

val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

要学习如何创建免费的MongoDB Atlas 集群并加载示例数据集,请参阅MongoDB 入门指南

要执行插入操作,请创建一个InsertOneModel实例并指定要插入的文档。

以下示例创建了一个InsertOneModel实例:

val insertOneModel = InsertOneModel(
Document("name" -> "Blue Moon Grill",
"borough" -> "Brooklyn",
"cuisine" -> "American")
)

要插入多个文档,请为每个文档创建一个InsertOneModel实例。

重要

执行批量操作时,InsertOneModel 无法插入集合中已存在的具有 _id 的文档。在这种情况下,驱动程序会抛出 MongoBulkWriteException

要更新文档,请创建UpdateOneModel的实例并传递以下参数:

  • 查询过滤,指定用于匹配集合中文档的条件。

  • 要执行的更新操作。要学习;了解有关更新操作的更多信息,请参阅MongoDB Server手册中的字段更新操作指南。

以下示例创建了一个UpdateOneModel实例:

val updateOneFilter = equal("name", "White Horse Tavern")
val updateOneDoc = set("borough", "Queens")
val updateOneModel = UpdateOneModel(updateOneFilter, updateOneDoc)

如果多个文档与 UpdateOneModel实例中指定的查询筛选条件匹配,则该操作会更新第一个结果。您可以在 UpdateOptions实例中指定排序,以便在驱动程序执行更新操作之前对匹配的文档应用,如以下代码所示:

val options = UpdateOptions.sort(ascending("name"))

要更新多个文档,请创建 UpdateManyModel 的实例并传递与 UpdateOneModel 相同的参数。UpdateManyModel 类指定与查询过滤匹配的所有文档的更新。

以下示例创建了一个UpdateManyModel实例:

val updateManyFilter = equal("name", "Wendy's")
val updateManyDoc = set("cuisine", "Fast food")
val updateManyModel = UpdateManyModel(updateManyFilter, updateManyDoc)

替换操作会删除指定文档的所有字段和值,并将其替换为您指定的新字段和值。 要执行替换操作,请创建 ReplaceOneModel 的实例并传递以下参数:

  • 查询过滤,指定用于匹配集合中文档的条件

  • 指定要插入的新字段和值的替换文档

以下示例创建了一个ReplaceOneModel实例:

val replaceFilter = equal("name", "Cooper Town Diner")
val replaceDoc = Document("name" -> "Smith Town Diner",
"borough" -> "Brooklyn",
"cuisine" -> "American")
val replaceOneModel = ReplaceOneModel(replaceFilter, replaceDoc)

如果多个文档与 ReplaceOneModel实例中指定的查询筛选条件匹配,则该操作将替换第一个结果。您可以在 ReplaceOptions实例中指定排序,以便在驱动程序执行替换操作之前对匹配的文档应用顺序,如以下代码所示:

val options = ReplaceOptions.sort(ascending("name"))

提示

替换多个文档

要替换多个文档,请为每个文档创建一个ReplaceOneModel实例。

要删除文档,请创建DeleteOneModel的实例并传递查询过滤,指定要删除的文档。 DeleteOneModel实例提供了仅删除与查询过滤匹配的第一个文档的说明。

以下示例创建了一个DeleteOneModel实例:

val deleteOneModel = DeleteOneModel(equal("name", "Morris Park Bake Shop"))

要删除多个文档,请创建DeleteManyModel实例并传递查询过滤,指定要删除的文档。 DeleteManyModel的实例提供了删除与查询过滤匹配的所有文档的说明。

以下示例创建了一个DeleteManyModel实例:

val deleteManyModel = DeleteManyModel(equal("cuisine", "Experimental"))

为要执行的每个操作定义模型实例后,将包含模型的 Seq实例传递给 MongoCollection.bulkWrite() 方法。默认下,该方法按照模型列表指定的顺序运行操作。

以下示例使用bulkWrite()方法执行多个写入操作:

val insertOneModel = InsertOneModel(
Document("name" -> "Red's Pizza",
"borough" -> "Brooklyn",
"cuisine" -> "Pizzeria")
)
val updateOneModel = UpdateOneModel(equal("name", "Moonlit Tavern"), set("borough", "Queens"))
val deleteManyModel = DeleteManyModel(equal("name", "Crepe"))
val writes = Seq(insertOneModel, updateOneModel, deleteManyModel)
val observable = collection.bulkWrite(writes)
observable.subscribe(
(result: BulkWriteResult) => println(s"Success: $result"),
(error: Throwable) => println(s"Error: ${error.getMessage}"),
() => println("Completed")
)
Success: AcknowledgedBulkWriteResult{insertedCount=1, matchedCount=1, removedCount=1,
modifiedCount=1, upserts=[], inserts=[BulkWriteInsert{index=0, id=BsonObjectId{value=...}}]}
Completed

如果任何写入操作失败,则Scala驱动程序将引发 BulkWriteError 并且不会执行任何进一步的操作。BulkWriteError 提供了一个 details 项,其中包括失败的操作以及有关异常的详细信息。

注意

当驱动程序运行批量操作时,它会使用目标集合的写关注(write concern)。无论执行顺序如何,驱动程序在尝试所有操作后都会报告所有写关注(write concern)错误。

MongoCollection.bulkWrite() 方法可以选择接受 BulkWriteOptions 参数,该参数指定可用于配置批量写入操作的选项。如果不指定任何选项,驱动程序将使用默认设置执行批量操作。要修改写入操作的行为,请将类实例作为最后一个参数传递给 bulkWrite() 方法。

下表描述了可用于配置BulkWriteOptions实例的 setter 方法:

方法
说明

ordered()

如果 true,则驱动程序会按照提供的顺序执行写入操作。如果发生错误,则不会尝试其他操作。

如果 false,则驱动程序会以任意顺序执行操作,并尝试执行所有操作。
默认为 true

bypassDocumentValidation()

指定更新操作是否绕过文档验证。这样,您就可以更新不符合模式验证要求的文档(如果存在)。有关模式验证的更多信息,请参阅MongoDB Server手册中的模式验证。默认为
false

comment()

设置要附加到操作的注释。

let()

提供参数名称和值的映射,以便为操作设立顶级变量。值必须是常量或不引用文档字段的闭合表达式。

以下代码创建选项并将 ordered 选项设置为 false,以指定无序批量写入。 然后,该示例使用 bulkWrite() 方法执行批量操作:

val options = BulkWriteOptions().ordered(false)
val observable = collection.bulkWrite(writes, options)

如果无序批量写入中的任何写入操作失败,则Scala驱动程序仅在尝试所有操作后才会报告错误。

注意

无序批量操作不保证执行顺序。 为了优化运行时间,顺序可以与您列出的方式不同。

bulkWrite() 方法返回一个包含 BulkWriteResultSingleObservable对象。 您可以通过订阅可观察对象并使用以下方法从 BulkWriteResult实例访问权限信息:

方法
说明

wasAcknowledged()

指示服务器是否确认了写入操作。

getDeletedCount()

已删除的文档数量(如有)。

getInsertedCount()

插入的文档数量(如有)。

getInserts()

已插入文档的列表(如果有)。

getMatchedCount()

与更新匹配的文档数量(如果适用)。

getModifiedCount()

已修改文档的数量(如有)。

getUpserts()

已更新或插入的文档列表(如有)。

连接到运行MongoDB Server 8.0 或更高版本的部署时,可以使用 MongoClient.bulkWrite() 方法写入同一集群中的多个数据库和集合。 MongoClient.bulkWrite() 方法在一次调用中执行所有写入。

MongoClient.bulkWrite() 方法采用包含一个或多个 ClientNamespacedWriteModel 实例的 List 来表示不同的写入操作。您可以使用实例方法构造 ClientNamespacedWriteModel 接口的实例。示例,ClientNamespacedInsertOneModel 的实例表示插入一个文档的操作,您可以使用 ClientNamespacedWriteModel.insertOne() 方法创建此模型。

下表描述了模型及其相应的实例方法:

模型
实例方法
说明
参数

ClientNamespacedInsertOneModel

insertOne()

创建一个模型以将文档插入到 namespace 中。

namespace:要写入的数据库和集合

document:要插入的文档

ClientNamespacedUpdateOneModel

updateOne()

创建一个模型以更新namespace 中与 filter 匹配的第一个文档。

namespace:要写入的数据库和集合

filter:用于选择要更新的文档的筛选器

update:更新以应用匹配文档

updatePipeline:更新管道以应用匹配文档

options:(可选)更新文档时应用的选项

您必须为 updateupdatePipeline 参数传递一个值。

ClientNamespacedUpdateManyModel

updateMany()

创建一个模型以更新namespace 中与 filter 匹配的所有文档。

namespace:要写入的数据库和集合

filter:用于选择要更新的文档的筛选器

update:更新以应用匹配文档

updatePipeline:更新管道以应用匹配文档

options:(可选)更新文档时应用的选项

您必须为 updateupdatePipeline 参数传递一个值。

ClientNamespacedReplaceOneModel

replaceOne()

创建一个模型以替换 namespace 中与 filter 匹配的第一个文档。

namespace:要写入的数据库和集合

filter:用于选择要替换的文档的筛选器

replacement:替换文档

options:(可选)替换文档时应用的选项

ClientNamespacedDeleteOneModel

deleteOne()

创建模型以删除namespace 中与 filter 匹配的第一个文档。

namespace:要写入的数据库和集合

filter:用于选择要删除的文档的筛选器

option:(可选)删除文档时应用的选项

ClientNamespacedDeleteManyModel

deleteMany()

创建模型以删除namespace 中与 filter 匹配的所有文档。

namespace:要写入的数据库和集合

filter:筛选器,用于选择要删除的文档

option:(可选)删除文档时应用的选项

以下部分提供了一些示例,说明如何创建模型和使用客户端bulkWrite() 方法。

此示例演示如何创建包含插入两个文档的指令的模型。 将一个文档插入到 db.people集合中,将另一文档插入到 db.things集合中。 MongoNamespace实例定义每个写入操作适用的目标数据库和集合。

val personToInsert = ClientNamespacedWriteModel.insertOne(
MongoNamespace("db", "people"),
Document("name" -> "Julia Smith")
)
val thingToInsert = ClientNamespacedWriteModel.insertOne(
MongoNamespace("db", "things"),
Document("object" -> "washing machine")
);

以下示例展示如何使用 bulkWrite() 方法更新db.peopledb.things 集合中的现有文档:

val personUpdate = ClientNamespacedWriteModel.updateOne(
MongoNamespace("db", "people"),
equal("name", "Freya Polk"),
inc("age", 1)
)
val thingUpdate = ClientNamespacedWriteModel.updateMany(
MongoNamespace("db", "things"),
equal("category", "electronic"),
set("manufacturer", "Premium Technologies")
)

此示例将 people集合中 name 值为 "Freya Polk" 的文档中 age字段的值递增 1。它还将 things集合中 category 值为 "electronic" 的所有文档的 manufacturer字段的值设置为 "Premium Technologies"

如果多个文档与 ClientNamespacedUpdateOneModel实例中指定的查询筛选条件匹配,则该操作会更新第一个结果。您可以在 ClientUpdateOneOptions 实例中指定排序顺序,以便在驱动程序执行更新操作之前对匹配的文档应用顺序,如以下代码所示:

val options = ClientUpdateOneOptions
.clientUpdateOneOptions()
.sort(ascending("_id"))

以下示例展示了如何创建模型来替换 db.peopledb.things 集合中的现有文档:

val personReplacement = ClientNamespacedWriteModel.replaceOne(
MongoNamespace("db", "people"),
equal("_id", 1),
Document("name" -> "Frederic Hilbert")
)
val thingReplacement = ClientNamespacedWriteModel.replaceOne(
MongoNamespace("db", "things"),
equal("_id", 1),
Document("object" -> "potato")
)

前面的代码示例将以下文档替换为新文档:

  • people集合中 _id 值为 1 的文档

  • things集合中 _id 值为 1 的文档

如果多个文档与 ClientNamespacedReplaceOneModel实例中指定的查询筛选条件匹配,则该操作将替换第一个结果。您可以在 ClientReplaceOneOptions 实例中指定排序顺序,以便在驱动程序执行替换操作之前对匹配的文档应用顺序,如以下代码所示:

val options = ClientReplaceOneOptions
.clientReplaceOneOptions()
.sort(ascending("_id"))

为要执行的每个操作定义 ClientNamespacedWriteModel实例后,将这些实例的列表传递给客户端bulkWrite() 方法。默认下,该方法按照指定的顺序运行操作。

以下示例使用bulkWrite()方法执行多个写入操作:

val peopleNamespace = MongoNamespace("db", "people")
val thingsNamespace = MongoNamespace("db", "things")
val writeModels = List(
ClientNamespacedWriteModel.insertOne(
peopleNamespace,
Document("name" -> "Corey Kopper")
),
ClientNamespacedWriteModel.replaceOne(
thingsNamespace,
equal("_id", 1),
Document("object" -> "potato")
)
)
val observable = mongoClient.bulkWrite(writeModels)
observable.subscribe(
(result: ClientBulkWriteResult) => println(result.toString),
(error: Throwable) => println(s"Error: ${error.getMessage}"),
() => println("Completed")
)
AcknowledgedSummaryClientBulkWriteResult{insertedCount=1, matchedCount=1, ...}

如果任何写入操作失败,驱动程序都会引发 ClientBulkWriteException,并且不会执行任何进一步的单个操作。ClientBulkWriteException 包括可使用 ClientBulkWriteException.getWriteErrors() 方法访问的 BulkWriteError,该方法提供有关故障的信息。

您可以将 ClientBulkWriteOptions 的实例传递给 bulkWrite() 方法,以自定义驱动程序执行批量写入操作的方式。

默认情况下,驱动程序会按照您指定的顺序运行批量操作中的各个操作。驱动程序运行这些操作,直到出现错误或成功完成总批量操作。

但是,您可以在创建 ClientBulkWriteOptions实例时将 false 传递给 ordered() 方法,以指示驱动程序以无序方式执行写入操作。如果传递 false,则驱动程序会尝试运行批量写入操作中的所有写入操作,即使其中一个操作会产生错误。

以下代码在 ClientBulkWriteOptions 的实例中将 ordered 选项设置为 false,并执行批量写入操作以插入多个文档:

val namespace = MongoNamespace("db", "people")
val options = ClientBulkWriteOptions.clientBulkWriteOptions().ordered(false)
val writeModels = List(
ClientNamespacedWriteModel.insertOne(namespace, Document("_id" -> 1, "name" -> "Rudra Suraj")),
// Causes a duplicate key error
ClientNamespacedWriteModel.insertOne(namespace, Document("_id" -> 1, "name" -> "Mario Bianchi")),
ClientNamespacedWriteModel.insertOne(namespace, Document("name" -> "Wendy Zhang"))
)
val observable = mongoClient.bulkWrite(writeModels, options)

由于写入操作是无序的,因此即使插入具有重复键的文档的写入操作导致错误,驱动程序也会执行所有无错误的操作。

要了解如何执行单个写入操作,请参阅以下指南:

要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: