トランザクション
MongoDB では単一ドキュメントに対する操作はアトミックなものとなります。単一ドキュメント構造内でのデータ間の関係をキャプチャするには、複数のドキュメントとコレクションにわたる正規化プロセスを経る代わりに埋め込み配列を利用できるため、単一ドキュメントのこうしたアトミック性によって多数の実用的なユースケースで分散トランザクションは不要になります。
複数のドキュメント (単一または複数のコレクション内) への読み取りと書き込みのアトミック性が必要な状況では、MongoDB は分散トランザクションをサポートします。分散トランザクションを使用すると、トランザクションを複数の操作、コレクション、データベース、ドキュメント、およびシャードにわたって使用できます。
このページの情報は、次の環境でホストされている配置に適用されます。
MongoDB Atlas はクラウドでの MongoDB 配置のためのフルマネージド サービスです
MongoDB Enterprise: サブスクリプションベースの自己管理型 MongoDB バージョン
MongoDB Community: ソースが利用可能で、無料で使用できる自己管理型の MongoDB のバージョン
トランザクション API
➤ 右上の言語の選択ドロップダウンメニューを使用して、以下の例の言語を設定します。
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
static bool with_transaction_example (bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_write_concern_t *wc = NULL; mongoc_collection_t *coll = NULL; bool success = false; bool ret = false; bson_t *doc = NULL; bson_t *insert_opts = NULL; mongoc_client_session_t *session = NULL; mongoc_transaction_opt_t *txn_opts = NULL; /* For a replica set, include the replica set name and a seedlist of the * members in the URI string; e.g. * uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \ * "27017/?replicaSet=myRepl"; * client = mongoc_client_new (uri_repl); * For a sharded cluster, connect to the mongos instances; e.g. * uri_sharded = * "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; * client = mongoc_client_new (uri_sharded); */ client = get_client (); /* Prereq: Create collections. Note Atlas connection strings include a majority write * concern by default. */ wc = mongoc_write_concern_new (); mongoc_write_concern_set_wmajority (wc, 0); insert_opts = bson_new (); mongoc_write_concern_append (wc, insert_opts); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } /* Step 1: Start a client session. */ session = mongoc_client_start_session (client, NULL /* opts */, error); if (!session) { goto fail; } /* Step 2: Optional. Define options to use for the transaction. */ txn_opts = mongoc_transaction_opts_new (); mongoc_transaction_opts_set_write_concern (txn_opts, wc); /* Step 3: Use mongoc_client_session_with_transaction to start a transaction, * execute the callback, and commit (or abort on error). */ ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error); if (!ret) { goto fail; } success = true; fail: bson_destroy (doc); mongoc_collection_destroy (coll); bson_destroy (insert_opts); mongoc_write_concern_destroy (wc); mongoc_transaction_opts_destroy (txn_opts); mongoc_client_session_destroy (session); mongoc_client_destroy (client); return success; } /* Define the callback that specifies the sequence of operations to perform * inside the transactions. */ static bool callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_collection_t *coll = NULL; bson_t *doc = NULL; bool success = false; bool ret = false; BSON_UNUSED (ctx); client = mongoc_client_session_get_client (session); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (1)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (999)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } success = true; fail: mongoc_collection_destroy (coll); bson_destroy (doc); return success; }
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
// The mongocxx::instance constructor and destructor initialize and shut down the driver, // respectively. Therefore, a mongocxx::instance must be created before using the driver and // must remain alive for as long as the driver is in use. mongocxx::instance inst{}; // For a replica set, include the replica set name and a seedlist of the members in the URI // string; e.g. // uriString = // 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}}; // Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be // needed. The suggested Atlas connection string includes majority write concern by default. write_concern wc_majority{}; wc_majority.acknowledge_level(write_concern::level::k_majority); // Prereq: Create collections. auto foo = client["mydb1"]["foo"]; auto bar = client["mydb2"]["bar"]; try { options::insert opts; opts.write_concern(wc_majority); foo.insert_one(make_document(kvp("abc", 0)), opts); bar.insert_one(make_document(kvp("xyz", 0)), opts); } catch (const mongocxx::exception& e) { std::cout << "An exception occurred while inserting: " << e.what() << std::endl; return EXIT_FAILURE; } // Step 1: Define the callback that specifies the sequence of operations to perform inside the // transactions. client_session::with_transaction_cb callback = [&](client_session* session) { // Important:: You must pass the session to the operations. foo.insert_one(*session, make_document(kvp("abc", 1))); bar.insert_one(*session, make_document(kvp("xyz", 999))); }; // Step 2: Start a client session auto session = client.start_session(); // Step 3: Use with_transaction to start a transaction, execute the callback, // and commit (or abort on error). try { options::transaction opts; opts.write_concern(wc_majority); session.with_transaction(callback, opts); } catch (const mongocxx::exception& e) { std::cout << "An exception occurred: " << e.what() << std::endl; return EXIT_FAILURE; } return EXIT_SUCCESS;
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"; // For a sharded cluster, connect to the mongos instances; e.g. // string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; var client = new MongoClient(connectionString); // Prereq: Create collections. var database1 = client.GetDatabase("mydb1"); var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority); collection1.InsertOne(new BsonDocument("abc", 0)); var database2 = client.GetDatabase("mydb2"); var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority); collection2.InsertOne(new BsonDocument("xyz", 0)); // Step 1: Start a client session. using (var session = client.StartSession()) { // Step 2: Optional. Define options to use for the transaction. var transactionOptions = new TransactionOptions( writeConcern: WriteConcern.WMajority); // Step 3: Define the sequence of operations to perform inside the transactions var cancellationToken = CancellationToken.None; // normally a real token would be used result = session.WithTransaction( (s, ct) => { try { collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct); collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct); } catch (MongoWriteException) { // Do something in response to the exception throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur. } return "Inserted into collections in different databases"; }, transactionOptions, cancellationToken); }
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
// WithTransactionExample is an example of using the Session.WithTransaction function. func WithTransactionExample(ctx context.Context) error { // For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl" // For a sharded cluster, connect to the mongos instances; e.g. // uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/" uri := mtest.ClusterURI() clientOpts := options.Client().ApplyURI(uri) client, err := mongo.Connect(clientOpts) if err != nil { return err } defer func() { _ = client.Disconnect(ctx) }() // Prereq: Create collections. wcMajority := writeconcern.Majority() wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority) fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts) barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts) // Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction. callback := func(sesctx context.Context) (interface{}, error) { // Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the // transaction. if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil { return nil, err } if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil { return nil, err } return nil, nil } // Step 2: Start a session and run the callback using WithTransaction. session, err := client.StartSession() if err != nil { return err } defer session.EndSession(ctx) result, err := session.WithTransaction(ctx, callback) if err != nil { return err } log.Printf("result: %v\n", result) return nil }
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
/* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances. For example: String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin"; */ final MongoClient client = MongoClients.create(uri); /* Create collections. */ client.getDatabase("mydb1").getCollection("foo") .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0)); client.getDatabase("mydb2").getCollection("bar") .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0)); /* Step 1: Start a client session. */ final ClientSession clientSession = client.startSession(); /* Step 2: Optional. Define options to use for the transaction. */ TransactionOptions txnOptions = TransactionOptions.builder() .writeConcern(WriteConcern.MAJORITY) .build(); /* Step 3: Define the sequence of operations to perform inside the transactions. */ TransactionBody txnBody = new TransactionBody<String>() { public String execute() { MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo"); MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar"); /* Important:: You must pass the session to the operations. */ coll1.insertOne(clientSession, new Document("abc", 1)); coll2.insertOne(clientSession, new Document("xyz", 999)); return "Inserted into collections in different databases"; } }; try { /* Step 4: Use .withTransaction() to start a transaction, execute the callback, and commit (or abort on error). */ clientSession.withTransaction(txnBody, txnOptions); } catch (RuntimeException e) { // some error handling } finally { clientSession.close(); }
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = AsyncIOMotorClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. async def callback(my_session): collection_one = my_session.client.mydb1.foo collection_two = my_session.client.mydb2.bar # Important:: You must pass the session to the operations. await collection_one.insert_one({"abc": 1}, session=my_session) await collection_two.insert_one({"xyz": 999}, session=my_session) # Step 2: Start a client session. async with await client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). await session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' const client = new MongoClient(uri); await client.connect(); // Prereq: Create collections. await client .db('mydb1') .collection('foo') .insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } }); await client .db('mydb2') .collection('bar') .insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } }); // Step 1: Start a Client Session const session = client.startSession(); // Step 2: Optional. Define options to use for the transaction const transactionOptions = { readPreference: 'primary', readConcern: { level: 'local' }, writeConcern: { w: 'majority' } }; // Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error) // Note: The callback for withTransaction MUST be async and/or return a Promise. try { await session.withTransaction(async () => { const coll1 = client.db('mydb1').collection('foo'); const coll2 = client.db('mydb2').collection('bar'); // Important:: You must pass the session to the operations await coll1.insertOne({ abc: 1 }, { session }); await coll2.insertOne({ xyz: 999 }, { session }); }, transactionOptions); } finally { await session.endSession(); await client.close(); }
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
sub runTransactionWithRetry { my ( $txnFunc, $session ) = @_; LOOP: { eval { $txnFunc->($session); # performs transaction }; if ( my $error = $@ ) { print("Transaction aborted-> Caught exception during transaction.\n"); # If transient error, retry the whole transaction if ( $error->has_error_label("TransientTransactionError") ) { print("TransientTransactionError, retrying transaction ->..\n"); redo LOOP; } else { die $error; } } } return; } sub commitWithRetry { my ($session) = @_; LOOP: { eval { $session->commit_transaction(); # Uses write concern set at transaction start. print("Transaction committed->\n"); }; if ( my $error = $@ ) { # Can retry commit if ( $error->has_error_label("UnknownTransactionCommitResult") ) { print("UnknownTransactionCommitResult, retrying commit operation ->..\n"); redo LOOP; } else { print("Error during commit ->..\n"); die $error; } } } return; } # Updates two collections in a transactions sub updateEmployeeInfo { my ($session) = @_; my $employeesCollection = $session->client->ns("hr.employees"); my $eventsCollection = $session->client->ns("reporting.events"); $session->start_transaction( { readConcern => { level => "snapshot" }, writeConcern => { w => "majority" }, readPreference => 'primary', } ); eval { $employeesCollection->update_one( { employee => 3 }, { '$set' => { status => "Inactive" } }, { session => $session}, ); $eventsCollection->insert_one( { employee => 3, status => { new => "Inactive", old => "Active" } }, { session => $session}, ); }; if ( my $error = $@ ) { print("Caught exception during transaction, aborting->\n"); $session->abort_transaction(); die $error; } commitWithRetry($session); } # Start a session my $session = $client->start_session(); eval { runTransactionWithRetry(\&updateEmployeeInfo, $session); }; if ( my $error = $@ ) { # Do something with error } $session->end_session();
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
/* * For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. * uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' * For a sharded cluster, connect to the mongos instances; e.g. * uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' */ $client = new \MongoDB\Client($uriString); // Prerequisite: Create collections. $client->selectCollection( 'mydb1', 'foo', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['abc' => 0]); $client->selectCollection( 'mydb2', 'bar', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['xyz' => 0]); // Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. $callback = function (\MongoDB\Driver\Session $session) use ($client): void { $client ->selectCollection('mydb1', 'foo') ->insertOne(['abc' => 1], ['session' => $session]); $client ->selectCollection('mydb2', 'bar') ->insertOne(['xyz' => 999], ['session' => $session]); }; // Step 2: Start a client session. $session = $client->startSession(); // Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). \MongoDB\with_transaction($session, $callback);
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = MongoClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. def callback(session): collection_one = session.client.mydb1.foo collection_two = session.client.mydb2.bar # Important:: You must pass the session to the operations. collection_one.insert_one({"abc": 1}, session=session) collection_two.insert_one({"xyz": 999}, session=session) # Step 2: Start a client session. with client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000}) # Prereq: Create collections. client.use('mydb1')['foo'].insert_one(abc: 0) client.use('mydb2')['bar'].insert_one(xyz: 0) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. callback = Proc.new do |my_session| collection_one = client.use('mydb1')['foo'] collection_two = client.use('mydb2')['bar'] # Important: You must pass the session to the operations. collection_one.insert_one({'abc': 1}, session: my_session) collection_two.insert_one({'xyz': 999}, session: my_session) end #. Step 2: Start a client session. session = client.start_session # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( read_concern: {level: :local}, write_concern: {w: :majority, wtimeout: 1000}, read: {mode: :primary}, &callback)
この例では、トランザクション API の主要なコンポーネントが強調表示されています。Callback API の使用は特に注目に値し、次の役割を果たします。
トランザクションを開始します
指定された操作を実行します
結果をコミットするか、エラーが発生した場合はトランザクションを終了します
DuplicateKeyError
などのサーバーサイド操作のエラーにより、トランザクションが終了し、コマンド エラーが発生してトランザクションが終了したことをユーザーにアラートする可能性があります。この動作は予想され、クライアントがSession.abortTransaction()
を呼び出しない場合でも発生します。カスタム エラー処理を組み込むには、トランザクションでCore APIを使用します。
コールバック APIには、特定のエラーに対する再試行ロジックが組み込まれています。ドライバーは、 TransientTransactionErrorまたはUnknownTransactionCommitResultのコミット エラーの発生後に、トランザクションを再度実行しようとします。
MongoDB 6.2 以降、サーバーは TransactionTooLargeForCache エラーを受信してもトランザクションを再試行しません。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
// For a replica set, include the replica set name and a seedlist of the members in the URI // string; e.g. let uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/? // replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g. // let uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; let client = Client::with_uri_str(uri).await?; // Prereq: Create collections. CRUD operations in transactions must be on existing collections. client .database("mydb1") .collection::<Document>("foo") .insert_one(doc! { "abc": 0}) .await?; client .database("mydb2") .collection::<Document>("bar") .insert_one(doc! { "xyz": 0}) .await?; // Step 1: Define the callback that specifies the sequence of operations to perform inside the // transaction. async fn callback(session: &mut ClientSession) -> Result<()> { let collection_one = session .client() .database("mydb1") .collection::<Document>("foo"); let collection_two = session .client() .database("mydb2") .collection::<Document>("bar"); // Important: You must pass the session to the operations. collection_one .insert_one(doc! { "abc": 1 }) .session(&mut *session) .await?; collection_two .insert_one(doc! { "xyz": 999 }) .session(session) .await?; Ok(()) } // Step 2: Start a client session. let mut session = client.start_session().await?; // Step 3: Use and_run to start a transaction, execute the callback, and commit (or // abort on error). session .start_transaction() .and_run((), |session, _| callback(session).boxed()) .await?;
この例では Core API を使用しています。Core API にはTransientTransactionError
と UnknownTransactionCommitResult
のいずれのコミット エラーの再試行ロジックも組み込まれていないため、この例ではこれらのエラーに対してトランザクションを再試行するための明示的なロジックが含まれています。
重要
ご使用の MongoDB バージョン向けの MongoDB ドライバーを使用してください。
ドライバーを使用する場合、トランザクション内の各操作によってセッションがそれぞれの操作に渡される必要があります。
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)、トランザクションレベルの書込み保証(write concern)、トランザクションレベルの読み込み設定(read preference)が使用されます。
トランザクション内で暗黙的または明示的にコレクションを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
/* * Copyright 2008-present MongoDB, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.mongodb.scala import org.mongodb.scala.model.{Filters, Updates} import org.mongodb.scala.result.UpdateResult import scala.concurrent.Await import scala.concurrent.duration.Duration //scalastyle:off magic.number class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec { // Implicit functions that execute the Observable and return the results val waitDuration = Duration(5, "seconds") implicit class ObservableExecutor[T](observable: Observable[T]) { def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration) } implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) { def execute(): T = Await.result(observable.toFuture(), waitDuration) } // end implicit functions "The Scala driver" should "be able to commit a transaction" in withClient { client => assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost()) client.getDatabase("hr").drop().execute() client.getDatabase("hr").createCollection("employees").execute() client.getDatabase("hr").createCollection("events").execute() updateEmployeeInfoWithRetry(client).execute() should equal(Completed()) client.getDatabase("hr").drop().execute() should equal(Completed()) } def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = { observable.map(clientSession => { val employeesCollection = database.getCollection("employees") val eventsCollection = database.getCollection("events") val transactionOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build() clientSession.startTransaction(transactionOptions) employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive")) .subscribe((res: UpdateResult) => println(res)) eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active"))) .subscribe((res: Completed) => println(res)) clientSession }) } def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => { println("UnknownTransactionCommitResult, retrying commit operation ...") commitAndRetry(observable) } case e: Exception => { println(s"Exception during commit ...: $e") throw e } }) } def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => { println("TransientTransactionError, aborting transaction and retrying ...") runTransactionAndRetry(observable) } }) } def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = { val database = client.getDatabase("hr") val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession()) val commitTransactionObservable: SingleObservable[Completed] = updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction()) val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable) runTransactionAndRetry(commitAndRetryObservable) } }
トランザクションと不可分性
MongoDB は(単一または複数のコレクション内の)複数のドキュメントへの読み取りと書込みにアトミック性を必要とする状況で、レプリカセットやシャーディングされたクラスターでのトランザクションを含む分散トランザクションをサポートします。
分散トランザクションは次のとおりアトミックな性質を持ちます。
トランザクションは、データ変更をすべて適用するか、変更をロールバックします。
トランザクションがコミットされると、トランザクション内で行われたすべてのデータ変更が保存され、トランザクション外でも表示されます。
トランザクションがコミットされるまで、トランザクションで行われたデータ変更はトランザクションの外部には表示されません。
ただし、トランザクションが複数のシャードに書き込む場合、すべての外部読み取り操作が、コミットされたトランザクションの結果がシャード全体で表示されるまで待機する必要はありません。たとえば、トランザクションがコミットされ、書込み 1 がシャード A で表示されているものの、書込み 2 がシャード B にまだ表示されていない場合、読み取り保証(read concern)
"local"
での外部読み取りは、書き込み 2 を見ることなく書き込み 1 の結果を読み取ることができます。トランザクションが中止されると、トランザクションで行われたすべてのデータの変更は表示されることなく破棄されます。たとえば、トランザクションはいずれかの操作に失敗すると中止され、トランザクションで行われたすべてのデータ変更は、表示されることなく破棄されます。
重要
ほとんどの場合、分散トランザクションでは 1 つのドキュメントの書き込み (write) よりもパフォーマンス コストが高くなります。分散トランザクションの可用性は、効果的なスキーマ設計の代わりにはなりません。多くのシナリオにおいて、非正規化されたデータモデル(埋め込みドキュメントと配列)が引き続きデータやユースケースに最適です。つまり、多くのシナリオにおいて、データを適切にモデリングすることで、分散トランザクションの必要性を最小限に抑えることができます。
トランザクションの使用に関するその他の考慮事項(ランタイム制限や oplog サイズ制限など)については、「本番環境での考慮事項」も参照してください。
トランザクションと操作
分散トランザクションは、複数の操作、コレクション、データベース、ドキュメント、シャードにわたって使用できます。
トランザクションについて:
トランザクション内でコレクションとインデックスを作成できます。詳細については、「トランザクション内でのコレクションとインデックスの作成」を参照してください。
トランザクションで使用されるコレクションは、異なるデータベースにある可能性があります。
注意
クロスシャードの書き込みトランザクションでは新しいコレクションを作成できません。たとえば、あるシャードで既存コレクションに書き込み、別のシャードで暗示的にコレクションを作成する場合、MongoDB では同じトランザクションで両方の操作を実行できません。
Capped コレクションには書き込めません。
Capped コレクションから読み取る場合、読み取り保証(read concern)
"snapshot"
は使用できません。(MongoDB 5.0 以降)config
データベース、admin
データベース、またはlocal
データベース内のコレクションの読み取りと書き込みはできません。system.*
コレクションに書き込み (write) はできません。explain
または類似コマンドを使用して、サポートされている操作のクエリプランを返すことはできません。
トランザクションの外部で作成されたカーソルの場合、トランザクション内で
getMore
を呼び出せません。トランザクション内で作成されたカーソルの場合、トランザクション外で
getMore
を呼び出せません。
killCursors
コマンドをトランザクションの最初の操作に指定することはできません。さらに、トランザクション内で
killCursors
コマンドを実行すると、サーバーは指定されたカーソルを直ちに停止します。トランザクションがコミットされるのを待ちません。
トランザクションでサポートされない操作のリストについては、「制限付き操作」を参照してください。
Tip
トランザクションを開始する直前にコレクションを作成または削除する際に、トランザクション内のコレクションにアクセスする場合、書込み保証(write concern)付きで "majority"
に作成または削除操作を発行して、トランザクションが必要なロックを取得できるようにします。
トランザクション内でのコレクションとインデックスの作成
トランザクションがクロスシャード書込みトランザクションでない場合、分散トランザクションで次の操作を実行できます。
コレクションを作成する。
同じトランザクション内で以前に作成された新しい空のコレクションにインデックスを作成する
トランザクション内でコレクションを作成する際、以下を実行できます。
暗黙的にコレクションを作成できます。たとえば次のいずれかを使用します。
存在しないコレクションに対する挿入操作。
存在しないコレクションを
upsert: true
に設定した update/findAndModify 操作。
create
コマンドまたはそのヘルパーであるdb.createCollection()
を使用して、明示的にコレクションを作成できます。
トランザクション内でインデックスを作成する場合 [1]、作成するインデックスは次のいずれかに配置される必要があります。
存在しないコレクション。コレクションは、操作中に作成されます。
同じトランザクション内で以前に作成された新しい空のコレクション。
[1] | 既存インデックスに対して db.collection.createIndex() と db.collection.createIndexes() を実行して、その有無を確認することもできます。これらの操作は、インデックスを作成せずに正常に返されます。 |
制限事項
クロスシャードの書き込みトランザクションでは新しいコレクションを作成できません。たとえば、あるシャードで既存コレクションに書き込み、別のシャードで暗示的にコレクションを作成する場合、MongoDB では同じトランザクションで両方の操作を実行できません。
シャーディングされたコレクションをターゲットにしている間は、トランザクション内で
$graphLookup
ステージを使用できません。トランザクション内でコレクションまたはインデックスを明示的に作成する場合、トランザクションの読み取り保証(read concern)は
"local"
でなければなりません。コレクションとインデックスを明示的に作成するには、次のコマンドとメソッドを使用します。
カウント操作
トランザクション内でカウント操作を実行するには、$count
集計ステージまたは $group
($sum
式を使用) 集計ステージを使用します。
MongoDB ドライバーは、$group
と $sum
式を併用してカウントを実行するヘルパー メソッドとして、コレクションレベルの API countDocuments(filter, options)
を提供します。
mongosh
には、$group
を $sum
式と併用してカウントを実行する db.collection.countDocuments()
ヘルパー メソッドが用意されています。
個別の操作
トランザクション内で個別の操作を実行するには、以下を使用できます。
シャーディングされていないコレクションの場合、
db.collection.distinct()
メソッドまたはdistinct
コマンド、および 集計パイプラインを$group
ステージと併用できます。シャーディングされたコレクションの場合、
db.collection.distinct()
メソッド、またはdistinct
コマンドは使用できません。シャーディングされたコレクションの個別の値を検索するには、代わりに
$group
ステージで集計パイプラインを使用します。以下に例を挙げます。db.coll.distinct("x")
の代わりに以下を使用しますdb.coll.aggregate([ { $group: { _id: null, distinctValues: { $addToSet: "$x" } } }, { $project: { _id: 0 } } ]) db.coll.distinct("x", { status: "A" })
の代わりに以下を使用します。db.coll.aggregate([ { $match: { status: "A" } }, { $group: { _id: null, distinctValues: { $addToSet: "$x" } } }, { $project: { _id: 0 } } ])
パイプラインはドキュメントにカーソルを返します。
{ "distinctValues" : [ 2, 3, 1 ] } カーソルを反復して、結果ドキュメントにアクセスします。
情報に関する操作
情報提供コマンドには、hello
、buildInfo
、connectionStatus
(およびこれらのヘルパー メソッド)などがあり、トランザクションに含めることができますが、最初の操作になることはできません。
制限付き操作
次の操作はトランザクションでは許可されません。
クロスシャードの書き込みトランザクションで新しいコレクションを作成します。たとえば、あるシャードで既存コレクションに書き込み、別のシャードで暗示的にコレクションを作成する場合、MongoDB では同じトランザクションで両方の操作を実行できません。
コレクションの明示的な作成(
db.createCollection()
メソッドやインデックスの明示的な作成(db.collection.createIndexes()
およびdb.collection.createIndex()
メソッドなど)で、"local"
以外の読み取り保証(read concern)レベルを使用する場合listCollections
コマンドとlistIndexes
コマンド、およびそのヘルパー メソッド。その他の CRUD 操作や情報操作以外の操作、例えば
createUser
、getParameter
、count
などおよびその補助ツール。
トランザクションとセッション
トランザクションはセッションに関連付けられます。
1 セッションで一度に開くことができるトランザクションは 1 つまでです。
ドライバーを使用する場合、トランザクション内の各操作はセッションに関連付けられる必要があります。詳細については、ドライバー固有のドキュメントを参照してください。
セッションが終了し、そのセッションにオープン トランザクションがある場合、そのトランザクションは中止されます。
読み取り保証/書込み保証/読み込み設定
トランザクションと読み込み設定(read preference)
トランザクション内の操作では、トランザクションレベルの読み込み設定(read preference)が使用されます。
ドライバーを使用すると、トランザクション開始時にトランザクションレベルの読み込み設定(read preference)を設定できます。
読み込み設定(read preference)がトランザクションレベルで設定されていない場合、セッションレベルの読み込み設定がトランザクションで使用されます。
読み込み設定(read preference)がトランザクションとセッションのいずれのレベルでも設定されていない場合、クライアントレベルの読み込み設定がトランザクションで使用されます。デフォルトでは、クライアントレベルの読み込み設定が
primary
となります。
読み取り操作を含む分散トランザクションでは、読み込み設定(read preference)primary
を使用する必要があります。特定のトランザクション内のすべての操作は、同じノードにルーティングする必要があります。
トランザクションと読み取り保証
トランザクション内の操作では、トランザクションレベルの読み取り保証(read concern)が使用されます。つまり、コレクションレベルとデータベースレベルで設定された読み取り保証は、トランザクション内では無視されます。
トランザクションの開始時に、トランザクションレベルの読み取り保証(read concern)を設定できます。
トランザクションレベルの読み取り保証(read concern)の設定が解除されている場合、トランザクションレベルの読み取り保証はデフォルトでセッションレベルの読み取り保証になります。
トランザクションレベルとセッションレベルの読み取り保証(read concern)の設定が解除されている場合、トランザクションレベルの読み取り保証はデフォルトでクライアントレベルの読み取り保証に設定されます。デフォルトでは、クライアントレベルの読み取り保証は プライマリでの読み取りで
"local"
となります。以下も参照してください。
トランザクションは次の読み取り保証(read concern)レベルをサポートします。
"local"
読み取り保証(read concern)
"local"
は、ノードから入手可能な最新データを返しますが、ロールバックも可能です。レプリカセットでは、トランザクションが読み取り保証
local
を使用している場合でも、トランザクションが開かれた時点でのスナップショットから操作が読み取られる場合、より強力な読み取り分離が見られる可能性があります。シャーディングされたクラスター上のトランザクションの場合、
"local"
読み取り保証(read concern)では、シャード全体でデータが同じスナップショット ビューから取得されることを保証できません。スナップショットの分離が必要な場合は、"snapshot"
読み取り保証(read concern)を使用してください。トランザクション内でコレクションとインデックスを作成できます。コレクションまたはインデックスを明示的に作成する場合、トランザクションで読み取り保証
"local"
を使用する必要があります。コレクションを暗黙的に作成する場合は、トランザクションに任意の読み取り保証を使用できます。
"majority"
トランザクションが「majority 」の書込み保証(write concern)でコミットされた場合、読み取り保証(read concern)
"majority"
はレプリカセット ノードの過半数によって確認済みであり、かつロールバックできないデータを返します。それ以外の場合、読み取り保証"majority"
では読み取り操作によって過半数のコミット済みデータが読み取られる保証はありません。シャーディングされたクラスター上のトランザクションの場合、読み取り保証(read concern)
"majority"
では、シャード全体でデータが同じスナップショット ビューから取得されることを保証できません。スナップショットの分離が必要な場合は、読み取り保証"snapshot"
を使用します。
"snapshot"
トランザクションが「majority 」の書込み保証(write concern)でコミットされる場合、読み取り保証(read concern)
"snapshot"
はコミットされた過半数のデータのスナップショットからのデータを返します。トランザクションが「過半数」の書込み保証(write concern)をコミットに使用しない場合、
"snapshot"
読み取り保証(read concern)は、読み取り操作によってコミットされた過半数のデータのスナップショットが使用されたことを保証しません。シャーディングされたクラスター上のトランザクションの場合、データの
"snapshot"
ビューはシャード間で同期されます。
トランザクションと書込み保証
トランザクションは、トランザクションレベルの書込み保証(write concern)を使用して書込み操作をコミットします。トランザクション内の書込み操作は、明示的な書込み保証を指定せずに、デフォルトの書込み保証を使用して実行される必要があります。コミット時に、書込み操作はトランザクションレベルの書込み保証を使用します。
Tip
トランザクション内の個々の書込み操作に書込み保証(write concern)を明示的に設定しないでください。トランザクション内の個々の書込み操作に書込み保証を設定すると、エラーが返されます。
トランザクションの開始時に、トランザクションレベルの書込み保証(write concern)を設定できます。
トランザクションレベルの書込み保証(write concern)の設定が解除されている場合、トランザクションレベルの書込み保証はデフォルトでセッションレベルの書込み保証でコミットされます。
トランザクションレベルとセッションレベルのいずれでも書込み保証(write concern)の設定が解除されている場合、トランザクションレベルの書込み保証はデフォルトでクライアントレベルの書込み保証になります。
w: "majority"
(MongoDB 5.0 以降の場合)。アービタを含む配置では差異があります。詳細については、「暗黙のデフォルト書込み保証(write concern)」を参照してください。
トランザクションは、以下を含むすべての書込み保証(write concern)の w 値をサポートします。
w: 1
書込み保証(write concern)
w: 1
は、コミットがプライマリに適用された後に確認応答を返します。重要
w: 1
でコミットする場合、トランザクションはフェイルオーバーが発生した場合にロールバック可能です。w: 1
書込み保証(write concern)でコミットする場合、トランザクションレベルの"majority"
読み取り保証(read concern)では、トランザクション内の読み取り操作が過半数でコミットされたデータを読み取るという保証は ありません。w: 1
書込み保証(write concern)でコミットする場合、トランザクションレベルの"snapshot"
読み取り保証(read concern)では、トランザクション内の読み取り操作で過半数でコミットされたデータのスナップショットが使用されるという保証は ありません。
w: "majority"
w: "majority"
の書込み保証(write concern)は、コミットが投票ノードの過半数に適用された後に確認応答を返します。w: "majority"
書込み保証(write concern) でコミットする場合、トランザクションレベルの"majority"
読み取り保証(read concern)により、操作によって過半数でコミットされたデータが読み取られたことが保証されます。シャーディングされたクラスター上のトランザクションでは、コミットされた過半数のデータのこのビューはシャード間で同期されません。w: "majority"
書込み保証(write concern)でコミットする場合、トランザクションレベルの"snapshot"
読み取り保証(read concern)により、操作によって過半数でコミットされたデータの同期されたスナップショットから読み取られたことが保証されます。
注意
トランザクションに指定された書込み保証(write concern)にかかわらず、シャーディングされたトランザクションに対するコミット操作の一部には、{w:
"majority", j: true}
書込み保証を使用する部分が含まれます。
サーバー パラメーター coordinateCommitReturnImmediatelyAfterPersistingDecision
は、トランザクションをコミットする決定がクライアントに返されるタイミングを制御します。
このパラメーターは MongDB 5.0 で導入されたもので、デフォルト値はtrue
です。MongoDB 6.1 ではデフォルト値が false
に変更されています。
coordinateCommitReturnImmediatelyAfterPersistingDecision
が false
の場合、シャード トトランザクションの調整役は、すべてのノードがマルチドキュメントトランザクションのコミットを確認するまで待機してから、コミットの決定をクライアントに返します。
マルチドキュメントトランザクションに対して "majority"
書込み保証(write concern)を指定し、レプリカセット ノードの計算された過半数へのレプリケート トランザクションに失敗すると、そのトランザクションはレプリカセット ノードに対してすぐにロールバックされない可能性があります。レプリカセットは最終的に整合性を保持します。トランザクションは常にすべてのレプリカセット ノードに適用またはロールバックされます。
トランザクションに指定された書込み保証(write concern)にかかわらず、ドライバーは commitTransaction
の再試行時に書込み保証として w: "majority"
を適用します。
一般情報
以下の各セクションでは、トランザクションに関する詳細な考慮事項を取りあげています。
本番環境での考慮事項
本番環境でのトランザクションについては、「本番環境での考慮事項」を参照してください。シャーディングされたクラスターについては、「本番環境での考慮事項(シャーディングされたクラスター)」も参照してください。
アービタ
レプリカセットにアービタがある場合、トランザクションを使用してシャードキーを変更することはできません。 アービタは、 マルチシャード トランザクションに必要なデータ操作に参加することはできません。
書込み操作が複数のシャードにまたがるトランザクションで、アービタを含むシャードを対象に読み取りまたは書込み操作が実行される場合、そのトランザクションはエラーとなり中止します。
シャード構成の制限
シャーディングされたクラスターのうち writeConcernMajorityJournalDefault
が false
に設定されているもの(インメモリ ストレージエンジンを使用する投票ノードのあるシャードなど)ではトランザクションを実行できません。
注意
トランザクションに指定された書込み保証(write concern)にかかわらず、シャーディングされたトランザクションに対するコミット操作の一部には、{w:
"majority", j: true}
書込み保証を使用する部分が含まれます。
診断
トランザクションのステータスとメトリクスを取得するには、次のメソッドを使用します。
ソース | 戻り値 |
---|---|
db.serverStatus() methodserverStatus command | |
$currentOp 集計パイプライン | 次の値を返します。
|
db.currentOp() methodcurrentOp command | 次の値を返します。
|
低速( operationProfiling.slowOpThresholdMs のしきい値を超える)トランザクションに関する情報を TXN ログ コンポーネントに含めます。 |
機能の互換性バージョン(fCV)
トランザクションを使用するには、配置のすべてのノードの FeatureCompatibilityVersion が次のバージョン以上である必要があります。
配置 | 最小 featureCompatibilityVersion |
---|---|
レプリカセット | 4.0 |
シャーディングされたクラスター | 4.2 |
ノードの機能の互換性バージョンを確認するには、該当ノードに接続して次のコマンドを実行します。
db.adminCommand( { getParameter: 1, featureCompatibilityVersion: 1 } )
詳細については、setFeatureCompatibilityVersion
に関する参考ページを参照してください。
ストレージ エンジン
分散トランザクションは次のレプリカセットとシャーディングされたクラスターでサポートされます。
プライマリが WiredTiger ストレージエンジンを使用しており、かつ
セカンダリ ノードが、WiredTiger ストレージエンジンまたはインメモリ ストレージエンジンのいずれかを使用している。
注意
シャーディングされたクラスターのうち writeConcernMajorityJournalDefault
が false
に設定されているもの(インメモリ ストレージエンジンを使用する投票ノードのあるシャードなど)ではトランザクションを実行できません。
クリティカル セクションの待機時間の制限
MongoDB 5.2 以降(および 5.0.4で)、以下の項目が適用されます。
クエリがシャードにアクセスしたときに、チャンク移行または DDL 操作によってコレクションのクリティカル セクションが保持されている可能性があります。
トランザクション内のクリティカル セクションをシャードが待機する時間を制限するには、
metadataRefreshInTransactionMaxWaitBehindCritSecMS
パラメーターを使用します。