Overview
在本指南中,您可以学习;了解如何使用批量写入操作在单个数据库调用中执行多个写入操作。
考虑这样一个场景:您要插入一个文档,更新多个其他文档,然后删除一个文档。 如果使用单独的方法,则每个操作都需要调用自己的数据库。
通过使用批量写入操作,您可以通过更少的数据库调用来执行多个写入操作。 您可以在以下级别执行批量写入操作:
样本数据
本指南中的示例使用Atlas示例数据集中的 sample_restaurants.restaurants 和 sample_mflix.movies 集合。要学习如何创建免费的MongoDB Atlas 集群并加载示例数据集,请参阅Atlas入门。
这些集合中的文档由以下Kotlin数据类建模:
data class Restaurant( val name: String, val borough: String, val cuisine: String, val stars: Int? = null, ) data class Movie( val title: String, val year: Int, val seen: Boolean? = null, )
集合批量写入
批量写入操作包含一个或多个写入操作。 要在集合级别执行批量写入操作,请将 List 的 WriteModel 文档传递给 MongoCollection.bulkWrite() 方法。 WriteModel 是表示写入操作的模型。
对于要执行的每个写入操作,请创建以下从 WriteModel 继承的类之一的实例:
InsertOneModelUpdateOneModelUpdateManyModelReplaceOneModelDeleteOneModelDeleteManyModel
以下部分介绍如何创建和使用上述类的实例。 执行批量操作部分演示了如何将模型列表传递给bulkWrite()方法以执行批量操作。
插入操作
要执行插入操作,请创建一个InsertOneModel实例并指定要插入的文档。
以下示例创建了一个InsertOneModel实例:
val blueMoon = InsertOneModel(Restaurant("Blue Moon Grill", "Brooklyn", "American"))
要插入多个文档,请为每个文档创建一个InsertOneModel实例。
重要
执行批量操作时,InsertOneModel 无法指示插入具有集合中已存在的 _id 值的文档。在这种情况下,驱动程序会抛出 MongoBulkWriteException。
更新操作
要更新文档,请创建UpdateOneModel的实例并传递以下参数:
查询筛选器,用于指定匹配集合中文档的条件
UpdateOneModel实例指定与查询过滤匹配的第一个文档的更新。
以下示例创建了一个UpdateOneModel实例:
val updateOneFilter = Filters.eq(Restaurant::name.name, "White Horse Tavern") val updateOneDoc = Updates.set(Restaurant::borough.name, "Queens") val tavernUpdate = UpdateOneModel<Restaurant>(updateOneFilter, updateOneDoc)
如果多个文档与 UpdateOneModel实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在 UpdateOptions实例中指定排序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:
val opts = UpdateOptions().sort(Sorts.ascending(Restaurant::name.name))
要更新多个文档,请创建UpdateManyModel的实例并传递与UpdateOneModel相同的参数。 UpdateManyModel类指定与查询过滤匹配的所有文档的更新。
以下示例创建了一个 UpdateManyModel实例,以指示驱动程序更新所有匹配的文档:
val updateManyFilter = Filters.eq(Restaurant::name.name, "Wendy's") val updateManyDoc = Updates.set(Restaurant::cuisine.name, "Fast food") val wendysUpdate = UpdateManyModel<Restaurant>(updateManyFilter, updateManyDoc)
替换操作
替换操作会删除指定文档的所有字段和值,并将其替换为您指定的新字段和值。 要执行替换操作,请创建ReplaceOneModel实例并传递查询过滤以及要用于替换匹配文档的字段和值。
以下示例创建了一个ReplaceOneModel实例:
val replaceFilter = Filters.eq(Restaurant::name.name, "Cooper Town Diner") val replaceDoc = Restaurant("Smith Town Diner", "Brooklyn", "American") val replacement = ReplaceOneModel(replaceFilter, replaceDoc)
如果多个文档与 ReplaceOneModel实例中指定的查询过滤匹配,则该操作将替换第一个结果。您可以在 ReplaceOptions实例中指定排序,以便在服务器执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
val opts = ReplaceOptions().sort(Sorts.ascending(Restaurant::name.name))
提示
替换多个文档
要替换多个文档,请为每个文档创建一个ReplaceOneModel实例。
删除操作
要删除文档,请创建DeleteOneModel的实例并传递查询过滤,指定要删除的文档。 DeleteOneModel实例提供了仅删除与查询过滤匹配的第一个文档的说明。
以下示例创建了一个DeleteOneModel实例:
val deleteOne = DeleteOneModel<Restaurant>(Filters.eq( Restaurant::name.name, "Morris Park Bake Shop" ))
要删除多个文档,请创建DeleteManyModel实例并传递查询过滤,指定要删除的文档。 DeleteManyModel的实例提供了删除与查询过滤匹配的所有文档的说明。
以下示例创建了一个DeleteManyModel实例:
val deleteMany = DeleteManyModel<Restaurant>(Filters.eq( Restaurant::cuisine.name, "Experimental" ))
执行批量操作
为要执行的每个操作定义模型实例后,将这些实例的列表传递给bulkWrite()方法。 默认,该方法按照模型列表指定的顺序运行操作。
以下示例使用bulkWrite()方法执行多个写入操作:
val insertOneMdl = InsertOneModel(Restaurant("Red's Pizza", "Brooklyn", "Pizzeria")) val updateOneMdl = UpdateOneModel<Restaurant>( Filters.eq(Restaurant::name.name, "Moonlit Tavern"), Updates.set(Restaurant::borough.name, "Queens") ) val deleteManyMdl = DeleteManyModel<Restaurant>( Filters.eq(Restaurant::name.name, "Crepe") ) val bulkResult = collection.bulkWrite( listOf(insertOneMdl, updateOneMdl, deleteManyMdl) ) println(bulkResult)
AcknowledgedBulkWriteResult{insertedCount=1, matchedCount=5, removedCount=3, modifiedCount=2, upserts=[], inserts=[BulkWriteInsert{index=0, id=BsonObjectId{value=...}}]}
如果任何写入操作失败,Kotlin 同步驱动程序都会引发 BulkWriteError,并且不会执行任何进一步的操作。BulkWriteError 提供了一个 details字段,其中包括失败的操作以及有关异常的详细信息。
注意
当驱动程序运行批量操作时,它会使用目标集合的写关注(write concern)。无论执行顺序如何,驱动程序在尝试所有操作后都会报告所有写关注(write concern)错误。
自定义批量写入操作
bulkWrite() 方法可以选择接受一个参数,该参数指定可用于配置批量写入操作的选项。如果不指定任何选项,驱动程序将使用默认设置执行批量操作。
下表描述了可用于配置BulkWriteOptions实例的 setter 方法:
属性 | 说明 |
|---|---|
| 如果为 |
| |
| 设置要附加到操作的注释。 |
| 提供参数名称和值的映射,以便为操作设立顶级变量。值必须是常量或不引用文档字段的闭合表达式。 |
以下代码创建选项并使用ordered(false)选项指定无序批量写入。 然后,该示例使用bulkWrite()方法执行批量操作:
val opts = BulkWriteOptions().ordered(false) collection.bulkWrite(bulkOperations, opts)
如果无序批量写入中的任何写入操作失败, Kotlin 同步驱动程序仅在尝试所有操作后才会报告错误。
注意
无序批量操作不保证执行顺序。 为了优化运行时间,顺序可以与您列出的方式不同。
返回值
bulkWrite()方法返回一个BulkWriteResult对象。 您可以从BulkWriteResult实例访问权限以下信息:
属性 | 说明 |
|---|---|
| 指示服务器是否确认了写入操作。 |
| 已删除的文档数量(如有)。 |
| 插入的文档数量(如有)。 |
| 已插入文档的列表(如果有)。 |
| 与更新匹配的文档数量(如果适用)。 |
| 已修改文档的数量(如有)。 |
| 已更新或插入的文档列表(如有)。 |
客户端批量写入
连接到运行MongoDB Server 8.0 或更高版本的部署时,可以使用 MongoClient.bulkWrite() 方法写入同一集群中的多个数据库和集合。 MongoClient.bulkWrite() 方法在一次调用中执行所有写入。
MongoClient.bulkWrite() 方法采用 ClientNamespacedWriteModel 实例列表来表示不同的写入操作。 您可以使用实例方法构造 ClientNamespacedWriteModel 接口的实例。 示例,ClientNamespacedInsertOneModel 的实例表示插入一个文档的操作,您可以使用 ClientNamespacedWriteModel.insertOne() 方法创建此模型。
下表描述了模型及其相应的实例方法。
模型 | 实例方法 | 说明 | 参数 |
|---|---|---|---|
|
| 创建一个模型以将文档插入到 |
|
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以更新 |
您必须为 |
|
| 创建一个模型以替换 |
|
|
| 创建模型以删除 |
|
|
| 创建模型以删除 |
|
以下部分提供了一些示例,说明如何创建模型和使用客户端bulkWrite() 方法。
插入操作
此示例演示如何创建包含插入两个文档的指令的模型。 将一个文档插入到 sample_restaurants.restaurants集合中,将另一文档插入到 sample_mflix.movies集合中。 MongoNamespace实例定义每个写入操作适用的目标数据库和集合。
val restaurantToInsert = ClientNamespacedWriteModel .insertOne( MongoNamespace("sample_restaurants", "restaurants"), Restaurant("Blue Moon Grill", "Brooklyn", "American") ) val movieToInsert = ClientNamespacedWriteModel .insertOne( MongoNamespace("sample_mflix", "movies"), Movie("Silly Days", 2022) )
更新操作
以下示例展示如何使用 bulkWrite() 方法更新sample_restaurants.restaurants 和 sample_mflix.movies 集合中的现有文档:
val restaurantUpdate = ClientNamespacedWriteModel .updateOne( MongoNamespace("sample_restaurants", "restaurants"), Filters.eq(Restaurant::name.name, "Villa Berulia"), Updates.inc(Restaurant::stars.name, 1) ) val movieUpdate = ClientNamespacedWriteModel .updateMany( MongoNamespace("sample_mflix", "movies"), Filters.eq(Movie::title.name, "Carrie"), Updates.set(Movie::seen.name, true) )
此示例将 restaurants集合中 name 值为 "Villa Berulia" 的文档中 stars字段的值递增 1。它还将 movies集合中 title 值为 "Carrie" 的所有文档的 seen字段的值设置为 true。
如果多个文档与 ClientNamespacedUpdateOneModel实例中指定的查询过滤匹配,则该操作会更新第一个结果。您可以在ClientUpdateOneOptions实例中指定排序顺序,以便在服务器执行更新操作之前对匹配的文档应用顺序,如以下代码所示:
val options = ClientUpdateOneOptions .clientUpdateOneOptions() .sort(Sorts.ascending("_id"))
替换操作
以下示例展示了如何创建模型来替换 sample_restaurants.restaurants 和 sample_mflix.movies 集合中的现有文档:
val restaurantReplacement = ClientNamespacedWriteModel .replaceOne( MongoNamespace("sample_restaurants", "restaurants"), Filters.eq("_id", 1), Restaurant("Smith Town Diner", "Brooklyn", "American") ) val movieReplacement = ClientNamespacedWriteModel .replaceOne( MongoNamespace("sample_mflix", "movies"), Filters.eq("_id", 1), Movie("Loving Sylvie", 1999) )
此示例成功运行后,restaurants集合中 _id 值为 1 的文档将替换为新文档。movies集合中 _id 值为 1 的文档也会被替换为新文档。
如果多个文档与 ClientNamespacedReplaceOneModel实例中指定的查询筛选条件匹配,则该操作将替换第一个结果。您可以在 ClientReplaceOneOptions 实例中指定排序顺序,以便在驱动程序执行替换操作之前对匹配的文档应用顺序,如以下代码所示:
val options = ClientReplaceOneOptions .clientReplaceOneOptions() .sort(Sorts.ascending("_id"))
执行批量操作
为要执行的每个操作定义 ClientNamespacedWriteModel实例后,将这些实例的列表传递给客户端bulkWrite() 方法。 默认情况下,该方法按照指定的顺序运行操作。
以下示例使用bulkWrite()方法执行多个写入操作:
val restaurantNamespace = MongoNamespace("sample_restaurants", "restaurants") val movieNamespace = MongoNamespace("sample_mflix", "movies") val bulkOperations = listOf( ClientNamespacedWriteModel .insertOne( restaurantNamespace, Restaurant("Drea's", "Brooklyn", "Mexican") ), ClientNamespacedWriteModel .replaceOne( movieNamespace, Filters.eq("_id", 1), Movie("Underneath It All", 2002) ) ) val clientBulkResult = mongoClient .bulkWrite(bulkOperations) println(clientBulkResult.toString())
AcknowledgedSummaryClientBulkWriteResult{insertedCount=1, matchedCount=1, ...}
如果任何写入操作失败,驱动程序都会引发 ClientBulkWriteException,并且不会执行任何进一步的单个操作。ClientBulkWriteException 包括可使用 ClientBulkWriteException.getWriteErrors() 方法访问的 BulkWriteError,其中提供了单个故障的详细信息。
自定义批量写入
您可以将 ClientBulkWriteOptions 的实例传递给 bulkWrite() 方法,以自定义驱动程序执行批量写入操作的方式。
执行顺序
默认下,驱动程序会按照您指定的顺序运行批量操作中的各个操作,直到出现错误或操作成功完成。
但是,您可以在创建 ClientBulkWriteOptions实例时将 false 传递给 ordered() 方法,以指示驱动程序以无序方式执行写入操作。使用 unordered 选项时,产生错误的操作不会阻止驱动程序在批量写入操作中运行其他写入操作。
以下代码在 ClientBulkWriteOptions 的实例中将 ordered 选项设置为 false,并执行批量写入操作以插入多个文档。
val namespace = MongoNamespace("sample_restaurants", "restaurants") val options = ClientBulkWriteOptions .clientBulkWriteOptions() .ordered(false) val bulkOps = listOf( ClientNamespacedWriteModel.insertOne( namespace, Document("_id", 1).append("name", "Freezyland") ), // Causes a duplicate key error ClientNamespacedWriteModel.insertOne( namespace, Document("_id", 1).append("name", "Coffee Stand No. 1") ), ClientNamespacedWriteModel.insertOne<Any>( namespace, Document("name", "Kelly's Krepes") ) ) val result = mongoClient .bulkWrite(bulkOps, options)
即使插入具有重复键的文档的写入操作会导致错误,也会执行其他操作,因为写入操作是无序的。
更多信息
要了解如何执行单个写入操作,请参阅以下指南:
API 文档
要进一步了解本指南所讨论的任何方法或类型,请参阅以下 API 文档: