事务
在 MongoDB 中,对单个文档的操作具有原子性。由于您可以使用嵌入式文档和数组来捕获单个文档结构中数据之间的关系,而无需跨多个文档和集合进行标准化,因此这种单文档原子性消除了许多实际使用案例使用分布式事务的必要性。
对于需要对多个文档(在单个或多个集合中)的读写操作具有原子性的情况,MongoDB 支持多文档事务。利用分布式事务,可以跨多个操作、集合、数据库、文档和分片使用事务。
此页面上的信息适用于在以下环境中托管的部署:
MongoDB Atlas :用于在云中部署 MongoDB 的完全托管服务
MongoDB Enterprise:基于订阅、自我管理的 MongoDB 版本
MongoDB Community:源代码可用、免费使用且可自行管理的 MongoDB 版本
事务 API
➤ 使用右上角的选择语言下拉菜单来设置以下示例的语言。
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
static bool with_transaction_example (bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_write_concern_t *wc = NULL; mongoc_collection_t *coll = NULL; bool success = false; bool ret = false; bson_t *doc = NULL; bson_t *insert_opts = NULL; mongoc_client_session_t *session = NULL; mongoc_transaction_opt_t *txn_opts = NULL; /* For a replica set, include the replica set name and a seedlist of the * members in the URI string; e.g. * uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \ * "27017/?replicaSet=myRepl"; * client = mongoc_client_new (uri_repl); * For a sharded cluster, connect to the mongos instances; e.g. * uri_sharded = * "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; * client = mongoc_client_new (uri_sharded); */ client = get_client (); /* Prereq: Create collections. Note Atlas connection strings include a majority write * concern by default. */ wc = mongoc_write_concern_new (); mongoc_write_concern_set_wmajority (wc, 0); insert_opts = bson_new (); mongoc_write_concern_append (wc, insert_opts); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (0)); ret = mongoc_collection_insert_one (coll, doc, insert_opts, NULL /* reply */, error); if (!ret) { goto fail; } /* Step 1: Start a client session. */ session = mongoc_client_start_session (client, NULL /* opts */, error); if (!session) { goto fail; } /* Step 2: Optional. Define options to use for the transaction. */ txn_opts = mongoc_transaction_opts_new (); mongoc_transaction_opts_set_write_concern (txn_opts, wc); /* Step 3: Use mongoc_client_session_with_transaction to start a transaction, * execute the callback, and commit (or abort on error). */ ret = mongoc_client_session_with_transaction (session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error); if (!ret) { goto fail; } success = true; fail: bson_destroy (doc); mongoc_collection_destroy (coll); bson_destroy (insert_opts); mongoc_write_concern_destroy (wc); mongoc_transaction_opts_destroy (txn_opts); mongoc_client_session_destroy (session); mongoc_client_destroy (client); return success; } /* Define the callback that specifies the sequence of operations to perform * inside the transactions. */ static bool callback (mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error) { mongoc_client_t *client = NULL; mongoc_collection_t *coll = NULL; bson_t *doc = NULL; bool success = false; bool ret = false; BSON_UNUSED (ctx); client = mongoc_client_session_get_client (session); coll = mongoc_client_get_collection (client, "mydb1", "foo"); doc = BCON_NEW ("abc", BCON_INT32 (1)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } bson_destroy (doc); mongoc_collection_destroy (coll); coll = mongoc_client_get_collection (client, "mydb2", "bar"); doc = BCON_NEW ("xyz", BCON_INT32 (999)); ret = mongoc_collection_insert_one (coll, doc, NULL /* opts */, *reply, error); if (!ret) { goto fail; } success = true; fail: mongoc_collection_destroy (coll); bson_destroy (doc); return success; }
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
// The mongocxx::instance constructor and destructor initialize and shut down the driver, // respectively. Therefore, a mongocxx::instance must be created before using the driver and // must remain alive for as long as the driver is in use. mongocxx::instance inst{}; // For a replica set, include the replica set name and a seedlist of the members in the URI // string; e.g. // uriString = // 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' mongocxx::client client{mongocxx::uri{"mongodb://localhost/?replicaSet=repl0"}}; write_concern wc_majority{}; wc_majority.acknowledge_level(write_concern::level::k_majority); read_concern rc_local{}; rc_local.acknowledge_level(read_concern::level::k_local); read_preference rp_primary{}; rp_primary.mode(read_preference::read_mode::k_primary); // Prereq: Create collections. auto foo = client["mydb1"]["foo"]; auto bar = client["mydb2"]["bar"]; try { options::insert opts; opts.write_concern(wc_majority); foo.insert_one(make_document(kvp("abc", 0)), opts); bar.insert_one(make_document(kvp("xyz", 0)), opts); } catch (const mongocxx::exception& e) { std::cout << "An exception occurred while inserting: " << e.what() << std::endl; return EXIT_FAILURE; } // Step 1: Define the callback that specifies the sequence of operations to perform inside the // transactions. client_session::with_transaction_cb callback = [&](client_session* session) { // Important:: You must pass the session to the operations. foo.insert_one(*session, make_document(kvp("abc", 1))); bar.insert_one(*session, make_document(kvp("xyz", 999))); }; // Step 2: Start a client session auto session = client.start_session(); // Step 3: Use with_transaction to start a transaction, execute the callback, // and commit (or abort on error). try { options::transaction opts; opts.write_concern(wc_majority); opts.read_concern(rc_local); opts.read_preference(rp_primary); session.with_transaction(callback, opts); } catch (const mongocxx::exception& e) { std::cout << "An exception occurred: " << e.what() << std::endl; return EXIT_FAILURE; } return EXIT_SUCCESS;
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"; // For a sharded cluster, connect to the mongos instances; e.g. // string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; var client = new MongoClient(connectionString); // Prereq: Create collections. var database1 = client.GetDatabase("mydb1"); var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority); collection1.InsertOne(new BsonDocument("abc", 0)); var database2 = client.GetDatabase("mydb2"); var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority); collection2.InsertOne(new BsonDocument("xyz", 0)); // Step 1: Start a client session. using (var session = client.StartSession()) { // Step 2: Optional. Define options to use for the transaction. var transactionOptions = new TransactionOptions( writeConcern: WriteConcern.WMajority); // Step 3: Define the sequence of operations to perform inside the transactions var cancellationToken = CancellationToken.None; // normally a real token would be used result = session.WithTransaction( (s, ct) => { try { collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct); collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct); } catch (MongoWriteException) { // Do something in response to the exception throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur. } return "Inserted into collections in different databases"; }, transactionOptions, cancellationToken); }
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
// WithTransactionExample is an example of using the Session.WithTransaction function. func WithTransactionExample(ctx context.Context) error { // For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl" // For a sharded cluster, connect to the mongos instances; e.g. // uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/" uri := mtest.ClusterURI() clientOpts := options.Client().ApplyURI(uri) client, err := mongo.Connect(clientOpts) if err != nil { return err } defer func() { _ = client.Disconnect(ctx) }() // Prereq: Create collections. wcMajority := writeconcern.Majority() wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority) fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts) barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts) // Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction. callback := func(sesctx context.Context) (interface{}, error) { // Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the // transaction. if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil { return nil, err } if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil { return nil, err } return nil, nil } // Step 2: Start a session and run the callback using WithTransaction. session, err := client.StartSession() if err != nil { return err } defer session.EndSession(ctx) result, err := session.WithTransaction(ctx, callback) if err != nil { return err } log.Printf("result: %v\n", result) return nil }
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
/* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances. For example: String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin"; */ final MongoClient client = MongoClients.create(uri); /* Create collections. */ client.getDatabase("mydb1").getCollection("foo") .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("abc", 0)); client.getDatabase("mydb2").getCollection("bar") .withWriteConcern(WriteConcern.MAJORITY).insertOne(new Document("xyz", 0)); /* Step 1: Start a client session. */ final ClientSession clientSession = client.startSession(); /* Step 2: Optional. Define options to use for the transaction. */ TransactionOptions txnOptions = TransactionOptions.builder() .writeConcern(WriteConcern.MAJORITY) .build(); /* Step 3: Define the sequence of operations to perform inside the transactions. */ TransactionBody txnBody = new TransactionBody<String>() { public String execute() { MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo"); MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar"); /* Important:: You must pass the session to the operations. */ coll1.insertOne(clientSession, new Document("abc", 1)); coll2.insertOne(clientSession, new Document("xyz", 999)); return "Inserted into collections in different databases"; } }; try { /* Step 4: Use .withTransaction() to start a transaction, execute the callback, and commit (or abort on error). */ clientSession.withTransaction(txnBody, txnOptions); } catch (RuntimeException e) { // some error handling } finally { clientSession.close(); }
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = AsyncIOMotorClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. async def callback(my_session): collection_one = my_session.client.mydb1.foo collection_two = my_session.client.mydb2.bar # Important:: You must pass the session to the operations. await collection_one.insert_one({"abc": 1}, session=my_session) await collection_two.insert_one({"xyz": 999}, session=my_session) # Step 2: Start a client session. async with await client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). await session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' const client = new MongoClient(uri); await client.connect(); // Prereq: Create collections. await client .db('mydb1') .collection('foo') .insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } }); await client .db('mydb2') .collection('bar') .insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } }); // Step 1: Start a Client Session const session = client.startSession(); // Step 2: Optional. Define options to use for the transaction const transactionOptions = { readPreference: 'primary', readConcern: { level: 'local' }, writeConcern: { w: 'majority' } }; // Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error) // Note: The callback for withTransaction MUST be async and/or return a Promise. try { await session.withTransaction(async () => { const coll1 = client.db('mydb1').collection('foo'); const coll2 = client.db('mydb2').collection('bar'); // Important:: You must pass the session to the operations await coll1.insertOne({ abc: 1 }, { session }); await coll2.insertOne({ xyz: 999 }, { session }); }, transactionOptions); } finally { await session.endSession(); await client.close(); }
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
sub runTransactionWithRetry { my ( $txnFunc, $session ) = @_; LOOP: { eval { $txnFunc->($session); # performs transaction }; if ( my $error = $@ ) { print("Transaction aborted-> Caught exception during transaction.\n"); # If transient error, retry the whole transaction if ( $error->has_error_label("TransientTransactionError") ) { print("TransientTransactionError, retrying transaction ->..\n"); redo LOOP; } else { die $error; } } } return; } sub commitWithRetry { my ($session) = @_; LOOP: { eval { $session->commit_transaction(); # Uses write concern set at transaction start. print("Transaction committed->\n"); }; if ( my $error = $@ ) { # Can retry commit if ( $error->has_error_label("UnknownTransactionCommitResult") ) { print("UnknownTransactionCommitResult, retrying commit operation ->..\n"); redo LOOP; } else { print("Error during commit ->..\n"); die $error; } } } return; } # Updates two collections in a transactions sub updateEmployeeInfo { my ($session) = @_; my $employeesCollection = $session->client->ns("hr.employees"); my $eventsCollection = $session->client->ns("reporting.events"); $session->start_transaction( { readConcern => { level => "snapshot" }, writeConcern => { w => "majority" }, readPreference => 'primary', } ); eval { $employeesCollection->update_one( { employee => 3 }, { '$set' => { status => "Inactive" } }, { session => $session}, ); $eventsCollection->insert_one( { employee => 3, status => { new => "Inactive", old => "Active" } }, { session => $session}, ); }; if ( my $error = $@ ) { print("Caught exception during transaction, aborting->\n"); $session->abort_transaction(); die $error; } commitWithRetry($session); } # Start a session my $session = $client->start_session(); eval { runTransactionWithRetry(\&updateEmployeeInfo, $session); }; if ( my $error = $@ ) { # Do something with error } $session->end_session();
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
/* * For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. * uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' * For a sharded cluster, connect to the mongos instances; e.g. * uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' */ $client = new \MongoDB\Client($uriString); // Prerequisite: Create collections. $client->selectCollection( 'mydb1', 'foo', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['abc' => 0]); $client->selectCollection( 'mydb2', 'bar', [ 'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000), ], )->insertOne(['xyz' => 0]); // Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. $callback = function (\MongoDB\Driver\Session $session) use ($client): void { $client ->selectCollection('mydb1', 'foo') ->insertOne(['abc' => 1], ['session' => $session]); $client ->selectCollection('mydb2', 'bar') ->insertOne(['xyz' => 999], ['session' => $session]); }; // Step 2: Start a client session. $session = $client->startSession(); // Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). \MongoDB\with_transaction($session, $callback);
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = MongoClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. def callback(session): collection_one = session.client.mydb1.foo collection_two = session.client.mydb2.bar # Important:: You must pass the session to the operations. collection_one.insert_one({"abc": 1}, session=session) collection_two.insert_one({"xyz": 999}, session=session) # Step 2: Start a client session. with client.start_session() as session: # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( callback, read_concern=ReadConcern("local"), write_concern=wc_majority, read_preference=ReadPreference.PRIMARY, )
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000}) # Prereq: Create collections. client.use('mydb1')['foo'].insert_one(abc: 0) client.use('mydb2')['bar'].insert_one(xyz: 0) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. callback = Proc.new do |my_session| collection_one = client.use('mydb1')['foo'] collection_two = client.use('mydb2')['bar'] # Important: You must pass the session to the operations. collection_one.insert_one({'abc': 1}, session: my_session) collection_two.insert_one({'xyz': 999}, session: my_session) end #. Step 2: Start a client session. session = client.start_session # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction( read_concern: {level: :local}, write_concern: {w: :majority, wtimeout: 1000}, read: {mode: :primary}, &callback)
此示例重点介绍了事务 API 的关键组件。特别是,它使用回调 API。回调 API:
启动事务
执行指定操作
提交结果(或在出错时中止)
回调 API 包含特定错误的重试逻辑。服务器尝试在 TransientTransactionError 或 UnknownTransactionCommitResult 提交错误后重新运行事务。
从 MongoDB 6.2 开始,服务器在收到 TransactionTooLargeForCache 错误后不会重试事务。
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
// For a replica set, include the replica set name and a seedlist of the members in the URI // string; e.g. let uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/? // replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g. // let uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; let client = Client::with_uri_str(uri).await?; // Prereq: Create collections. CRUD operations in transactions must be on existing collections. client .database("mydb1") .collection::<Document>("foo") .insert_one(doc! { "abc": 0}) .await?; client .database("mydb2") .collection::<Document>("bar") .insert_one(doc! { "xyz": 0}) .await?; // Step 1: Define the callback that specifies the sequence of operations to perform inside the // transaction. async fn callback(session: &mut ClientSession) -> Result<()> { let collection_one = session .client() .database("mydb1") .collection::<Document>("foo"); let collection_two = session .client() .database("mydb2") .collection::<Document>("bar"); // Important: You must pass the session to the operations. collection_one .insert_one(doc! { "abc": 1 }) .session(&mut *session) .await?; collection_two .insert_one(doc! { "xyz": 999 }) .session(session) .await?; Ok(()) } // Step 2: Start a client session. let mut session = client.start_session().await?; // Step 3: Use and_run to start a transaction, execute the callback, and commit (or // abort on error). session .start_transaction() .and_run((), |session, _| callback(session).boxed()) .await?;
此示例使用的是核心 API 。由于核心 API 不包含 TransientTransactionError
或UnknownTransactionCommitResult
提交错误的重试逻辑,因此该示例包含针对这些错误重试事务的显式逻辑:
重要
使用适合您的 MongoDB 版本的 MongoDB 驱动程序。
使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。
您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。
/* * Copyright 2008-present MongoDB, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.mongodb.scala import org.mongodb.scala.model.{Filters, Updates} import org.mongodb.scala.result.UpdateResult import scala.concurrent.Await import scala.concurrent.duration.Duration //scalastyle:off magic.number class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec { // Implicit functions that execute the Observable and return the results val waitDuration = Duration(5, "seconds") implicit class ObservableExecutor[T](observable: Observable[T]) { def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration) } implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) { def execute(): T = Await.result(observable.toFuture(), waitDuration) } // end implicit functions "The Scala driver" should "be able to commit a transaction" in withClient { client => assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost()) client.getDatabase("hr").drop().execute() client.getDatabase("hr").createCollection("employees").execute() client.getDatabase("hr").createCollection("events").execute() updateEmployeeInfoWithRetry(client).execute() should equal(Completed()) client.getDatabase("hr").drop().execute() should equal(Completed()) } def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = { observable.map(clientSession => { val employeesCollection = database.getCollection("employees") val eventsCollection = database.getCollection("events") val transactionOptions = TransactionOptions.builder() .readPreference(ReadPreference.primary()) .readConcern(ReadConcern.SNAPSHOT) .writeConcern(WriteConcern.MAJORITY) .build() clientSession.startTransaction(transactionOptions) employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive")) .subscribe((res: UpdateResult) => println(res)) eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active"))) .subscribe((res: Completed) => println(res)) clientSession }) } def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => { println("UnknownTransactionCommitResult, retrying commit operation ...") commitAndRetry(observable) } case e: Exception => { println(s"Exception during commit ...: $e") throw e } }) } def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = { observable.recoverWith({ case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => { println("TransientTransactionError, aborting transaction and retrying ...") runTransactionAndRetry(observable) } }) } def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = { val database = client.getDatabase("hr") val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession()) val commitTransactionObservable: SingleObservable[Completed] = updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction()) val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable) runTransactionAndRetry(commitAndRetryObservable) } }
事务和原子性
对于需要对多个文档(在单个或多个集合中)原子性读取和写入的情况,MongoDB 支持分布式事务,包括副本集和分片集群上的事务。
分布式事务具有原子性:
事务要么应用所有数据更改,要么回滚更改。
在事务提交时,事务中所做的所有数据更改都会保存,并且在事务之外可见。
在事务进行提交前,在事务中所做的数据更改在事务外不可见。
不过,当事务写入多个分片时,并非所有外部读取操作都需等待已提交事务的结果在各个分片上可见。例如,如果事务已提交并且写入 1 在分片 A 上可见,但写入 2 在分片 B 上尚不可见,则读关注
"local"
处的外部读取可以在不看到写入 2 的情况下读取写入 1 的结果。事务中止后,在事务中所做的所有数据更改会被丢弃且不会变得可见。例如,如果事务中的任何操作失败,事务就会中止,事务中所做的所有数据更改将被丢弃且不会变得可见。
重要
在大多数情况下,与单文档写入操作相比,分布式事务会产生更高的性能成本,并且分布式事务的可用性不应取代有效的模式设计。在许多情况下,非规范化数据模型(嵌入式文档和数组)仍然是数据和使用案例的最佳选择。换言之,对于许多场景,适当的数据建模将最大限度地减少对分布式事务的需求。
有关其他事务使用注意事项(如运行时间限制和 oplog 大小限制),另请参阅生产注意事项。
事务和操作
可以跨多个操作、集合、数据库、文档和分片使用分布式事务。
对于事务:
可以在事务中创建集合和索引。有关详细信息,请参阅在事务中创建集合和索引
事务中使用的集合可以位于不同的数据库中。
注意
您无法在跨分片写事务中创建新集合。例如,如果您在一个分片中写入一个现有集合,并在另一个分片中隐式创建一个集合,MongoDB 将无法在同一事务中执行这两个操作。
不能写入固定大小集合。
从固定大小集合读取时不能使用读关注
"snapshot"
。(从 MongoDB 5.0 开始)不能在
config
、admin
或local
数据库中读取/写入集合。不能写入
system.*
集合。不能使用
explain
或类似命令返回受支持操作的查询计划。
您不能将
killCursors
指定为事务中的第一个操作。
有关事务中不支持的操作列表,请参阅限制操作。
提示
在启动事务之前创建或删除集合时,如果在事务内部访问该集合,请发出带有写关注 "majority"
的创建或删除操作,以确保事务可以获取所需的锁。
在事务中创建集合和索引
如果事务不是跨分片写入事务,则可以在分布式事务中执行以下操作:
创建集合。
在先前同一事务中创建的新空集合上创建索引。
在事务中创建集合时:
您可以隐式创建一个集合,例如:
对不存在的集合进行插入操作,或
对不存在的集合使用
upsert: true
进行 update/findAndModify 操作。
您可以使用
create
命令或其辅助程序db.createCollection()
显式创建集合。
在事务内创建索引 [1] 时,要创建的索引必须位于以下位置之一:
不存在的集合。集合作为操作的一部分创建。
先前在同一事务中创建的新空集合。
[1] | 您还可以对现有索引运行 db.collection.createIndex() 和 db.collection.createIndexes() 以检查其是否存在。这些操作成功返回而不创建索引。 |
限制
您无法在跨分片写事务中创建新集合。例如,如果您在一个分片中写入一个现有集合,并在另一个分片中隐式创建一个集合,MongoDB 将无法在同一事务中执行这两个操作。
当以分片集合为目标时,您无法在事务中使用
$graphLookup
阶段。要在事务内显式创建集合或索引,事务读关注级别必须为
"local"
。要显式创建集合和索引,请使用以下命令和方法:
计数操作
要在事务内执行计数操作,请使用 $count
聚合阶段或 $group
(带有 $sum
表达式)聚合阶段。
MongoDB 驱动程序提供集合级 API countDocuments(filter, options)
作为辅助方法,该方法使用$group
和$sum
表达式来执行计数。
mongosh
提供 db.collection.countDocuments()
辅助方法,该方法使用 $group
和 $sum
表达式进行计数。
去重操作
如要在事务中执行不同的操作:
对于未分片的集合,可以使用
db.collection.distinct()
方法/distinct
命令以及带有$group
阶段的聚合管道。对于分片集合,不能使用
db.collection.distinct()
方法或distinct
命令。要查找分片集合的不同值,请改用带有
$group
阶段的 aggregation pipeline。例如:不使用
db.coll.distinct("x")
,而是使用db.coll.aggregate([ { $group: { _id: null, distinctValues: { $addToSet: "$x" } } }, { $project: { _id: 0 } } ]) 不使用
db.coll.distinct("x", { status: "A" })
,而是使用db.coll.aggregate([ { $match: { status: "A" } }, { $group: { _id: null, distinctValues: { $addToSet: "$x" } } }, { $project: { _id: 0 } } ])
管道返回一个指向文档的游标:
{ "distinctValues" : [ 2, 3, 1 ] } 迭代游标以访问结果文档。
信息操作
事务中允许使用诸如 hello
、buildInfo
、connectionStatus
(及其辅助方法)之类的信息命令,但它们不能是事务中的第一项操作。
限制性操作
事务中不允许执行以下操作:
在跨分片写事务中创建新集合。例如,如果您在一个分片中写入一个现有集合,并在另一个分片中隐式创建一个集合,那么 MongoDB 将无法在同一事务中执行这两项操作。
使用
"local"
以外的读关注级别时,显式创建集合(例如db.createCollection()
方法)和索引(例如db.collection.createIndexes()
和db.collection.createIndex()
方法)。listCollections
和listIndexes
命令及其辅助方法。其他非 CRUD 和非信息性操作(例如
createUser
、getParameter
和count
)及其辅助程序。
事务和会话
事务与会话关联。
一个会话一次最多可以具有一个未结事务。
使用驱动程序时,事务中的每项操作都必须与会话关联。有关详细信息,请参阅驱动程序特定文档。
如果会话结束并且具有打开的事务,则事务将中止。
读关注/写关注/读取偏好
事务和读取偏好
事务中的操作使用事务级读取偏好。
使用驱动程序,您可以在事务启动时设置事务级读取偏好:
如果未设置事务级别的读取偏好,则事务将使用会话级别的读取偏好。
如果未设置事务级别和会话级别的读取偏好,则事务将使用客户端级别的读取偏好。默认情况下,客户端级别的读取偏好为
primary
。
事务和读关注
事务中的操作使用事务级读关注。也就是说,在集合和数据库级别设置的任何读关注在事务中都会被忽略。
您可以在事务启动时设置事务级别的读关注。
如果未设置事务级别的读关注,则事务级别的读关注默认为会话级别的读关注。
如果未设置事务级读关注和会话级读关注,则事务级读关注默认为客户端级读关注。默认情况下,对于主节点上的读取,客户端级读关注是
"local"
。另请参阅:
事务支持以下读关注级别:
"local"
"majority"
如果事务以写关注“majority”提交,则读关注
"majority"
返回已被多数副本集节点确认且无法回滚的数据。否则,读关注"majority"
不保证读取操作读取多数提交的数据。对于分片集群上的事务,读关注
"majority"
无法保证数据来自跨分片的同一快照视图。如果需要快照隔离,请使用读关注"snapshot"
。
"snapshot"
如果事务使用写关注“majority”提交,则读关注
"snapshot"
会从多数已提交数据的快照中返回数据。如果事务不使用写关注“majority”提交,则
"snapshot"
读关注不保证读操作会使用大多数已提交数据的快照。对于分片集群上的事务,数据的
"snapshot"
视图会在各分片之间同步。
事务和写关注
事务使用事务级写关注来提交写入操作。事务内的写入操作必须在没有明确写关注规范的情况下执行,并须使用默认的写关注。在提交时,使用事务级写关注来提交写入。
提示
请勿为事务中的各个写入操作显式设置写关注。为事务内的各个写入操作设置写关注会返回错误消息。
您可以在事务启动时设置事务级写关注。
如果未设置事务级别的写关注,则事务级别的写关注默认为提交的会话级别写关注。
如果未设置事务级别的写关注和会话级别的的写关注,则事务级别的写关注默认为 的客户端级别的写关注,
w: "majority"
(在 MongoDB 5.0 及更高版本中),包含仲裁节点的部署有所不同。请参阅隐式默认写关注。
事务支持所有写关注 w 值,包括:
w: 1
写关注
w: 1
会在提交应用于主节点后返回确认信息。重要
使用
w: 1
提交时,如果发生故障转移,则可以回滚事务。使用
w: 1
写入关注提交时,事务级"majority"
读关注无法保证事务中的读操作会读取大多数已提交数据。使用
w: 1
写关注提交时,事务级"snapshot"
读关注无法保证事务中的读操作会使用大多数已提交数据的快照。
w: "majority"
在将提交应用于大多数投票节点后,写关注
w: "majority"
会返回确认消息。使用
w: "majority"
写关注提交时,事务级"majority"
读关注可以保证操作已读取大多数已提交数据。对于分片集群上的事务,大多数已提交数据的视图不会在各分片之间同步。使用
w: "majority"
写关注提交时,事务级"snapshot"
读关注可以保证操作已从大多数已提交数据的同步快照中读取。
注意
无论为事务指定了何种写关注,分片集群事务的提交操作始终包括一些使用 {w:
"majority", j: true}
写关注的部分。
服务器参数 coordinateCommitReturnImmediatelyAfterPersistingDecision
可控制何时将事务提交决策返回给客户端。
该参数是在 MongDB 5.0 中引入的,默认值为 true
。。在 MongoDB 6.1 中,该默认值更改为 false
。
当 coordinateCommitReturnImmediatelyAfterPersistingDecision
为 false
时,分片事务协调器会等待所有成员确认多文档事务提交,然后再将提交决策返回给客户端。
如果为多文档事务指定 "majority"
写关注,并且该事务未能复制到计算出的多数副本集成员,则该事务可能不会立即在副本集成员上回滚。副本集最终将保持一致。事务始终在所有副本集节点上应用或回滚。
无论为事务指定了何种写关注 ,驱动程序在重试 commitTransaction
时都会应用 w: "majority"
作为写关注。
基本信息
以下各节将介绍有关事务的更多注意事项。
生产环境注意事项
有关生产环境中的事务,请参阅生产环境注意事项。此外,有关分片集群,请参阅生产环境注意事项(分片集群)。
仲裁节点
如果副本集具有仲裁节点,则无法使用事务更改分片键。仲裁节点无法参与多分片事务所需的数据操作。
如果任何事务操作读取或写入包含仲裁节点的分片,则写入操作跨越多个分片的事务将出现错误并中止。
分片配置限制
您无法在具有将 writeConcernMajorityJournalDefault
设置为 false
的分片(例如具有使用内存中存储引擎的投票节点的分片)的分片集群上运行事务。
注意
无论为事务指定了何种写关注,分片集群事务的提交操作始终包括一些使用 {w:
"majority", j: true}
写关注的部分。
诊断
要获取事务状态和指标,请使用以下方法:
源 | 返回: |
---|---|
db.serverStatus() methodserverStatus command | |
$currentOp 聚合管道 | 返回:
|
db.currentOp() methodcurrentOp command | 返回:
|
在 TXN 日志组件中包含有关慢速事务(即超过 operationProfiling.slowOpThresholdMs 阈值的事务)的信息。 |
特征兼容性版本 (FCV)
要使用事务,所有部署节点的 featureCompatibilityVersion 必须至少为:
部署 | 最低 featureCompatibilityVersion |
---|---|
副本集(Replica Set) | 4.0 |
分片集群 | 4.2 |
要检查成员的 FCV,请连接到该成员并运行以下命令:
db.adminCommand( { getParameter: 1, featureCompatibilityVersion: 1 } )
更多信息,请参阅 setFeatureCompatibilityVersion
参考页。
存储引擎
副本集和分片集群支持的分布式事务,其中:
主节点使用 WiredTiger 存储引擎,而
从节点使用 WiredTiger 存储引擎或内存存储引擎。
注意
您无法在具有将 writeConcernMajorityJournalDefault
设置为 false
的分片(例如具有使用内存中存储引擎的投票成员的分片)的分片集群上运行事务。
限制关键部分等待时间
从 MongoDB 5.2(和 5.0.4)开始:
要限制分片在事务中等待关键部分的时间,使用
metadataRefreshInTransactionMaxWaitBehindCritSecMS
参数。