Overview
在本指南中,您可以学习如何在Kotlin驱动程序中使用批量操作。
要执行单个创建、替换、更新或删除操作,可以使用相应的方法。 示例,要插入一个文档并替换一个文档,可以使用 insertOne() 和 replaceOne() 方法。 使用这些方法时,客户端会对每项操作调用数据库一次。
通过使用批量写入操作,您可以通过更少的数据库调用来执行多个写入操作。 您可以在以下级别执行批量写入操作:
集合批量写入
批量写入操作包含一个或多个写入操作。 要在集合级别执行批量写入操作,请将 List 的 WriteModel 文档传递给 MongoCollection.bulkWrite() 方法。 WriteModel 是表示写入操作的模型。
MongoCollection.bulkWrite() 方法在单独的数据库调用中执行每种写入。 示例,当您将 DeleteOneModel、DeleteManyModel 和 ReplaceOneModel 对象传递给该方法时,该方法会执行两次调用:一次针对删除操作,另一次针对替换操作。
以下各节介绍如何创建和使用每个 WriteModel 文档。每节中的示例都使用 people 集合中的以下文档:
{ "_id": 1, "name": "Karen Sandoval", "age": 31 } { "_id": 2, "name": "William Chin", "age": 54 } { "_id": 8, "name": "Shayla Ray", "age": 20 }
该数据由以下Kotlin数据类进行建模:
data class Person( val id: Int, val name: String, val age: Int? = null, val location: String? = null )
有关本节中提到的方法和类的详情,请参阅以下 API 文档:
插入操作
如需执行插入操作,请创建 InsertOneModel,指定要插入的文档。如需插入多个文档,必须为每个要插入的文档创建一个 InsertOneModel。
例子
以下示例为两个描述人物的文档创建了 InsertOneModel:
val juneDoc = InsertOneModel(Person(3, "June Carrie", 17)) val kevinDoc = InsertOneModel(Person(4, "Kevin Moss", 22))
重要
执行bulkWrite()操作时, InsertOneModel无法插入collection中已存在的具有_id的文档。 在这种情况下,驱动程序会抛出一个MongoBulkWriteException 。
以下示例尝试插入两个文档,其中_id值为1和3 。由于collection中已存在_id为1的文档,因此该操作会导致错误:
try { val bulkOperations = listOf( (InsertOneModel(Person(1, "James Smith", 13))), (InsertOneModel(Person(3, "Colin Samuels"))) ) val bulkWrite = collection.bulkWrite(bulkOperations) } catch (e: MongoBulkWriteException) { println("A MongoBulkWriteException occurred with the following message: " + e.message) }
A MongoBulkWriteException occurred with the following message: Bulk write operation error on server sample-shard-00-02.pw0q4.mongodb.net:27017. Write errors: [BulkWriteError{index=0, code=11000, message='E11000 duplicate key error collection: crudOps.bulkWrite index: _id_ dup key: { _id: 1 }', details={}}].
要了解驱动程序为何未插入_id为3的文档,请参阅“执行顺序”部分。
有关本节提及的方法和类的详情,请参阅 InsertOneModel API 文档。
替换操作
要执行替换操作,请创建ReplaceOneModel ,为要替换的文档和替换文档指定查询筛选器。
重要
执行bulkWrite()时, ReplaceOneModel不能进行违反collection上唯一索引约束的更改。此外,如果查询筛选器没有匹配项,则模型不会执行替换操作。
例子
以下示例创建了一个ReplaceOneModel ,以将_id为1的文档替换为包含附加location字段的文档:
val filter = Filters.eq("_id", 1) val insert = Person(1, "Celine Stork", location = "San Diego, CA") val doc = ReplaceOneModel(filter, insert)
如果多个文档与 ReplaceOneModel实例中指定的查询过滤匹配,则该操作将替换第一个结果。您可以在 ReplaceOptions实例中指定排序,以便在服务器执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
val opts = ReplaceOptions().sort(Sorts.ascending("_id"))
有关部分提及的方法和类的更多信息,请参阅以下资源:
ReplaceOneModel API 文档
ReplaceOptions API文档
更新操作
要执行更新操作,请创建指定查询筛选器和更新文档的UpdateOneModel或UpdateManyModel 。
UpdateOneModel 更新与查询筛选器匹配的第一个文档,UpdateManyModel 更新与查询筛选器匹配的所有文档。
重要
执行bulkWrite()时, UpdateOneModel和UpdateManyModel类型不能进行违反collection唯一索引约束的更改。此外,如果没有与查询筛选器的匹配项,则模型不会执行更新操作。
例子
以下示例在UpdateOneModel age为1 的文档中创建一个 ,以使 字段递增_id 2:
val filter = Filters.eq("_id", 2) val update = Updates.inc(Person::age.name, 1) val doc = UpdateOneModel<Person>(filter, update)
如果多个文档与 UpdateOneModel实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在 UpdateOptions实例中指定排序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:
val opts = UpdateOptions().sort(Sorts.ascending("_id"))
有关部分提及的方法和类的更多信息,请参阅以下资源:
UpdateOneModel API documentation
UpdateManyModel API 文档
UpdateOptions API documentation
唯一索引服务器手册说明
删除操作
要执行删除操作,请创建DeleteOneModel或DeleteManyModel ,为要删除的文档指定查询筛选器。
DeleteOneModel 删除与查询筛选器匹配的第一个文档,DeleteManyModel 删除与查询筛选器匹配的所有文档。
重要
执行bulkWrite()时,如果没有与查询筛选器的匹配项,则DeleteOneModel和DeleteManyModel类型不会删除任何文档。
例子
以下示例创建了一个DeleteOneModel来删除_id为1的文档,并创建了一个DeleteManyModel来删除age值小于30的文档:
val deleteId1 = DeleteOneModel<Person>(Filters.eq("_id", 1)) val deleteAgeLt30 = DeleteManyModel<Person>(Filters.lt(Person::age.name, 30))
有关本节中提到的方法和类的详情,请参阅以下 API 文档:
执行顺序
bulkWrite()方法接受可选的BulkWriteOptions作为第二个参数,以指定是要按有序还是无序执行批量操作。
命令执行
默认情况下, bulkWrite()方法按顺序执行批量操作。 这意味着,这些操作会按照您添加到列表中的顺序执行,直到出现任何错误。
例子
以下示例执行这些批量操作:
针对
name为"Zaynab Omar"且age为37的文档的插入操作将
_id为1的文档替换为包含location字段的新文档的替换操作针对
_id为6的文档的更新操作,用于更改name字段针对
age值大于50的所有文档的删除操作
val insertMdl = InsertOneModel(Person(6, "Zaynab Omar", 37)) val replaceMdl = ReplaceOneModel( Filters.eq("_id", 1), Person(1, "Sandy Kane", location = "Helena, MT") ) val updateMdl = UpdateOneModel<Person>( Filters.eq("_id", 6), Updates.set(Person::name.name, "Zaynab Hassan") ) val deleteMdl = DeleteManyModel<Person>(Filters.gt(Person::age.name, 50)) val bulkOperations = listOf( insertMdl, replaceMdl, updateMdl, deleteMdl ) val result = collection.bulkWrite(bulkOperations)
运行此示例后,集合将包含以下文档:
{ "_id": 1, "name": "Sandy Kane", "location": "Helena, MT" } { "_id": 8, "name": "Shayla Ray", "age": 20 } { "_id": 6, "name": "Zaynab Hassan", "age": 37 }
无序执行
您还可以将false传递给BulkWriteOptions对象上的ordered()方法,以任意顺序执行批量操作。 这意味着无论是否出现错误,所有写入操作都会执行。 如果出现任何错误,驱动程序会在最后报告。
以下代码展示了如何执行没有执行顺序的批量操作:
val options = BulkWriteOptions().ordered(false) val unorderedResult = collection.bulkWrite(bulkOperations, options)
注意
无序批量操作不保证执行顺序。为了优化运行时间,顺序可能与您所列顺序不同。
在前面的示例中,如果bulkWrite()方法在更新操作后执行插入操作,则更新操作不会产生更改,因为该文档当时不存在。该collection将包含以下文档:
{ "_id": 1, "name": "Sandy Kane", "location": "Helena, MT" } { "_id": 8, "name": "Shayla Ray", "age": 20 } { "_id": 6, "name": "Zaynab Omar", "age": 37 }
有关本节中提到的方法和类的详情,请参阅以下 API 文档:
客户端批量写入
连接到运行MongoDB 服务器 8.0 或更高版本的部署时,可以使用 MongoClient.bulkWrite() 方法写入同一集群中的多个数据库和集合。 MongoClient.bulkWrite() 方法在一次调用中执行所有写入。
MongoClient.bulkWrite() 方法采用 ClientNamespacedWriteModel 实例列表来表示不同的写入操作。 您可以使用实例方法构造 ClientNamespacedWriteModel 接口的实例。 示例,ClientNamespacedInsertOneModel 的实例表示插入一个文档的操作,您可以使用 ClientNamespacedWriteModel.insertOne() 方法创建此模型。
注意
批量写入错误
如果任何写入操作失败,驱动程序都会引发 ClientBulkWriteException,并且不会执行任何进一步的单个操作。ClientBulkWriteException 包括可使用 ClientBulkWriteException.getWriteErrors() 方法访问的 BulkWriteError,其中提供了单个故障的详细信息。
下表描述了模型及其相应的实例方法。
模型 | 实例方法 | 说明 | 参数 |
|---|---|---|---|
|
| 创建一个模型以将文档插入到 |
|
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以替换 |
|
|
| 创建模型以删除 |
|
|
| 创建模型以删除 |
|
以下部分举例说明如何使用客户端bulkWrite() 方法。 示例数据由以下Kotlin数据类建模:
data class Person( val id: Int, val name: String, val age: Int? = null, ) data class Object( val id: Int, val type: String, val category: String? = null, val manufacturer: String? = null, )
要学习;了解有关本节中提到的方法和类的更多信息,请参阅以下API文档:
插入操作
此示例演示如何使用 bulkWrite() 方法插入两个文档。 将一个文档插入到 sample_db.people集合中,而将另一文档插入到 sample_db.objects集合中。 MongoNamespace实例定义每个写入操作适用的数据库和集合。
val docsToInsert = mutableListOf<ClientNamespacedWriteModel>() docsToInsert.add( ClientNamespacedWriteModel .insertOne( MongoNamespace("sample_db", "people"), Person(2, "Julia Smith") ) ) docsToInsert.add( ClientNamespacedWriteModel .insertOne( MongoNamespace("sample_db", "objects"), Object(2, "washing machine") ) ) val clientBulkResult = client.bulkWrite(docsToInsert)
更新操作
以下示例展示如何使用 bulkWrite() 方法更新sample_db.people 和 sample_db.objects 集合中的现有文档:
val docsToInsert = mutableListOf<ClientNamespacedWriteModel>() docsToInsert.add( ClientNamespacedWriteModel .updateOne( MongoNamespace("sample_db", "people"), Filters.eq(Person::name.name, "Freya Polk"), Updates.inc(Person::age.name, 1) ) ) docsToInsert.add( ClientNamespacedWriteModel .updateMany( MongoNamespace("sample_db", "objects"), Filters.eq(Object::category.name, "electronic"), Updates.set(Object::manufacturer.name, "Premium Technologies") ) ) val clientBulkResult = client.bulkWrite(docsToInsert)
此示例将 people集合中 name 值为 "Freya Polk" 的文档中 age字段的值递增 1。它还将 objects集合中 category 值为 "electronic" 的所有文档的 manufacturer字段的值设置为 "Premium Technologies"。
如果多个文档与 ClientNamespacedUpdateOneModel实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在ClientUpdateOneOptions实例中指定排序顺序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:
val options = ClientUpdateOneOptions .clientUpdateOneOptions() .sort(Sorts.ascending("_id"))
替换操作
以下示例演示如何使用 bulkWrite() 方法替换 sample_db.people 和 sample_db.objects 集合中的现有文档。
val docsReplacements = mutableListOf<ClientNamespacedWriteModel>() docsReplacements.add( ClientNamespacedWriteModel .replaceOne( MongoNamespace("sample_db", "people"), Filters.eq(Person::id.name, 1), Person(1, "Frederic Hilbert") ) ) docsReplacements.add( ClientNamespacedWriteModel .replaceOne( MongoNamespace("sample_db", "objects"), Filters.eq(Object::id.name, 1), Object(1, "ironing board") ) ) val clientBulkResult = client.bulkWrite(docsReplacements)
此示例成功运行后,people集合中 _id 值为 1 的文档将替换为新文档。 objects集合中 _id 值为 1 的文档将替换为新文档。
如果多个文档与 ClientNamespacedReplaceOneModel实例中指定的查询筛选条件匹配,则该操作将替换第一个结果。您可以在 ClientReplaceOneOptions 实例中指定排序顺序,以便在驱动程序执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
val options = ClientReplaceOneOptions .clientReplaceOneOptions() .sort(Sorts.ascending("_id"))
BulkWriteOptions
您可以将 ClientBulkWriteOptions 的实例传递给 bulkWrite() 方法,以在运行批量写入操作时指定选项。
执行顺序
默认情况下,批量操作中的各个操作按照指定的顺序执行,直到出现错误或成功执行。 不过,您可以将 false 传递给 ClientBulkWriteOptions 接口上的 ordered() 方法,以便以无序方式执行写入操作。 使用无序选项时,产生错误的操作不会阻止在调用 bulkWrite() 方法时执行其他写入。
以下代码在 ClientBulkWriteOptions 的实例上设置 ordered() 方法,并执行批量写入操作以插入多个文档。
val namespace = MongoNamespace("sample_db", "people") val options = ClientBulkWriteOptions .clientBulkWriteOptions() .ordered(false) val bulkOperations = listOf( ClientNamespacedWriteModel.insertOne( namespace, Person(2, "Rudra Suraj") ), // Causes duplicate key error ClientNamespacedWriteModel.insertOne( namespace, Person(2, "Wendy Zhang") ), ClientNamespacedWriteModel.insertOne( namespace, Person(4, "Mario Bianchi") ) ) val result = client.bulkWrite(bulkOperations, options)
即使插入具有重复键的文档的写入操作会导致错误,也会执行其他操作,因为写入操作是无序的。
要学习;了解有关本节中提到的方法和类的更多信息,请参阅以下API文档:
总结
MongoCollection.bulkWrite()
要执行批量操作,您需要创建 WriteModel 实例列表并将其传递给 bulkWrite() 方法。
有 6 种不同的 WriteModel 子类型:InsertOneModel、ReplaceOneModel、UpdateOneModel、UpdateManyModel、DeleteOneModel 和 DeleteManyModel。
有两种方法可以执行 bulkWrite() 方法:
有序,按顺序执行批量操作,直到出现错误(如有)
无序,以任何顺序执行所有批量操作,并在最后报告错误(如果有)
要学习;了解有关集合bulkWrite 命令的更多信息,请参阅数据库。 集合.bulkWrite()MongoDB服务器手册中的方法参考。
MongoClient.bulkWrite()
连接到运行MongoDB服务器版本 8.0 或更高版本的部署时,可以使用 MongoClient.bulkWrite() 方法同时对多个数据库和集合执行批量操作。
要执行客户端批量操作,您可以创建一个传递给此方法的 ClientNamespacedWriteModel 实例列表。
ClientNamespacedWriteModel 有六个子类型用于表示写入操作。 要构建这些写入模型,可以使用相应的 ClientNamespacedWriteModel 方法 insertOne()、updateOne()、updateMany()、replaceOne()、deleteOne() 和 deleteMany()。 这些方法接受一个 MongoNamespace对象,该对象定义要写入的数据库和集合。
MongoClient.bulkWrite() 方法还可以接受 ClientBulkWriteOptions对象来指定命令执行方式的不同选项。
要学习;了解有关客户端bulkWrite 命令的更多信息,请参阅MongoDB服务器手册中的 bulkWrite() 方法参考。