Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs 菜单
Docs 主页
/ / /
C 驱动程序
/ /

批量写入操作

本教程介绍如何使用 MongoDB C 驱动程序的批量写入操作功能。批量执行写入操作可减少网络往返次数,从而提高写入吞吐量。

首先,我们需要从 mongoc_collection_t 中获取批量操作处理。

mongoc_bulk_operation_t *bulk =
mongoc_collection_create_bulk_operation_with_opts (collection, NULL);

我们现在可以开始将文档插入到批量操作中。 这些将被缓冲,直到我们执行操作。

每次连续调用 mongoc_bulk_operation_insert 时,批量操作会将插入合并为单个批处理。这会在可能的情况下创建管道效果。

为了执行批量操作并接收结果,我们调用 mongoc_bulk_operation_execute。

bulk1.c
#include <assert.h>
#include <mongoc/mongoc.h>
#include <stdio.h>
/*
 * bulk1 --
 *
 *    Queue 10000 documents of the form {"i": <n>} on a single bulk
 *    operation created from `collection`, execute it, and print the reply
 *    document as canonical extended JSON on stdout. When execution reports
 *    failure, the error message is additionally written to stderr.
 */
static void
bulk1 (mongoc_collection_t *collection)
{
   mongoc_bulk_operation_t *batch;
   bson_error_t err;
   bson_t result;
   int n = 0;

   batch = mongoc_collection_create_bulk_operation_with_opts (collection, NULL);

   /* Each document is released right after it has been queued on the batch. */
   while (n < 10000) {
      bson_t *item = BCON_NEW ("i", BCON_INT32 (n));

      mongoc_bulk_operation_insert (batch, item);
      bson_destroy (item);
      n++;
   }

   const bool succeeded = mongoc_bulk_operation_execute (batch, &result, &err);

   /* The reply is printed regardless of whether execution succeeded. */
   char *json = bson_as_canonical_extended_json (&result, NULL);
   printf ("%s\n", json);
   bson_free (json);

   if (!succeeded) {
      fprintf (stderr, "Error: %s\n", err.message);
   }

   bson_destroy (&result);
   mongoc_bulk_operation_destroy (batch);
}
/*
 * main --
 *
 *    Entry point of the bulk1 example. Parses the connection string,
 *    creates a client, obtains the "test.test" collection and runs bulk1()
 *    against it.
 *
 *    Returns EXIT_SUCCESS after bulk1() has run, or EXIT_FAILURE when the
 *    URI cannot be parsed or the client cannot be created. Every exit path
 *    releases what was acquired so far and calls mongoc_cleanup ().
 */
int
main (void)
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;
   const char *uri_string = "mongodb://localhost/?appname=bulk1-example";
   mongoc_uri_t *uri;
   bson_error_t error;

   mongoc_init ();

   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               "failed to parse URI: %s\n"
               "error message: %s\n",
               uri_string,
               error.message);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   client = mongoc_client_new_from_uri (uri);
   if (!client) {
      /* Do not leak the parsed URI when the client cannot be created. */
      mongoc_uri_destroy (uri);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   /* Select error API version 2 for this client. */
   mongoc_client_set_error_api (client, 2);

   collection = mongoc_client_get_collection (client, "test", "test");
   bulk1 (collection);

   mongoc_uri_destroy (uri);
   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);
   mongoc_cleanup ();

   return EXIT_SUCCESS;
}

示例 reply文档:

{"nInserted" : 10000,
"nMatched" : 0,
"nModified" : 0,
"nRemoved" : 0,
"nUpserted" : 0,
"writeErrors" : [],
"writeConcernErrors" : [] }

MongoDB C 驱动程序还支持执行混合批量写入操作。 可以使用批量写入操作 API 一起执行一批插入、更新和删除操作。

有序的批量写入操作会按照为串行执行提供的顺序进行批处理并发送到服务器。 reply文档描述了所执行操作的类型和次数。

bulk2.c
#include <assert.h>
#include <mongoc/mongoc.h>
#include <stdio.h>
/*
 * bulk2 --
 *
 *    Build one bulk operation on `collection` that mixes several kinds of
 *    writes, in this order:
 *
 *      1. remove every document (empty selector),
 *      2. insert {_id: 1}, {_id: 2} and {_id: 3},
 *      3. update-many {_id: 1} with {$set: {foo: "bar"}},
 *      4. update-many {_id: 4} with {$inc: {j: 1}} and upsert enabled,
 *      5. replace-one {j: 1} with {j: 2}.
 *
 *    The bulk operation is then executed and its reply printed as canonical
 *    extended JSON on stdout, followed by the error message if execution
 *    reported failure.
 */
static void
bulk2 (mongoc_collection_t *collection)
{
   mongoc_bulk_operation_t *batch;
   bson_error_t err;
   bson_t *selector;
   bson_t *payload;
   bson_t *update_opts;
   bson_t result;
   char *json;
   bool succeeded;
   int id;

   batch = mongoc_collection_create_bulk_operation_with_opts (collection, NULL);

   /* Step 1: an empty selector matches, and therefore removes, everything. */
   selector = bson_new ();
   mongoc_bulk_operation_remove (batch, selector);
   bson_destroy (selector);

   /* Step 2: insert documents with _id 1, 2 and 3. */
   id = 1;
   while (id < 4) {
      payload = BCON_NEW ("_id", BCON_INT32 (id));
      mongoc_bulk_operation_insert (batch, payload);
      bson_destroy (payload);
      id++;
   }

   /* Step 3: {_id: 1} => {$set: {foo: "bar"}} */
   selector = BCON_NEW ("_id", BCON_INT32 (1));
   payload = BCON_NEW ("$set", "{", "foo", BCON_UTF8 ("bar"), "}");
   mongoc_bulk_operation_update_many_with_opts (batch, selector, payload, NULL, &err);
   bson_destroy (selector);
   bson_destroy (payload);

   /* Step 4: {_id: 4} => {'$inc': {'j': 1}}, as an upsert */
   update_opts = BCON_NEW ("upsert", BCON_BOOL (true));
   selector = BCON_NEW ("_id", BCON_INT32 (4));
   payload = BCON_NEW ("$inc", "{", "j", BCON_INT32 (1), "}");
   mongoc_bulk_operation_update_many_with_opts (batch, selector, payload, update_opts, &err);
   bson_destroy (selector);
   bson_destroy (payload);
   bson_destroy (update_opts);

   /* Step 5: replace the document matching {j: 1} with {j: 2}. */
   selector = BCON_NEW ("j", BCON_INT32 (1));
   payload = BCON_NEW ("j", BCON_INT32 (2));
   mongoc_bulk_operation_replace_one_with_opts (batch, selector, payload, NULL, &err);
   bson_destroy (selector);
   bson_destroy (payload);

   succeeded = mongoc_bulk_operation_execute (batch, &result, &err);

   /* The reply is printed regardless of whether execution succeeded. */
   json = bson_as_canonical_extended_json (&result, NULL);
   printf ("%s\n", json);
   bson_free (json);

   if (!succeeded) {
      printf ("Error: %s\n", err.message);
   }

   bson_destroy (&result);
   mongoc_bulk_operation_destroy (batch);
}
/*
 * main --
 *
 *    Entry point of the bulk2 example. Parses the connection string,
 *    creates a client, obtains the "test.test" collection and runs bulk2()
 *    against it.
 *
 *    Returns EXIT_SUCCESS after bulk2() has run, or EXIT_FAILURE when the
 *    URI cannot be parsed or the client cannot be created. Every exit path
 *    releases what was acquired so far and calls mongoc_cleanup ().
 */
int
main (void)
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;
   const char *uri_string = "mongodb://localhost/?appname=bulk2-example";
   mongoc_uri_t *uri;
   bson_error_t error;

   mongoc_init ();

   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               "failed to parse URI: %s\n"
               "error message: %s\n",
               uri_string,
               error.message);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   client = mongoc_client_new_from_uri (uri);
   if (!client) {
      /* Do not leak the parsed URI when the client cannot be created. */
      mongoc_uri_destroy (uri);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   /* Select error API version 2 for this client. */
   mongoc_client_set_error_api (client, 2);

   collection = mongoc_client_get_collection (client, "test", "test");
   bulk2 (collection);

   mongoc_uri_destroy (uri);
   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);
   mongoc_cleanup ();

   return EXIT_SUCCESS;
}

示例reply文档:

{ "nInserted" : 3,
"nMatched" : 2,
"nModified" : 2,
"nRemoved" : 10000,
"nUpserted" : 1,
"upserted" : [{"index" : 5, "_id" : 4}],
"writeErrors" : [],
"writeConcernErrors" : [] }

upserted 数组中的 index 字段是 upsert 操作从 0 开始的索引;在此示例中,整个批量操作的第六个操作是 upsert,因此其索引为 5。

无序批量写入操作会被分批处理,并以任意顺序发送到服务器,而服务器可能会并行执行这些操作。 尝试所有操作后,都会报告出现的任何错误。

在下一个示例中,第一个和第三个操作由于_id的唯一约束而失败。 由于我们正在进行无序执行,因此第二个和第四个操作会成功。

bulk3.c
#include <assert.h>
#include <mongoc/mongoc.h>
#include <stdio.h>
/*
 * bulk3 --
 *
 *    Build an unordered bulk operation on `collection` consisting of:
 *
 *      1. insert {_id: 1},
 *      2. remove one document matching {_id: 2},
 *      3. insert {_id: 3},
 *      4. replace the document matching {_id: 4} with {i: 1} (no upsert).
 *
 *    The bulk operation is executed and its reply printed as canonical
 *    extended JSON on stdout, followed by the error message if execution
 *    reported failure.
 */
static void
bulk3 (mongoc_collection_t *collection)
{
   bson_t opts = BSON_INITIALIZER;
   mongoc_bulk_operation_t *bulk;
   bson_error_t error;
   bson_t *query;
   bson_t *doc;
   bson_t reply;
   char *str;
   bool ret;

   /* false indicates unordered */
   BSON_APPEND_BOOL (&opts, "ordered", false);
   bulk = mongoc_collection_create_bulk_operation_with_opts (collection, &opts);

   /* The options are only needed to create the bulk operation. They are
    * released here, exactly once; they must not be destroyed again at the
    * end of the function. */
   bson_destroy (&opts);

   /* insert {_id: 1} */
   doc = BCON_NEW ("_id", BCON_INT32 (1));
   mongoc_bulk_operation_insert (bulk, doc);
   bson_destroy (doc);

   /* remove {_id: 2} */
   query = BCON_NEW ("_id", BCON_INT32 (2));
   mongoc_bulk_operation_remove_one (bulk, query);
   bson_destroy (query);

   /* insert {_id: 3} */
   doc = BCON_NEW ("_id", BCON_INT32 (3));
   mongoc_bulk_operation_insert (bulk, doc);
   bson_destroy (doc);

   /* replace {_id:4} {'i': 1} */
   query = BCON_NEW ("_id", BCON_INT32 (4));
   doc = BCON_NEW ("i", BCON_INT32 (1));
   mongoc_bulk_operation_replace_one (bulk, query, doc, false);
   bson_destroy (query);
   bson_destroy (doc);

   ret = mongoc_bulk_operation_execute (bulk, &reply, &error);

   /* The reply is printed regardless of whether execution succeeded. */
   str = bson_as_canonical_extended_json (&reply, NULL);
   printf ("%s\n", str);
   bson_free (str);

   if (!ret) {
      printf ("Error: %s\n", error.message);
   }

   bson_destroy (&reply);
   mongoc_bulk_operation_destroy (bulk);
}
/*
 * main --
 *
 *    Entry point of the bulk3 example. Parses the connection string,
 *    creates a client, obtains the "test.test" collection and runs bulk3()
 *    against it.
 *
 *    Returns EXIT_SUCCESS after bulk3() has run, or EXIT_FAILURE when the
 *    URI cannot be parsed or the client cannot be created. Every exit path
 *    releases what was acquired so far and calls mongoc_cleanup ().
 */
int
main (void)
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;
   const char *uri_string = "mongodb://localhost/?appname=bulk3-example";
   mongoc_uri_t *uri;
   bson_error_t error;

   mongoc_init ();

   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               "failed to parse URI: %s\n"
               "error message: %s\n",
               uri_string,
               error.message);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   client = mongoc_client_new_from_uri (uri);
   if (!client) {
      /* Do not leak the parsed URI when the client cannot be created. */
      mongoc_uri_destroy (uri);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   /* Select error API version 2 for this client. */
   mongoc_client_set_error_api (client, 2);

   collection = mongoc_client_get_collection (client, "test", "test");
   bulk3 (collection);

   mongoc_uri_destroy (uri);
   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);
   mongoc_cleanup ();

   return EXIT_SUCCESS;
}

示例reply文档:

{ "nInserted" : 0,
"nMatched" : 1,
"nModified" : 1,
"nRemoved" : 1,
"nUpserted" : 0,
"writeErrors" : [
{ "index" : 0,
"code" : 11000,
"errmsg" : "E11000 duplicate key error index: test.test.$_id_ dup key: { : 1 }" },
{ "index" : 2,
"code" : 11000,
"errmsg" : "E11000 duplicate key error index: test.test.$_id_ dup key: { : 3 }" } ],
"writeConcernErrors" : [] }
Error: E11000 duplicate key error index: test.test.$_id_ dup key: { : 1 }

bson_error_t 域为MONGOC_ERROR_COMMAND ,其代码为11000 。

此功能仅在使用 MongoDB 3.2及更高版本时可用。

默认情况下,批量操作会根据模式(如果已定义)进行验证。但在某些情况下,可能需要绕过文档验证。

bulk5.c
#include <assert.h>
#include <mongoc/mongoc.h>
#include <stdio.h>
/*
 * bulk5_fail --
 *
 *    Queue inserts of {_id: 31} and {_id: 32} on a bulk operation created
 *    from `collection` and execute it without bypassing document
 *    validation. The reply is printed as canonical extended JSON on stdout,
 *    followed by the error message if execution reported failure.
 */
static void
bulk5_fail (mongoc_collection_t *collection)
{
   mongoc_bulk_operation_t *batch;
   bson_error_t err;
   bson_t result;

   batch = mongoc_collection_create_bulk_operation_with_opts (collection, NULL);

   /* First insert */
   bson_t *first = BCON_NEW ("_id", BCON_INT32 (31));
   mongoc_bulk_operation_insert (batch, first);
   bson_destroy (first);

   /* Second insert */
   bson_t *second = BCON_NEW ("_id", BCON_INT32 (32));
   mongoc_bulk_operation_insert (batch, second);
   bson_destroy (second);

   /* Neither document satisfies the schema validation rules that were
    * set up beforehand, so executing the batch is expected to fail. */
   const bool succeeded = mongoc_bulk_operation_execute (batch, &result, &err);

   char *json = bson_as_canonical_extended_json (&result, NULL);
   printf ("%s\n", json);
   bson_free (json);

   if (!succeeded) {
      printf ("Error: %s\n", err.message);
   }

   bson_destroy (&result);
   mongoc_bulk_operation_destroy (batch);
}
/*
 * bulk5_success --
 *
 *    Same two inserts as bulk5_fail() ({_id: 31} and {_id: 32}), but the
 *    bulk operation is flagged to bypass document validation before it is
 *    executed. The reply is printed as canonical extended JSON on stdout,
 *    followed by the error message if execution reported failure.
 */
static void
bulk5_success (mongoc_collection_t *collection)
{
   mongoc_bulk_operation_t *batch;
   bson_error_t err;
   bson_t result;

   batch = mongoc_collection_create_bulk_operation_with_opts (collection, NULL);

   /* Let the queued documents skip document validation.
    * NOTE: with authentication enabled, the authenticated user needs
    * either the "dbadmin" or the "restore" role for the bypass to be
    * permitted. */
   mongoc_bulk_operation_set_bypass_document_validation (batch, true);

   /* First insert */
   bson_t *first = BCON_NEW ("_id", BCON_INT32 (31));
   mongoc_bulk_operation_insert (batch, first);
   bson_destroy (first);

   /* Second insert */
   bson_t *second = BCON_NEW ("_id", BCON_INT32 (32));
   mongoc_bulk_operation_insert (batch, second);
   bson_destroy (second);

   const bool succeeded = mongoc_bulk_operation_execute (batch, &result, &err);

   char *json = bson_as_canonical_extended_json (&result, NULL);
   printf ("%s\n", json);
   bson_free (json);

   if (!succeeded) {
      printf ("Error: %s\n", err.message);
   }

   bson_destroy (&result);
   mongoc_bulk_operation_destroy (batch);
}
/*
 * main --
 *
 *    Entry point of the bulk5 example. Creates the collection "collname"
 *    in database "testasdf" with a validator requiring {number: {$gte: 5}},
 *    then runs bulk5_fail() and bulk5_success() against it.
 *
 *    Returns EXIT_SUCCESS when both helpers have run, or EXIT_FAILURE when
 *    the URI cannot be parsed, the client cannot be created, or the
 *    collection cannot be created. Every exit path releases what was
 *    acquired so far and calls mongoc_cleanup ().
 */
int
main (void)
{
   bson_t *options;
   bson_error_t error;
   mongoc_client_t *client;
   mongoc_collection_t *collection;
   mongoc_database_t *database;
   const char *uri_string = "mongodb://localhost/?appname=bulk5-example";
   mongoc_uri_t *uri;
   int exit_code = EXIT_SUCCESS;

   mongoc_init ();

   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               "failed to parse URI: %s\n"
               "error message: %s\n",
               uri_string,
               error.message);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   client = mongoc_client_new_from_uri (uri);
   if (!client) {
      /* Do not leak the parsed URI when the client cannot be created. */
      mongoc_uri_destroy (uri);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   /* Select error API version 2 for this client. */
   mongoc_client_set_error_api (client, 2);

   database = mongoc_client_get_database (client, "testasdf");

   /* Create schema validator */
   options = BCON_NEW ("validator", "{", "number", "{", "$gte", BCON_INT32 (5), "}", "}");
   collection = mongoc_database_create_collection (database, "collname", options, &error);

   if (collection) {
      bulk5_fail (collection);
      bulk5_success (collection);
      mongoc_collection_destroy (collection);
   } else {
      fprintf (stderr, "Couldn't create collection: '%s'\n", error.message);
      /* Report the failure to the caller instead of exiting successfully. */
      exit_code = EXIT_FAILURE;
   }

   /* A bson_t built with BCON_NEW must be released with bson_destroy,
    * not bson_free. */
   bson_destroy (options);
   mongoc_uri_destroy (uri);
   mongoc_database_destroy (database);
   mongoc_client_destroy (client);
   mongoc_cleanup ();

   return exit_code;
}

运行上述示例将导致:

{ "nInserted" : 0,
"nMatched" : 0,
"nModified" : 0,
"nRemoved" : 0,
"nUpserted" : 0,
"writeErrors" : [
{ "index" : 0,
"code" : 121,
"errmsg" : "Document failed validation" } ] }
Error: Document failed validation
{ "nInserted" : 2,
"nMatched" : 0,
"nModified" : 0,
"nRemoved" : 0,
"nUpserted" : 0,
"writeErrors" : [] }

bson_error_t 域为MONGOC_ERROR_COMMAND

默认情况下,批量操作是使用它们所针对的集合的 mongoc_write_concern_t 来执行的。可以将自定义写关注(write concern)传递给 mongoc_collection_create_bulk_operation_with_opts 方法。无论执行顺序如何,在尝试所有操作后都会报告写关注错误(例如 wtimeout)。

bulk4.c
#include <assert.h>
#include <mongoc/mongoc.h>
#include <stdio.h>
/*
 * bulk4 --
 *
 *    Create a bulk operation on `collection` with a custom write concern
 *    (w = 4, wtimeout = 100 ms), queue inserts of {_id: 10} and {_id: 11},
 *    and execute it. The reply is printed as canonical extended JSON on
 *    stdout, followed by the error message if execution reported failure.
 */
static void
bulk4 (mongoc_collection_t *collection)
{
   bson_t create_opts = BSON_INITIALIZER;
   bson_error_t err;
   bson_t result;

   /* Describe the write concern and serialize it into the options. */
   mongoc_write_concern_t *concern = mongoc_write_concern_new ();
   mongoc_write_concern_set_w (concern, 4);
   mongoc_write_concern_set_wtimeout_int64 (concern, 100); /* milliseconds */
   mongoc_write_concern_append (concern, &create_opts);

   mongoc_bulk_operation_t *batch =
      mongoc_collection_create_bulk_operation_with_opts (collection, &create_opts);

   /* First insert */
   bson_t *first = BCON_NEW ("_id", BCON_INT32 (10));
   mongoc_bulk_operation_insert (batch, first);
   bson_destroy (first);

   /* Second insert */
   bson_t *second = BCON_NEW ("_id", BCON_INT32 (11));
   mongoc_bulk_operation_insert (batch, second);
   bson_destroy (second);

   const bool succeeded = mongoc_bulk_operation_execute (batch, &result, &err);

   char *json = bson_as_canonical_extended_json (&result, NULL);
   printf ("%s\n", json);
   bson_free (json);

   if (!succeeded) {
      printf ("Error: %s\n", err.message);
   }

   bson_destroy (&result);
   mongoc_bulk_operation_destroy (batch);
   mongoc_write_concern_destroy (concern);
   bson_destroy (&create_opts);
}
/*
 * main --
 *
 *    Entry point of the bulk4 example. Parses the connection string,
 *    creates a client, obtains the "test.test" collection and runs bulk4()
 *    against it.
 *
 *    Returns EXIT_SUCCESS after bulk4() has run, or EXIT_FAILURE when the
 *    URI cannot be parsed or the client cannot be created. Every exit path
 *    releases what was acquired so far and calls mongoc_cleanup ().
 */
int
main (void)
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;
   const char *uri_string = "mongodb://localhost/?appname=bulk4-example";
   mongoc_uri_t *uri;
   bson_error_t error;

   mongoc_init ();

   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               "failed to parse URI: %s\n"
               "error message: %s\n",
               uri_string,
               error.message);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   client = mongoc_client_new_from_uri (uri);
   if (!client) {
      /* Do not leak the parsed URI when the client cannot be created. */
      mongoc_uri_destroy (uri);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   /* Select error API version 2 for this client. */
   mongoc_client_set_error_api (client, 2);

   collection = mongoc_client_get_collection (client, "test", "test");
   bulk4 (collection);

   mongoc_uri_destroy (uri);
   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);
   mongoc_cleanup ();

   return EXIT_SUCCESS;
}

reply文档和错误消息示例:

{ "nInserted" : 2,
"nMatched" : 0,
"nModified" : 0,
"nRemoved" : 0,
"nUpserted" : 0,
"writeErrors" : [],
"writeConcernErrors" : [
{ "code" : 64,
"errmsg" : "waiting for replication timed out" }
] }
Error: waiting for replication timed out

如果存在写关注(write concern)错误但没有写入错误,则 bson_error_t 域为 MONGOC_ERROR_WRITE_CONCERN。写入错误表示操作失败,因此它们优先于写关注(write concern)错误,后者仅意味着写关注(write concern)尚未满足。

此功能仅在使用 MongoDB 3.4及更高版本时可用。

bulk-collation.c
#include <mongoc/mongoc.h>
#include <stdio.h>
/*
 * bulk_collation --
 *
 *    Queue inserts of {_id: "one"} and {_id: "One"} on a bulk operation
 *    created from `collection`, followed by an update-one that uses an
 *    "en_US" collation with caseFirst "lower" to select {_id: {$gt: "one"}}
 *    and apply {$set: {x: 1}}. The bulk operation is executed and its reply
 *    printed as canonical extended JSON on stdout, followed by the error
 *    message if execution reported failure.
 */
static void
bulk_collation (mongoc_collection_t *collection)
{
   mongoc_bulk_operation_t *batch;
   bson_t *collation_opts;
   bson_t *item;
   bson_t *filter;
   bson_t *change;
   bson_error_t err;
   bson_t result;
   char *json;
   uint32_t rc;

   batch = mongoc_collection_create_bulk_operation_with_opts (collection, NULL);

   /* insert {_id: "one"} */
   item = BCON_NEW ("_id", BCON_UTF8 ("one"));
   mongoc_bulk_operation_insert (batch, item);
   bson_destroy (item);

   /* insert {_id: "One"} */
   item = BCON_NEW ("_id", BCON_UTF8 ("One"));
   mongoc_bulk_operation_insert (batch, item);
   bson_destroy (item);

   /* "One" normally sorts before "one"; this collation puts "one" first */
   collation_opts =
      BCON_NEW ("collation", "{", "locale", BCON_UTF8 ("en_US"), "caseFirst", BCON_UTF8 ("lower"), "}");

   /* set x=1 on the document with _id "One", which now sorts after "one" */
   change = BCON_NEW ("$set", "{", "x", BCON_INT64 (1), "}");
   filter = BCON_NEW ("_id", "{", "$gt", BCON_UTF8 ("one"), "}");
   mongoc_bulk_operation_update_one_with_opts (batch, filter, change, collation_opts, &err);

   rc = mongoc_bulk_operation_execute (batch, &result, &err);

   /* The reply is printed regardless of whether execution succeeded. */
   json = bson_as_canonical_extended_json (&result, NULL);
   printf ("%s\n", json);
   bson_free (json);

   if (rc == 0) {
      printf ("Error: %s\n", err.message);
   }

   bson_destroy (&result);
   bson_destroy (change);
   bson_destroy (filter);
   bson_destroy (collation_opts);
   mongoc_bulk_operation_destroy (batch);
}
/*
 * main --
 *
 *    Entry point of the bulk-collation example. Parses the connection
 *    string, creates a client, obtains the "db.collection" collection and
 *    runs bulk_collation() against it.
 *
 *    Returns EXIT_SUCCESS after bulk_collation() has run, or EXIT_FAILURE
 *    when the URI cannot be parsed or the client cannot be created. Every
 *    exit path releases what was acquired so far and calls
 *    mongoc_cleanup ().
 */
int
main (void)
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;
   const char *uri_string = "mongodb://localhost/?appname=bulk-collation";
   mongoc_uri_t *uri;
   bson_error_t error;

   mongoc_init ();

   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               "failed to parse URI: %s\n"
               "error message: %s\n",
               uri_string,
               error.message);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   client = mongoc_client_new_from_uri (uri);
   if (!client) {
      /* Do not leak the parsed URI when the client cannot be created. */
      mongoc_uri_destroy (uri);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   /* Select error API version 2 for this client. */
   mongoc_client_set_error_api (client, 2);

   collection = mongoc_client_get_collection (client, "db", "collection");
   bulk_collation (collection);

   mongoc_uri_destroy (uri);
   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);
   mongoc_cleanup ();

   return EXIT_SUCCESS;
}

运行上述示例将导致:

{ "nInserted" : 2,
"nMatched" : 1,
"nModified" : 1,
"nRemoved" : 0,
"nUpserted" : 0,
"writeErrors" : [ ]
}

对于未确认的写入,将“w”设置为零。驱动程序使用旧版操作码 OP_INSERT、OP_UPDATE 和 OP_DELETE 发送未确认的写入。

bulk6.c
#include <mongoc/mongoc.h>
#include <stdio.h>
/*
 * bulk6 --
 *
 *    Create a bulk operation on `collection` with write concern w = 0,
 *    queue an insert of {_id: 10} and a remove-one of {_id: 11}, and
 *    execute it. The reply is printed as canonical extended JSON on stdout,
 *    followed by the error message if execution reported failure.
 */
static void
bulk6 (mongoc_collection_t *collection)
{
   bson_t create_opts = BSON_INITIALIZER;
   bson_error_t err;
   bson_t result;

   /* Serialize a w = 0 write concern into the creation options. */
   mongoc_write_concern_t *concern = mongoc_write_concern_new ();
   mongoc_write_concern_set_w (concern, 0);
   mongoc_write_concern_append (concern, &create_opts);

   mongoc_bulk_operation_t *batch =
      mongoc_collection_create_bulk_operation_with_opts (collection, &create_opts);

   /* insert {_id: 10} */
   bson_t *item = BCON_NEW ("_id", BCON_INT32 (10));
   mongoc_bulk_operation_insert (batch, item);
   bson_destroy (item);

   /* remove one document matching {_id: 11} */
   bson_t *filter = BCON_NEW ("_id", BCON_INT32 (11));
   mongoc_bulk_operation_remove_one (batch, filter);
   bson_destroy (filter);

   const bool succeeded = mongoc_bulk_operation_execute (batch, &result, &err);

   char *json = bson_as_canonical_extended_json (&result, NULL);
   printf ("%s\n", json);
   bson_free (json);

   if (!succeeded) {
      printf ("Error: %s\n", err.message);
   }

   bson_destroy (&result);
   mongoc_bulk_operation_destroy (batch);
   mongoc_write_concern_destroy (concern);
   bson_destroy (&create_opts);
}
/*
 * main --
 *
 *    Entry point of the bulk6 example. Parses the connection string,
 *    creates a client, obtains the "test.test" collection and runs bulk6()
 *    against it.
 *
 *    Returns EXIT_SUCCESS after bulk6() has run, or EXIT_FAILURE when the
 *    URI cannot be parsed or the client cannot be created. Every exit path
 *    releases what was acquired so far and calls mongoc_cleanup ().
 */
int
main (void)
{
   mongoc_client_t *client;
   mongoc_collection_t *collection;
   const char *uri_string = "mongodb://localhost/?appname=bulk6-example";
   mongoc_uri_t *uri;
   bson_error_t error;

   mongoc_init ();

   uri = mongoc_uri_new_with_error (uri_string, &error);
   if (!uri) {
      fprintf (stderr,
               "failed to parse URI: %s\n"
               "error message: %s\n",
               uri_string,
               error.message);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   client = mongoc_client_new_from_uri (uri);
   if (!client) {
      /* Do not leak the parsed URI when the client cannot be created. */
      mongoc_uri_destroy (uri);
      mongoc_cleanup ();
      return EXIT_FAILURE;
   }

   /* Select error API version 2 for this client. */
   mongoc_client_set_error_api (client, 2);

   collection = mongoc_client_get_collection (client, "test", "test");
   bulk6 (collection);

   mongoc_uri_destroy (uri);
   mongoc_collection_destroy (collection);
   mongoc_client_destroy (client);
   mongoc_cleanup ();

   return EXIT_SUCCESS;
}

reply文档为空:

{ }

请参阅驱动程序批量API规范 ,其中描述了所有MongoDB驱动程序的批量写入操作。

后退

Cursors

在此页面上