이 튜토리얼에서는 MongoDB C 운전자 의 대량 쓰기 (write) 작업 기능을 활용하는 방법을 설명합니다. 쓰기 (write) 작업을 배치로 실행하면 네트워크 왕복 횟수가 줄어들어 쓰기 (write) 처리량 이 증가합니다.
Bulk Insert
First we need to fetch a bulk operation handle from the mongoc_collection_t.
mongoc_bulk_operation_t *bulk = mongoc_collection_create_bulk_operation_with_opts (collection, NULL);
이제 대량 작업에 문서 삽입을 시작할 수 있습니다. 작업을 실행할 때까지 버퍼링됩니다.
The bulk operation will coalesce insertions as a single batch for each consecutive call to mongoc_bulk_operation_insert. This creates a pipelined effect when possible.
To execute the bulk operation and receive the result we call mongoc_bulk_operation_execute.
static void bulk1 (mongoc_collection_t *collection) { mongoc_bulk_operation_t *bulk; bson_error_t error; bson_t *doc; bson_t reply; char *str; bool ret; int i; bulk = mongoc_collection_create_bulk_operation_with_opts (collection, NULL); for (i = 0; i < 10000; i++) { doc = BCON_NEW ("i", BCON_INT32 (i)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); } ret = mongoc_bulk_operation_execute (bulk, &reply, &error); str = bson_as_canonical_extended_json (&reply, NULL); printf ("%s\n", str); bson_free (str); if (!ret) { fprintf (stderr, "Error: %s\n", error.message); } bson_destroy (&reply); mongoc_bulk_operation_destroy (bulk); } int main (void) { mongoc_client_t *client; mongoc_collection_t *collection; const char *uri_string = "mongodb://localhost/?appname=bulk1-example"; mongoc_uri_t *uri; bson_error_t error; mongoc_init (); uri = mongoc_uri_new_with_error (uri_string, &error); if (!uri) { fprintf (stderr, "failed to parse URI: %s\n" "error message: %s\n", uri_string, error.message); return EXIT_FAILURE; } client = mongoc_client_new_from_uri (uri); if (!client) { return EXIT_FAILURE; } mongoc_client_set_error_api (client, 2); collection = mongoc_client_get_collection (client, "test", "test"); bulk1 (collection); mongoc_uri_destroy (uri); mongoc_collection_destroy (collection); mongoc_client_destroy (client); mongoc_cleanup (); return EXIT_SUCCESS; }
reply
문서 예시:
{"nInserted" : 10000, "nMatched" : 0, "nModified" : 0, "nRemoved" : 0, "nUpserted" : 0, "writeErrors" : [] "writeConcernErrors" : [] }
혼합 대량 쓰기 작업
MongoDB C 드라이버는 혼합 대량 쓰기 작업 실행도 지원합니다. 대량 쓰기 작업 API를 사용하여 삽입, 업데이트 및 제거 작업을 일괄적으로 실행할 수 있습니다.
순서가 지정된 대량 쓰기 작업
순서가 지정된 대량 쓰기 (write) 작업은 일괄 처리되어 직렬 실행을 위해 제공된 순서대로 서버 로 전송됩니다. reply
문서 에서는 수행된 작업의 유형과 횟수를 설명합니다.
static void bulk2 (mongoc_collection_t *collection) { mongoc_bulk_operation_t *bulk; bson_error_t error; bson_t *query; bson_t *doc; bson_t *opts; bson_t reply; char *str; bool ret; int i; bulk = mongoc_collection_create_bulk_operation_with_opts (collection, NULL); /* Remove everything */ query = bson_new (); mongoc_bulk_operation_remove (bulk, query); bson_destroy (query); /* Add a few documents */ for (i = 1; i < 4; i++) { doc = BCON_NEW ("_id", BCON_INT32 (i)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); } /* {_id: 1} => {$set: {foo: "bar"}} */ query = BCON_NEW ("_id", BCON_INT32 (1)); doc = BCON_NEW ("$set", "{", "foo", BCON_UTF8 ("bar"), "}"); mongoc_bulk_operation_update_many_with_opts (bulk, query, doc, NULL, &error); bson_destroy (query); bson_destroy (doc); /* {_id: 4} => {'$inc': {'j': 1}} (upsert) */ opts = BCON_NEW ("upsert", BCON_BOOL (true)); query = BCON_NEW ("_id", BCON_INT32 (4)); doc = BCON_NEW ("$inc", "{", "j", BCON_INT32 (1), "}"); mongoc_bulk_operation_update_many_with_opts (bulk, query, doc, opts, &error); bson_destroy (query); bson_destroy (doc); bson_destroy (opts); /* replace {j:1} with {j:2} */ query = BCON_NEW ("j", BCON_INT32 (1)); doc = BCON_NEW ("j", BCON_INT32 (2)); mongoc_bulk_operation_replace_one_with_opts (bulk, query, doc, NULL, &error); bson_destroy (query); bson_destroy (doc); ret = mongoc_bulk_operation_execute (bulk, &reply, &error); str = bson_as_canonical_extended_json (&reply, NULL); printf ("%s\n", str); bson_free (str); if (!ret) { printf ("Error: %s\n", error.message); } bson_destroy (&reply); mongoc_bulk_operation_destroy (bulk); } int main (void) { mongoc_client_t *client; mongoc_collection_t *collection; const char *uri_string = "mongodb://localhost/?appname=bulk2-example"; mongoc_uri_t *uri; bson_error_t error; mongoc_init (); uri = mongoc_uri_new_with_error (uri_string, &error); if (!uri) { fprintf (stderr, "failed to parse URI: %s\n" "error message: %s\n", uri_string, error.message); return EXIT_FAILURE; } client = mongoc_client_new_from_uri (uri); if (!client) { return EXIT_FAILURE; } mongoc_client_set_error_api (client, 2); collection = mongoc_client_get_collection (client, "test", "test"); bulk2 (collection); mongoc_uri_destroy (uri); mongoc_collection_destroy (collection); mongoc_client_destroy (client); mongoc_cleanup (); return EXIT_SUCCESS; }
reply
문서 예시:
{ "nInserted" : 3, "nMatched" : 2, "nModified" : 2, "nRemoved" : 10000, "nUpserted" : 1, "upserted" : [{"index" : 5, "_id" : 4}], "writeErrors" : [] "writeConcernErrors" : [] }
upserted
배열 의 index
필드 는 업서트 작업의 0기반 인덱스 입니다. 이 예시 에서 전체 대량 작업의 여섯 번째 작업은 업서트 이므로 인덱스 는 5 입니다.
순서가 지정되지 않은 대량 쓰기 작업
순서가 지정되지 않은 대량 쓰기 작업은 일괄 처리되어 임의의 순서 로 서버로 전송되어 병렬로 실행될 수 있습니다. 발생하는 모든 오류는 모든 작업이 시도된 후에 보고됩니다.
다음 예시 에서는 _id
에 대한 고유 제약 조건으로 인해 첫 번째 및 세 번째 작업이 실패합니다. 우리는 순서가 지정되지 않은 실행을 수행하고 있으므로 두 번째와 네 번째 작업이 성공합니다.
static void bulk3 (mongoc_collection_t *collection) { bson_t opts = BSON_INITIALIZER; mongoc_bulk_operation_t *bulk; bson_error_t error; bson_t *query; bson_t *doc; bson_t reply; char *str; bool ret; /* false indicates unordered */ BSON_APPEND_BOOL (&opts, "ordered", false); bulk = mongoc_collection_create_bulk_operation_with_opts (collection, &opts); bson_destroy (&opts); /* Add a document */ doc = BCON_NEW ("_id", BCON_INT32 (1)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); /* remove {_id: 2} */ query = BCON_NEW ("_id", BCON_INT32 (2)); mongoc_bulk_operation_remove_one (bulk, query); bson_destroy (query); /* insert {_id: 3} */ doc = BCON_NEW ("_id", BCON_INT32 (3)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); /* replace {_id:4} {'i': 1} */ query = BCON_NEW ("_id", BCON_INT32 (4)); doc = BCON_NEW ("i", BCON_INT32 (1)); mongoc_bulk_operation_replace_one (bulk, query, doc, false); bson_destroy (query); bson_destroy (doc); ret = mongoc_bulk_operation_execute (bulk, &reply, &error); str = bson_as_canonical_extended_json (&reply, NULL); printf ("%s\n", str); bson_free (str); if (!ret) { printf ("Error: %s\n", error.message); } bson_destroy (&reply); mongoc_bulk_operation_destroy (bulk); bson_destroy (&opts); } int main (void) { mongoc_client_t *client; mongoc_collection_t *collection; const char *uri_string = "mongodb://localhost/?appname=bulk3-example"; mongoc_uri_t *uri; bson_error_t error; mongoc_init (); uri = mongoc_uri_new_with_error (uri_string, &error); if (!uri) { fprintf (stderr, "failed to parse URI: %s\n" "error message: %s\n", uri_string, error.message); return EXIT_FAILURE; } client = mongoc_client_new_from_uri (uri); if (!client) { return EXIT_FAILURE; } mongoc_client_set_error_api (client, 2); collection = mongoc_client_get_collection (client, "test", "test"); bulk3 (collection); mongoc_uri_destroy (uri); mongoc_collection_destroy (collection); mongoc_client_destroy (client); mongoc_cleanup (); return EXIT_SUCCESS; }
reply
문서 예시:
{ "nInserted" : 0, "nMatched" : 1, "nModified" : 1, "nRemoved" : 1, "nUpserted" : 0, "writeErrors" : [ { "index" : 0, "code" : 11000, "errmsg" : "E11000 duplicate key error index: test.test.$_id_ dup key: { : 1 }" }, { "index" : 2, "code" : 11000, "errmsg" : "E11000 duplicate key error index: test.test.$_id_ dup key: { : 3 }" } ], "writeConcernErrors" : [] } Error: E11000 duplicate key error index: test.test.$_id_ dup key: { : 1 }
The bson_error_t domain is MONGOC_ERROR_COMMAND
and its code is 11000.
문서 유효성 검사를 우회하는 대량 작업
이 기능은 MongoDB 3.2 이상을 사용하는 경우에만 사용할 수 있습니다.
기본값 대량 작업은 스키마가 정의된 경우 스키마 에 대해 유효성을 검사합니다. 그러나 경우에 따라 문서 유효성 검사 를 우회해야 할 수도 있습니다.
static void bulk5_fail (mongoc_collection_t *collection) { mongoc_bulk_operation_t *bulk; bson_error_t error; bson_t *doc; bson_t reply; char *str; bool ret; bulk = mongoc_collection_create_bulk_operation_with_opts (collection, NULL); /* Two inserts */ doc = BCON_NEW ("_id", BCON_INT32 (31)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); doc = BCON_NEW ("_id", BCON_INT32 (32)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); /* The above documents do not comply to the schema validation rules * we created previously, so this will result in an error */ ret = mongoc_bulk_operation_execute (bulk, &reply, &error); str = bson_as_canonical_extended_json (&reply, NULL); printf ("%s\n", str); bson_free (str); if (!ret) { printf ("Error: %s\n", error.message); } bson_destroy (&reply); mongoc_bulk_operation_destroy (bulk); } static void bulk5_success (mongoc_collection_t *collection) { mongoc_bulk_operation_t *bulk; bson_error_t error; bson_t *doc; bson_t reply; char *str; bool ret; bulk = mongoc_collection_create_bulk_operation_with_opts (collection, NULL); /* Allow this document to bypass document validation. * NOTE: When authentication is enabled, the authenticated user must have * either the "dbadmin" or "restore" roles to bypass document validation */ mongoc_bulk_operation_set_bypass_document_validation (bulk, true); /* Two inserts */ doc = BCON_NEW ("_id", BCON_INT32 (31)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); doc = BCON_NEW ("_id", BCON_INT32 (32)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); ret = mongoc_bulk_operation_execute (bulk, &reply, &error); str = bson_as_canonical_extended_json (&reply, NULL); printf ("%s\n", str); bson_free (str); if (!ret) { printf ("Error: %s\n", error.message); } bson_destroy (&reply); mongoc_bulk_operation_destroy (bulk); } int main (void) { bson_t *options; bson_error_t error; mongoc_client_t *client; mongoc_collection_t *collection; mongoc_database_t *database; const char *uri_string = "mongodb://localhost/?appname=bulk5-example"; mongoc_uri_t *uri; mongoc_init (); uri = mongoc_uri_new_with_error (uri_string, &error); if (!uri) { fprintf (stderr, "failed to parse URI: %s\n" "error message: %s\n", uri_string, error.message); return EXIT_FAILURE; } client = mongoc_client_new_from_uri (uri); if (!client) { return EXIT_FAILURE; } mongoc_client_set_error_api (client, 2); database = mongoc_client_get_database (client, "testasdf"); /* Create schema validator */ options = BCON_NEW ("validator", "{", "number", "{", "$gte", BCON_INT32 (5), "}", "}"); collection = mongoc_database_create_collection (database, "collname", options, &error); if (collection) { bulk5_fail (collection); bulk5_success (collection); mongoc_collection_destroy (collection); } else { fprintf (stderr, "Couldn't create collection: '%s'\n", error.message); } bson_free (options); mongoc_uri_destroy (uri); mongoc_database_destroy (database); mongoc_client_destroy (client); mongoc_cleanup (); return EXIT_SUCCESS; }
위의 예제를 실행하면 다음과 같은 결과가 나타납니다.
{ "nInserted" : 0, "nMatched" : 0, "nModified" : 0, "nRemoved" : 0, "nUpserted" : 0, "writeErrors" : [ { "index" : 0, "code" : 121, "errmsg" : "Document failed validation" } ] } Error: Document failed validation { "nInserted" : 2, "nMatched" : 0, "nModified" : 0, "nRemoved" : 0, "nUpserted" : 0, "writeErrors" : [] }
The bson_error_t domain is MONGOC_ERROR_COMMAND
.
대량 작업 쓰기 고려
By default bulk operations are executed with the mongoc_write_concern_t of the collection they are executed against. A custom write concern can be passed to the mongoc_collection_create_bulk_operation_with_opts method. Write concern errors (e.g. wtimeout) will be reported after all operations are attempted, regardless of execution order.
static void bulk4 (mongoc_collection_t *collection) { bson_t opts = BSON_INITIALIZER; mongoc_write_concern_t *wc; mongoc_bulk_operation_t *bulk; bson_error_t error; bson_t *doc; bson_t reply; char *str; bool ret; wc = mongoc_write_concern_new (); mongoc_write_concern_set_w (wc, 4); mongoc_write_concern_set_wtimeout_int64 (wc, 100); /* milliseconds */ mongoc_write_concern_append (wc, &opts); bulk = mongoc_collection_create_bulk_operation_with_opts (collection, &opts); /* Two inserts */ doc = BCON_NEW ("_id", BCON_INT32 (10)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); doc = BCON_NEW ("_id", BCON_INT32 (11)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); ret = mongoc_bulk_operation_execute (bulk, &reply, &error); str = bson_as_canonical_extended_json (&reply, NULL); printf ("%s\n", str); bson_free (str); if (!ret) { printf ("Error: %s\n", error.message); } bson_destroy (&reply); mongoc_bulk_operation_destroy (bulk); mongoc_write_concern_destroy (wc); bson_destroy (&opts); } int main (void) { mongoc_client_t *client; mongoc_collection_t *collection; const char *uri_string = "mongodb://localhost/?appname=bulk4-example"; mongoc_uri_t *uri; bson_error_t error; mongoc_init (); uri = mongoc_uri_new_with_error (uri_string, &error); if (!uri) { fprintf (stderr, "failed to parse URI: %s\n" "error message: %s\n", uri_string, error.message); return EXIT_FAILURE; } client = mongoc_client_new_from_uri (uri); if (!client) { return EXIT_FAILURE; } mongoc_client_set_error_api (client, 2); collection = mongoc_client_get_collection (client, "test", "test"); bulk4 (collection); mongoc_uri_destroy (uri); mongoc_collection_destroy (collection); mongoc_client_destroy (client); mongoc_cleanup (); return EXIT_SUCCESS; }
reply
문서 및 오류 메시지 예시:
{ "nInserted" : 2, "nMatched" : 0, "nModified" : 0, "nRemoved" : 0, "nUpserted" : 0, "writeErrors" : [], "writeConcernErrors" : [ { "code" : 64, "errmsg" : "waiting for replication timed out" } ] } Error: waiting for replication timed out
The bson_error_t domain is MONGOC_ERROR_WRITE_CONCERN
if there are write concern errors and no write errors. Write errors indicate failed operations, so they take precedence over write concern errors, which mean merely that the write concern is not satisfied yet.
데이터 정렬 순서 설정
이 기능은 MongoDB 3.4 이상을 사용하는 경우에만 사용할 수 있습니다.
static void bulk_collation (mongoc_collection_t *collection) { mongoc_bulk_operation_t *bulk; bson_t *opts; bson_t *doc; bson_t *selector; bson_t *update; bson_error_t error; bson_t reply; char *str; uint32_t ret; /* insert {_id: "one"} and {_id: "One"} */ bulk = mongoc_collection_create_bulk_operation_with_opts (collection, NULL); doc = BCON_NEW ("_id", BCON_UTF8 ("one")); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); doc = BCON_NEW ("_id", BCON_UTF8 ("One")); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); /* "One" normally sorts before "one"; make "one" come first */ opts = BCON_NEW ("collation", "{", "locale", BCON_UTF8 ("en_US"), "caseFirst", BCON_UTF8 ("lower"), "}"); /* set x=1 on the document with _id "One", which now sorts after "one" */ update = BCON_NEW ("$set", "{", "x", BCON_INT64 (1), "}"); selector = BCON_NEW ("_id", "{", "$gt", BCON_UTF8 ("one"), "}"); mongoc_bulk_operation_update_one_with_opts (bulk, selector, update, opts, &error); ret = mongoc_bulk_operation_execute (bulk, &reply, &error); str = bson_as_canonical_extended_json (&reply, NULL); printf ("%s\n", str); bson_free (str); if (!ret) { printf ("Error: %s\n", error.message); } bson_destroy (&reply); bson_destroy (update); bson_destroy (selector); bson_destroy (opts); mongoc_bulk_operation_destroy (bulk); } int main (void) { mongoc_client_t *client; mongoc_collection_t *collection; const char *uri_string = "mongodb://localhost/?appname=bulk-collation"; mongoc_uri_t *uri; bson_error_t error; mongoc_init (); uri = mongoc_uri_new_with_error (uri_string, &error); if (!uri) { fprintf (stderr, "failed to parse URI: %s\n" "error message: %s\n", uri_string, error.message); return EXIT_FAILURE; } client = mongoc_client_new_from_uri (uri); if (!client) { return EXIT_FAILURE; } mongoc_client_set_error_api (client, 2); collection = mongoc_client_get_collection (client, "db", "collection"); bulk_collation (collection); mongoc_uri_destroy (uri); mongoc_collection_destroy (collection); mongoc_client_destroy (client); mongoc_cleanup (); return EXIT_SUCCESS; }
위의 예제를 실행하면 다음과 같은 결과가 나타납니다.
{ "nInserted" : 2, "nMatched" : 1, "nModified" : 1, "nRemoved" : 0, "nUpserted" : 0, "writeErrors" : [ ] }
승인되지 않은 대량 쓰기
승인되지 않은 쓰기 (write) 의 경우 'w'를 0으로 설정합니다. 운전자 는 레거시 옵코드 OP_INSERT
, OP_UPDATE
및 OP_DELETE
를 사용하여 승인되지 않은 쓰기를 전송합니다.
static void bulk6 (mongoc_collection_t *collection) { bson_t opts = BSON_INITIALIZER; mongoc_write_concern_t *wc; mongoc_bulk_operation_t *bulk; bson_error_t error; bson_t *doc; bson_t *selector; bson_t reply; char *str; bool ret; wc = mongoc_write_concern_new (); mongoc_write_concern_set_w (wc, 0); mongoc_write_concern_append (wc, &opts); bulk = mongoc_collection_create_bulk_operation_with_opts (collection, &opts); doc = BCON_NEW ("_id", BCON_INT32 (10)); mongoc_bulk_operation_insert (bulk, doc); bson_destroy (doc); selector = BCON_NEW ("_id", BCON_INT32 (11)); mongoc_bulk_operation_remove_one (bulk, selector); bson_destroy (selector); ret = mongoc_bulk_operation_execute (bulk, &reply, &error); str = bson_as_canonical_extended_json (&reply, NULL); printf ("%s\n", str); bson_free (str); if (!ret) { printf ("Error: %s\n", error.message); } bson_destroy (&reply); mongoc_bulk_operation_destroy (bulk); mongoc_write_concern_destroy (wc); bson_destroy (&opts); } int main (void) { mongoc_client_t *client; mongoc_collection_t *collection; const char *uri_string = "mongodb://localhost/?appname=bulk6-example"; mongoc_uri_t *uri; bson_error_t error; mongoc_init (); uri = mongoc_uri_new_with_error (uri_string, &error); if (!uri) { fprintf (stderr, "failed to parse URI: %s\n" "error message: %s\n", uri_string, error.message); return EXIT_FAILURE; } client = mongoc_client_new_from_uri (uri); if (!client) { return EXIT_FAILURE; } mongoc_client_set_error_api (client, 2); collection = mongoc_client_get_collection (client, "test", "test"); bulk6 (collection); mongoc_uri_destroy (uri); mongoc_collection_destroy (collection); mongoc_client_destroy (client); mongoc_cleanup (); return EXIT_SUCCESS; }
reply
문서가 비어 있습니다.
{ }
추가 읽기
See the Driver Bulk API Spec, which describes bulk write operations for all MongoDB drivers.