Docs 菜单
Docs 主页
/ / /
Kotlin 协程
/ / /

批量操作

在此页面上

  • Overview
  • 集合批量写入
  • 插入操作
  • 替换操作
  • 更新操作
  • 删除操作
  • 执行顺序
  • 客户端批量写入
  • 插入操作
  • 更新操作
  • 替换操作
  • BulkWriteOptions
  • 总结
  • MongoCollection.bulkWrite()
  • MongoClient.bulkWrite()

在本指南中,您可以学习;了解如何在Kotlin驾驶员中使用批量操作。

要执行单个创建、替换、更新或删除操作,可以使用相应的方法。 示例,要插入一个文档并替换一个文档,可以使用 insertOne()replaceOne() 方法。 使用这些方法时,客户端会对每项操作调用数据库一次。

通过使用批量写入操作,您可以通过更少的数据库调用来执行多个写入操作。 您可以在以下级别执行批量写入操作:

  • 集合:您可以使用MongoCollection.bulkWrite() 方法对单个集合执行批量写入。在此方法中,每种写入操作都需要至少一次数据库调用。 示例,MongoCollection.bulkWrite() 将多个更新操作放在一次调用中,但对数据库进行两次单独的调用以执行插入操作和替换操作。

  • 客户端:如果您的应用程序连接到MongoDB服务器版本8.0 或更高版本,则可以使用MongoClient.bulkWrite() 方法对同一集群中的多个集合和数据库执行批量写入操作。此方法在一次数据库调用中执行所有写入。

批量写入操作包含一个或多个写入操作。 要在集合级别执行批量写入操作,请将 ListWriteModel 文档传递给 MongoCollection.bulkWrite() 方法。 WriteModel 是表示写入操作的模型。

MongoCollection.bulkWrite() 方法在单独的数据库调用中执行每种写入。 示例,当您将 DeleteOneModelDeleteManyModelReplaceOneModel 对象传递给该方法时,该方法会执行两次调用:一次针对删除操作,另一次针对替换操作。

注意

当客户端将操作拆分为单独的数据库调用时,如果批量写入操作未进行排序,它可能会对操作进行重新排序以提高效率。 要学习;了解有关操作执行顺序的更多信息,请参阅执行顺序部分。

以下各节介绍如何创建和使用每个 WriteModel 文档。每节中的示例都使用 people 集合中的以下文档:

{ "_id": 1, "name": "Karen Sandoval", "age": 31 }
{ "_id": 2, "name": "William Chin", "age": 54 }
{ "_id": 8, "name": "Shayla Ray", "age": 20 }

该数据由以下Kotlin数据类进行建模:

data class Person(
@BsonId val id: Int,
val name: String,
val age: Int? = null,
val location: String? = null
)

有关本节中提到的方法和类的详情,请参阅以下 API 文档:

如需执行插入操作,请创建 InsertOneModel,指定要插入的文档。如需插入多个文档,必须为每个要插入的文档创建一个 InsertOneModel

以下示例为两个描述人物的文档创建了 InsertOneModel

val juneDoc = InsertOneModel(Person(3, "June Carrie", 17))
val kevinDoc = InsertOneModel(Person(4, "Kevin Moss", 22))

重要

执行bulkWrite()操作时, InsertOneModel无法插入collection中已存在的具有_id的文档。 在这种情况下,驱动程序会抛出一个MongoBulkWriteException

以下示例尝试插入两个文档,其中_id值为13 。由于collection中已存在_id1的文档,因此该操作会导致错误:

try {
val bulkOperations = listOf(
(InsertOneModel(Person(1, "James Smith", 13))),
(InsertOneModel(Person(3, "Colin Samuels")))
)
val bulkWrite = collection.bulkWrite(bulkOperations)
} catch (e: MongoBulkWriteException) {
println("A MongoBulkWriteException occurred with the following message: " + e.message)
}
A MongoBulkWriteException occurred with the following message:
Bulk write operation error on server sample-shard-00-02.pw0q4.mongodb.net:27017.
Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key
error collection: crudOps.bulkWrite index: _id_ dup key: { _id: 1 }', details={}}].

要了解驱动程序为何未插入_id3的文档,请参阅“执行顺序”部分。

有关本节提及的方法和类的详情,请参阅 InsertOneModel API 文档。

要执行替换操作,请创建ReplaceOneModel ,为要替换的文档和替换文档指定查询筛选器。

重要

执行bulkWrite()时, ReplaceOneModel不能进行违反collection上唯一索引约束的更改。此外,如果查询筛选器没有匹配项,则模型不会执行替换操作。

以下示例创建了一个ReplaceOneModel ,以将_id1的文档替换为包含附加location字段的文档:

val filter = Filters.eq("_id", 1)
val insert = Person(1, "Celine Stork", location = "San Diego, CA")
val doc = ReplaceOneModel(filter, insert)

如果多个文档与 ReplaceOneModel实例中指定的查询过滤匹配,则该操作将替换第一个结果。您可以在 ReplaceOptions实例中指定排序,以便在服务器执行替换操作之前对匹配的文档应用顺序,如以下代码所示:

val opts = ReplaceOptions().sort(Sorts.ascending("_id"))

有关部分提及的方法和类的更多信息,请参阅以下资源:

要执行更新操作,请创建指定查询筛选器和更新文档的UpdateOneModelUpdateManyModel

UpdateOneModel 更新与查询筛选器匹配的第一个文档,UpdateManyModel 更新与查询筛选器匹配的所有文档。

重要

执行bulkWrite()时, UpdateOneModelUpdateManyModel类型不能进行违反collection唯一索引约束的更改。此外,如果没有与查询筛选器的匹配项,则模型不会执行更新操作。

以下示例在UpdateOneModel age1 的文档中创建一个 ,以使 字段递增_id 2

val filter = Filters.eq("_id", 2)
val update = Updates.inc(Person::age.name, 1)
val doc = UpdateOneModel<Person>(filter, update)

如果多个文档与 UpdateOneModel实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在 UpdateOptions实例中指定排序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:

val opts = UpdateOptions().sort(Sorts.ascending("_id"))

有关部分提及的方法和类的更多信息,请参阅以下资源:

要执行删除操作,请创建DeleteOneModelDeleteManyModel ,为要删除的文档指定查询筛选器。

DeleteOneModel 删除与查询筛选器匹配的第一个文档,DeleteManyModel 删除与查询筛选器匹配的所有文档。

重要

执行bulkWrite()时,如果没有与查询筛选器的匹配项,则DeleteOneModelDeleteManyModel类型不会删除任何文档。

以下示例创建了一个DeleteOneModel来删除_id1的文档,并创建了一个DeleteManyModel来删除age值小于30的文档:

val deleteId1 = DeleteOneModel<Person>(Filters.eq("_id", 1))
val deleteAgeLt30 = DeleteManyModel<Person>(Filters.lt(Person::age.name, 30))

有关本节中提到的方法和类的详情,请参阅以下 API 文档:

bulkWrite()方法接受可选的BulkWriteOptions作为第二个参数,以指定是要按有序还是无序执行批量操作。

默认情况下, bulkWrite()方法按顺序执行批量操作。 这意味着,这些操作会按照您添加到列表中的顺序执行,直到出现任何错误。

以下示例执行这些批量操作:

  • 针对name"Zaynab Omar"age37的文档的插入操作

  • _id1的文档替换为包含location字段的新文档的替换操作

  • 针对_id6的文档的更新操作,用于更改name字段

  • 针对age值大于50的所有文档的删除操作

val insertMdl = InsertOneModel(Person(6, "Zaynab Omar", 37))
val replaceMdl = ReplaceOneModel(
Filters.eq("_id", 1),
Person(1, "Sandy Kane", location = "Helena, MT")
)
val updateMdl = UpdateOneModel<Person>(
Filters.eq("_id", 6),
Updates.set(Person::name.name, "Zaynab Hassan")
)
val deleteMdl = DeleteManyModel<Person>(Filters.gt(Person::age.name, 50))
val bulkOperations = listOf(
insertMdl,
replaceMdl,
updateMdl,
deleteMdl
)
val result = collection.bulkWrite(bulkOperations)

运行此示例后,集合将包含以下文档:

{ "_id": 1, "name": "Sandy Kane", "location": "Helena, MT" }
{ "_id": 8, "name": "Shayla Ray", "age": 20 }
{ "_id": 6, "name": "Zaynab Hassan", "age": 37 }

您还可以将false传递给BulkWriteOptions对象上的ordered()方法,以任意顺序执行批量操作。 这意味着无论是否出现错误,所有写入操作都会执行。 如果出现任何错误,驱动程序会在最后报告。

以下代码展示了如何执行没有执行顺序的批量操作:

val options = BulkWriteOptions().ordered(false)
val unorderedResult = collection.bulkWrite(bulkOperations, options)

注意

无序批量操作不保证执行顺序。为了优化运行时间,顺序可能与您所列顺序不同。

在前面的示例中,如果bulkWrite()方法在更新操作后执行插入操作,则更新操作不会产生更改,因为该文档当时不存在。该collection将包含以下文档:

{ "_id": 1, "name": "Sandy Kane", "location": "Helena, MT" }
{ "_id": 8, "name": "Shayla Ray", "age": 20 }
{ "_id": 6, "name": "Zaynab Omar", "age": 37 }

有关本节中提到的方法和类的详情,请参阅以下 API 文档:

连接到运行MongoDB 服务器 8.0 或更高版本的部署时,可以使用 MongoClient.bulkWrite() 方法写入同一集群中的多个数据库和集合。 MongoClient.bulkWrite() 方法在一次调用中执行所有写入。

MongoClient.bulkWrite() 方法采用 ClientNamespacedWriteModel 实例列表来表示不同的写入操作。 您可以使用实例方法构造 ClientNamespacedWriteModel 接口的实例。 示例,ClientNamespacedInsertOneModel 的实例表示插入一个文档的操作,您可以使用 ClientNamespacedWriteModel.insertOne() 方法创建此模型。

注意

批量写入错误

如果任何写入操作失败,驾驶员都会引发 ClientBulkWriteException,并且不会执行任何进一步的单个操作。 ClientBulkWriteException 包括可使用 ClientBulkWriteException.getWriteErrors() 方法访问的 BulkWriteError,其中提供了单个故障的详细信息。

下表描述了模型及其相应的实例方法。

模型
实例方法
说明
参数

ClientNamespacedInsertOneModel

insertOne()

创建一个模型以将文档插入到 namespace 中。

namespace:要写入的数据库和集合

document:要插入的文档

ClientNamespacedUpdateOneModel

updateOne()

创建一个模型以更新namespace 中与 filter 匹配的第一个文档。

namespace:要写入的数据库和集合

filter:用于选择要更新的文档的筛选器

update:更新以应用匹配文档

updatePipeline:更新管道以应用匹配文档

options:(可选)更新文档时应用的选项

您必须为 updateupdatePipeline 参数传递一个值。

ClientNamespacedUpdateManyModel

updateMany()

创建一个模型以更新namespace 中与 filter 匹配的所有文档。

namespace:要写入的数据库和集合

filter:用于选择要更新的文档的筛选器

update:更新以应用匹配文档

updatePipeline:更新管道以应用匹配文档

options:(可选)更新文档时应用的选项

您必须为 updateupdatePipeline 参数传递一个值。

ClientNamespacedReplaceOneModel

replaceOne()

创建一个模型以替换 namespace 中与 filter 匹配的第一个文档。

namespace:要写入的数据库和集合

filter:用于选择要替换的文档的筛选器

replacement:替换文档

options:(可选)替换文档时应用的选项

ClientNamespacedDeleteOneModel

deleteOne()

创建模型以删除namespace 中与 filter 匹配的第一个文档。

namespace:要写入的数据库和集合

filter:用于选择要删除的文档的筛选器

option:(可选)删除文档时应用的选项

ClientNamespacedDeleteManyModel

deleteMany()

创建模型以删除namespace 中与 filter 匹配的所有文档。

namespace:要写入的数据库和集合

filter:筛选器,用于选择要删除的文档

option:(可选)删除文档时应用的选项

以下部分举例说明如何使用客户端bulkWrite() 方法。 示例数据由以下Kotlin数据类建模:

data class Person(
@BsonId val id: Int,
val name: String,
val age: Int? = null,
)
data class Object(
@BsonId val id: Int,
val type: String,
val category: String? = null,
val manufacturer: String? = null,
)

要学习;了解有关本节中提到的方法和类的更多信息,请参阅以下API文档:

此示例演示如何使用 bulkWrite() 方法插入两个文档。 将一个文档插入到 sample_db.people集合中,而将另一文档插入到 sample_db.objects集合中。 MongoNamespace实例定义每个写入操作适用的数据库和集合。

val docsToInsert = mutableListOf<ClientNamespacedWriteModel>()
docsToInsert.add(
ClientNamespacedWriteModel
.insertOne(
MongoNamespace("sample_db", "people"),
Person(2, "Julia Smith")
)
)
docsToInsert.add(
ClientNamespacedWriteModel
.insertOne(
MongoNamespace("sample_db", "objects"),
Object(2, "washing machine")
)
)
val clientBulkResult = client.bulkWrite(docsToInsert)

以下示例展示如何使用 bulkWrite() 方法更新sample_db.peoplesample_db.objects 集合中的现有文档:

val docsToInsert = mutableListOf<ClientNamespacedWriteModel>()
docsToInsert.add(
ClientNamespacedWriteModel
.updateOne(
MongoNamespace("sample_db", "people"),
Filters.eq(Person::name.name, "Freya Polk"),
Updates.inc(Person::age.name, 1)
)
)
docsToInsert.add(
ClientNamespacedWriteModel
.updateMany(
MongoNamespace("sample_db", "objects"),
Filters.eq(Object::category.name, "electronic"),
Updates.set(Object::manufacturer.name, "Premium Technologies")
)
)
val clientBulkResult = client.bulkWrite(docsToInsert)

此示例将 people集合中 name 值为 "Freya Polk" 的文档中 age字段的值递增 1。它还将 objects集合中 category 值为 "electronic" 的所有文档的 manufacturer字段的值设置为 "Premium Technologies"

如果多个文档与 ClientNamespacedUpdateOneModel 实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在 ClientUpdateOneOptions 实例中指定排序顺序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:

val options = ClientUpdateOneOptions
.clientUpdateOneOptions()
.sort(Sorts.ascending("_id"))

以下示例演示如何使用 bulkWrite() 方法替换 sample_db.peoplesample_db.objects 集合中的现有文档。

val docsReplacements = mutableListOf<ClientNamespacedWriteModel>()
docsReplacements.add(
ClientNamespacedWriteModel
.replaceOne(
MongoNamespace("sample_db", "people"),
Filters.eq(Person::id.name, 1),
Person(1, "Frederic Hilbert")
)
)
docsReplacements.add(
ClientNamespacedWriteModel
.replaceOne(
MongoNamespace("sample_db", "objects"),
Filters.eq(Object::id.name, 1),
Object(1, "ironing board")
)
)
val clientBulkResult = client.bulkWrite(docsReplacements)

此示例成功运行后,people集合中 _id 值为 1 的文档将替换为新文档。 objects集合中 _id 值为 1 的文档将替换为新文档。

如果多个文档与 ClientNamespacedReplaceOneModel 实例中指定的查询过滤匹配,则该操作将替换第一个结果。您可以在 ClientReplaceOneOptions 实例中指定排序顺序,以便在驾驶员执行替换操作之前对匹配的文档应用顺序,如以下代码所示:

val options = ClientReplaceOneOptions
.clientReplaceOneOptions()
.sort(Sorts.ascending("_id"))

您可以将 ClientBulkWriteOptions 的实例传递给 bulkWrite() 方法,以在运行批量写入操作时指定选项。

默认情况下,批量操作中的各个操作按照指定的顺序执行,直到出现错误或成功执行。 不过,您可以将 false 传递给 ClientBulkWriteOptions 接口上的 ordered() 方法,以便以无序方式执行写入操作。 使用无序选项时,产生错误的操作不会阻止在调用 bulkWrite() 方法时执行其他写入。

以下代码在 ClientBulkWriteOptions 的实例上设置 ordered() 方法,并执行批量写入操作以插入多个文档。

val namespace = MongoNamespace("sample_db", "people")
val options = ClientBulkWriteOptions
.clientBulkWriteOptions()
.ordered(false)
val bulkOperations = listOf(
ClientNamespacedWriteModel.insertOne(
namespace,
Person(2, "Rudra Suraj")
),
// Causes duplicate key error
ClientNamespacedWriteModel.insertOne(
namespace,
Person(2, "Wendy Zhang")
),
ClientNamespacedWriteModel.insertOne(
namespace,
Person(4, "Mario Bianchi")
)
)
val result = client.bulkWrite(bulkOperations, options)

即使插入具有重复键的文档的写入操作会导致错误,也会执行其他操作,因为写入操作是无序的。

要学习;了解有关本节中提到的方法和类的更多信息,请参阅以下API文档:

要执行批量操作,您需要创建 WriteModel 实例列表并将其传递给 bulkWrite() 方法。

有 6 种不同的 WriteModel 子类型:InsertOneModelReplaceOneModelUpdateOneModelUpdateManyModelDeleteOneModelDeleteManyModel

有两种方法可以执行 bulkWrite() 方法:

  • 有序,按顺序执行批量操作,直到出现错误(如有)

  • 无序,以任何顺序执行所有批量操作,并在最后报告错误(如果有)

要学习;了解有关集合bulkWrite 命令的更多信息,请参阅数据库。 集合.bulkWrite()MongoDB服务器手册中的方法参考。

连接到运行MongoDB服务器版本 8.0 或更高版本的部署时,可以使用 MongoClient.bulkWrite() 方法同时对多个数据库和集合执行批量操作。

要执行客户端批量操作,您可以创建一个传递给此方法的 ClientNamespacedWriteModel 实例列表。

ClientNamespacedWriteModel 有六个子类型用于表示写入操作。 要构建这些写入模型,可以使用相应的 ClientNamespacedWriteModel 方法 insertOne()updateOne()updateMany()replaceOne()deleteOne()deleteMany()。 这些方法接受一个 MongoNamespace对象,该对象定义要写入的数据库和集合。

MongoClient.bulkWrite() 方法还可以接受 ClientBulkWriteOptions对象来指定命令执行方式的不同选项。

要学习;了解有关客户端bulkWrite 命令的更多信息,请参阅MongoDB服务器手册中的 bulkWrite() 方法参考。

后退

在单个操作中插入或更新