Callback API 与 Core API
- 启动事务,执行指定操作并提交(或在出错时中止)。 
- 自动包含 - "TransientTransactionError"和- "UnknownTransactionCommitResult"的错误处理逻辑。
- 需要显式调用以启动并提交事务。 
- 不包含对 - "TransientTransactionError"和- "UnknownTransactionCommitResult"的错误处理逻辑,而是提供对这些错误进行自定义错误处理的灵活性。
回调 API
回调 API 包含以下逻辑:
- 如果提交操作遇到 - "UnknownTransactionCommitResult"错误,则重试提交操作。
从MongoDB 5.0.16开始,服务器在收到TransactionTooLargeForCache错误时不会重试ACID 事务。
例子
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- ACID 事务中的操作使用事务级读关注(read concern)、事务级写关注(write concern)和事务级读取偏好(read preference)。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
static bool with_transaction_example(bson_error_t *error) {    mongoc_client_t *client = NULL;    mongoc_write_concern_t *wc = NULL;    mongoc_collection_t *coll = NULL;    bool success = false;    bool ret = false;    bson_t *doc = NULL;    bson_t *insert_opts = NULL;    mongoc_client_session_t *session = NULL;    mongoc_transaction_opt_t *txn_opts = NULL;    /* For a replica set, include the replica set name and a seedlist of the     * members in the URI string; e.g.     * uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \     *    "27017/?replicaSet=myRepl";     * client = mongoc_client_new (uri_repl);     * For a sharded cluster, connect to the mongos instances; e.g.     * uri_sharded =     * "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";     * client = mongoc_client_new (uri_sharded);     */    client = get_client();    /* Prereq: Create collections. Note Atlas connection strings include a majority write     * concern by default.     */    wc = mongoc_write_concern_new();    mongoc_write_concern_set_wmajority(wc, 1000);    insert_opts = bson_new();    mongoc_write_concern_append(wc, insert_opts);    coll = mongoc_client_get_collection(client, "mydb1", "foo");    doc = BCON_NEW("abc", BCON_INT32(0));    ret = mongoc_collection_insert_one(coll, doc, insert_opts, NULL /* reply */, error);    if (!ret) {       goto fail;    }    bson_destroy(doc);    mongoc_collection_destroy(coll);    coll = mongoc_client_get_collection(client, "mydb2", "bar");    doc = BCON_NEW("xyz", BCON_INT32(0));    ret = mongoc_collection_insert_one(coll, doc, insert_opts, NULL /* reply */, error);    if (!ret) {       goto fail;    }    /* Step 1: Start a client session. */    session = mongoc_client_start_session(client, NULL /* opts */, error);    if (!session) {       goto fail;    }    /* Step 2: Optional. Define options to use for the transaction. */    txn_opts = mongoc_transaction_opts_new();    mongoc_transaction_opts_set_write_concern(txn_opts, wc);    /* Step 3: Use mongoc_client_session_with_transaction to start a transaction,     * execute the callback, and commit (or abort on error). */    ret = mongoc_client_session_with_transaction(session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);    if (!ret) {       goto fail;    }    success = true; fail:    bson_destroy(doc);    mongoc_collection_destroy(coll);    bson_destroy(insert_opts);    mongoc_write_concern_destroy(wc);    mongoc_transaction_opts_destroy(txn_opts);    mongoc_client_session_destroy(session);    mongoc_client_destroy(client);    return success; } /* Define the callback that specifies the sequence of operations to perform  * inside the transactions. */ static bool callback(mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error) {    mongoc_client_t *client = NULL;    mongoc_collection_t *coll = NULL;    bson_t *doc = NULL;    bool success = false;    bool ret = false;    BSON_UNUSED(ctx);    client = mongoc_client_session_get_client(session);    coll = mongoc_client_get_collection(client, "mydb1", "foo");    doc = BCON_NEW("abc", BCON_INT32(1));    ret = mongoc_collection_insert_one(coll, doc, NULL /* opts */, *reply, error);    if (!ret) {       goto fail;    }    bson_destroy(doc);    mongoc_collection_destroy(coll);    coll = mongoc_client_get_collection(client, "mydb2", "bar");    doc = BCON_NEW("xyz", BCON_INT32(999));    ret = mongoc_collection_insert_one(coll, doc, NULL /* opts */, *reply, error);    if (!ret) {       goto fail;    }    success = true; fail:    mongoc_collection_destroy(coll);    bson_destroy(doc);    return success; } 
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
// The mongocxx::instance constructor and destructor initialize and shut down the driver, // respectively. Therefore, a mongocxx::instance must be created before using the driver and // must remain alive for as long as the driver is in use. mongocxx::instance inst{}; // For a replica set, include the replica set name and a seedlist of the members in the URI // string; e.g. // uriString = // 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' // For a sharded cluster, connect to the mongos instances; e.g. // uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' mongocxx::client client{mongocxx::uri{}}; // Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be // needed. The suggested Atlas connection string includes majority write concern by default. write_concern wc_majority{}; wc_majority.acknowledge_level(write_concern::level::k_majority); // Prereq: Create collections. auto foo = client["mydb1"]["foo"]; auto bar = client["mydb2"]["bar"]; try {     options::insert opts;     opts.write_concern(wc_majority);     foo.insert_one(make_document(kvp("abc", 0)), opts);     bar.insert_one(make_document(kvp("xyz", 0)), opts); } catch (mongocxx::exception const& e) {     std::cout << "An exception occurred while inserting: " << e.what() << std::endl;     return EXIT_FAILURE; } // Step 1: Define the callback that specifies the sequence of operations to perform inside the // transactions. client_session::with_transaction_cb callback = [&](client_session* session) {     // Important::  You must pass the session to the operations.     foo.insert_one(*session, make_document(kvp("abc", 1)));     bar.insert_one(*session, make_document(kvp("xyz", 999))); }; // Step 2: Start a client session auto session = client.start_session(); // Step 3: Use with_transaction to start a transaction, execute the callback, // and commit (or abort on error). try {     options::transaction opts;     opts.write_concern(wc_majority);     session.with_transaction(callback, opts); } catch (mongocxx::exception const& e) {     std::cout << "An exception occurred: " << e.what() << std::endl;     return EXIT_FAILURE; } return EXIT_SUCCESS; 
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. // string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"; // For a sharded cluster, connect to the mongos instances; e.g. // string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"; var client = new MongoClient(connectionString); // Prereq: Create collections. var database1 = client.GetDatabase("mydb1"); var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority); collection1.InsertOne(new BsonDocument("abc", 0)); var database2 = client.GetDatabase("mydb2"); var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority); collection2.InsertOne(new BsonDocument("xyz", 0)); // Step 1: Start a client session. using (var session = client.StartSession()) {     // Step 2: Optional. Define options to use for the transaction.     var transactionOptions = new TransactionOptions(         writeConcern: WriteConcern.WMajority);     // Step 3: Define the sequence of operations to perform inside the transactions     var cancellationToken = CancellationToken.None; // normally a real token would be used     result = session.WithTransaction(         (s, ct) =>         {             try             {                 collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);                 collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);             }             catch (MongoWriteException)             {                 // Do something in response to the exception                 throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.             }             return "Inserted into collections in different databases";         },         transactionOptions,         cancellationToken); } 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
// WithTransactionExample is an example of using the Session.WithTransaction function. func WithTransactionExample(ctx context.Context) error { 	// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. 	// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl" 	// For a sharded cluster, connect to the mongos instances; e.g. 	// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/" 	uri := mtest.ClusterURI() 	clientOpts := options.Client().ApplyURI(uri) 	client, err := mongo.Connect(clientOpts) 	if err != nil { 		return err 	} 	defer func() { _ = client.Disconnect(ctx) }() 	// Prereq: Create collections. 	wcMajority := writeconcern.Majority() 	wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority) 	fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts) 	barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts) 	// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction. 	callback := func(sesctx context.Context) (any, error) { 		// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the 		// transaction. 		if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil { 			return nil, err 		} 		if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil { 			return nil, err 		} 		return nil, nil 	} 	// Step 2: Start a session and run the callback using WithTransaction. 	session, err := client.StartSession() 	if err != nil { 		return err 	} 	defer session.EndSession(ctx) 	result, err := session.WithTransaction(ctx, callback) 	if err != nil { 		return err 	} 	log.Printf("result: %v\n", result) 	return nil } 
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
/* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl"; For a sharded cluster, connect to the mongos instances; e.g. String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin"; */ final MongoClient client = MongoClients.create(uri); /*    Create collections. */ client.getDatabase("mydb1").getCollection("foo")       .withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("abc", 0)); client.getDatabase("mydb2").getCollection("bar")       .withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("xyz", 0)); /* Step 1: Start a client session. */ final ClientSession clientSession = client.startSession(); /* Step 2: Optional. Define options to use for the transaction. */ TransactionOptions txnOptions = TransactionOptions.builder()       .readPreference(ReadPreference.primary())       .readConcern(ReadConcern.LOCAL)       .writeConcern(WriteConcern.MAJORITY)       .build(); /* Step 3: Define the sequence of operations to perform inside the transactions. */ TransactionBody txnBody = new TransactionBody<String>() {    public String execute() {       MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");       MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");       /*          Important:: You must pass the session to the operations..          */       coll1.insertOne(clientSession, new Document("abc", 1));       coll2.insertOne(clientSession, new Document("xyz", 999));       return "Inserted into collections in different databases";    } }; try {    /*       Step 4: Use .withTransaction() to start a transaction,       execute the callback, and commit (or abort on error).    */    clientSession.withTransaction(txnBody, txnOptions); } catch (RuntimeException e) {    // some error handling } finally {    clientSession.close(); } 
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = AsyncIOMotorClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. async def callback(my_session):     collection_one = my_session.client.mydb1.foo     collection_two = my_session.client.mydb2.bar     # Important:: You must pass the session to the operations.     await collection_one.insert_one({"abc": 1}, session=my_session)     await collection_two.insert_one({"xyz": 999}, session=my_session) # Step 2: Start a client session. async with await client.start_session() as session:     # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).     await session.with_transaction(         callback,         read_concern=ReadConcern("local"),         write_concern=wc_majority,         read_preference=ReadPreference.PRIMARY,     ) 
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
  // For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.   // const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'   // For a sharded cluster, connect to the mongos instances; e.g.   // const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'   const client = new MongoClient(uri);   await client.connect();   // Prereq: Create collections.   await client     .db('mydb1')     .collection('foo')     .insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });   await client     .db('mydb2')     .collection('bar')     .insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });   // Step 1: Start a Client Session   const session = client.startSession();   // Step 2: Optional. Define options to use for the transaction   const transactionOptions = {     readPreference: 'primary',     readConcern: { level: 'local' },     writeConcern: { w: 'majority' }   };   // Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)   // Note: The callback for withTransaction MUST be async and/or return a Promise.   try {     await session.withTransaction(async () => {       const coll1 = client.db('mydb1').collection('foo');       const coll2 = client.db('mydb2').collection('bar');       // Important:: You must pass the session to the operations       await coll1.insertOne({ abc: 1 }, { session });       await coll2.insertOne({ xyz: 999 }, { session });     }, transactionOptions);   } finally {     await session.endSession();     await client.close();   } 
注意
对于Perl驾驶员,请参阅Core API使用示例。
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
/*  * For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.  * uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'  * For a sharded cluster, connect to the mongos instances; e.g.  * uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'  */ $client = new \MongoDB\Client($uriString); // Prerequisite: Create collections. $client->selectCollection(     'mydb1',     'foo',     [         'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),     ], )->insertOne(['abc' => 0]); $client->selectCollection(     'mydb2',     'bar',     [         'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),     ], )->insertOne(['xyz' => 0]); // Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. $callback = function (\MongoDB\Driver\Session $session) use ($client): void {     $client         ->selectCollection('mydb1', 'foo')         ->insertOne(['abc' => 1], ['session' => $session]);     $client         ->selectCollection('mydb2', 'bar')         ->insertOne(['xyz' => 999], ['session' => $session]); }; // Step 2: Start a client session. $session = $client->startSession(); // Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). \MongoDB\with_transaction($session, $callback); 
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = MongoClient(uriString) wc_majority = WriteConcern("majority", wtimeout=1000) # Prereq: Create collections. client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0}) client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0}) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. def callback(session):     collection_one = session.client.mydb1.foo     collection_two = session.client.mydb2.bar     # Important:: You must pass the session to the operations.     collection_one.insert_one({"abc": 1}, session=session)     collection_two.insert_one({"xyz": 999}, session=session) # Step 2: Start a client session. with client.start_session() as session:     # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).     session.with_transaction(callback) 
重要
- 使用适合您的 MongoDB 版本的 MongoDB 驱动程序。 
- 使用驱动程序时,事务中的每个操作都必须将会话传递给每个操作。 
- 您可以在事务中隐式或显式创建集合。请参阅在事务中创建集合和索引。 
该示例使用新的 Callback API 来处理事务,具体操作是启动事务、执行指定操作并提交(或在出错时中止)。新的 Callback API 包含针对 "TransientTransactionError" 或 "UnknownTransactionCommitResult" 提交错误的重试逻辑。
# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g. # uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl' # For a sharded cluster, connect to the mongos instances; e.g. # uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/' client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000}) # Prereq: Create collections. client.use('mydb1')['foo'].insert_one(abc: 0) client.use('mydb2')['bar'].insert_one(xyz: 0) # Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions. callback = Proc.new do |my_session|   collection_one = client.use('mydb1')['foo']   collection_two = client.use('mydb2')['bar']   # Important: You must pass the session to the operations.   collection_one.insert_one({'abc': 1}, session: my_session)   collection_two.insert_one({'xyz': 999}, session: my_session) end #. Step 2: Start a client session. session = client.start_session # Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error). session.with_transaction(   read_concern: {level: :local},   write_concern: {w: :majority, wtimeout: 1000},   read: {mode: :primary},   &callback) 
注意
对于Scala驾驶员,请参阅Core API使用示例。
Core API
核心事务 API 不包含标记为以下错误的重试逻辑:
- "TransientTransactionError"。如果事务中的操作返回标记为- "TransientTransactionError"的错误,则可以将事务作为一个整体进行重试。- 要处理 - "TransientTransactionError",应用程序应显式包含该错误的重试逻辑。
- "UnknownTransactionCommitResult"。如果提交返回标记为- "UnknownTransactionCommitResult"的错误,则可以重试提交。- 要处理 - "UnknownTransactionCommitResult",应用程序应显式包含该错误的重试逻辑。
例子
➤ 使用右上角的选择语言下拉菜单来设置本页面上示例的语言。
以下示例包含在出现暂时性错误时重试事务的逻辑,以及在出现未知提交错误时重试提交的逻辑:
/* takes a session, an out-param for server reply, and out-param for error. */ typedef bool (*txn_func_t)(mongoc_client_session_t *, bson_t *, bson_error_t *); /* runs transactions with retry logic */ bool run_transaction_with_retry(txn_func_t txn_func, mongoc_client_session_t *cs, bson_error_t *error) {    bson_t reply;    bool r;    while (true) {       /* perform transaction */       r = txn_func(cs, &reply, error);       if (r) {          /* success */          bson_destroy(&reply);          return true;       }       MONGOC_WARNING("Transaction aborted: %s", error->message);       if (mongoc_error_has_label(&reply, "TransientTransactionError")) {          /* on transient error, retry the whole transaction */          MONGOC_WARNING("TransientTransactionError, retrying transaction...");          bson_destroy(&reply);       } else {          /* non-transient error */          break;       }    }    bson_destroy(&reply);    return false; } /* commit transactions with retry logic */ bool commit_with_retry(mongoc_client_session_t *cs, bson_error_t *error) {    bson_t reply;    bool r;    while (true) {       /* commit uses write concern set at transaction start, see        * mongoc_transaction_opts_set_write_concern */       r = mongoc_client_session_commit_transaction(cs, &reply, error);       if (r) {          MONGOC_DEBUG("Transaction committed");          break;       }       if (mongoc_error_has_label(&reply, "UnknownTransactionCommitResult")) {          MONGOC_WARNING("UnknownTransactionCommitResult, retrying commit ...");          bson_destroy(&reply);       } else {          /* commit failed, cannot retry */          break;       }    }    bson_destroy(&reply);    return r; } /* updates two collections in a transaction and calls commit_with_retry */ bool update_employee_info(mongoc_client_session_t *cs, bson_t *reply, bson_error_t *error) {    mongoc_client_t *client;    mongoc_collection_t *employees;    mongoc_collection_t *events;    mongoc_read_concern_t *rc;    mongoc_write_concern_t *wc;    mongoc_transaction_opt_t *txn_opts;    bson_t opts = BSON_INITIALIZER;    bson_t *filter = NULL;    bson_t *update = NULL;    bson_t *event = NULL;    bool r;    bson_init(reply);    client = mongoc_client_session_get_client(cs);    employees = mongoc_client_get_collection(client, "hr", "employees");    events = mongoc_client_get_collection(client, "reporting", "events");    rc = mongoc_read_concern_new();    mongoc_read_concern_set_level(rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);    wc = mongoc_write_concern_new();    mongoc_write_concern_set_w(       wc, MONGOC_WRITE_CONCERN_W_MAJORITY); /* Atlas connection strings include majority by default*/    txn_opts = mongoc_transaction_opts_new();    mongoc_transaction_opts_set_read_concern(txn_opts, rc);    mongoc_transaction_opts_set_write_concern(txn_opts, wc);    r = mongoc_client_session_start_transaction(cs, txn_opts, error);    if (!r) {       goto done;    }    r = mongoc_client_session_append(cs, &opts, error);    if (!r) {       goto done;    }    filter = BCON_NEW("employee", BCON_INT32(3));    update = BCON_NEW("$set", "{", "status", "Inactive", "}");    /* mongoc_collection_update_one will reinitialize reply */    bson_destroy(reply);    r = mongoc_collection_update_one(employees, filter, update, &opts, reply, error);    if (!r) {       goto abort;    }    event = BCON_NEW("employee", BCON_INT32(3));    BCON_APPEND(event, "status", "{", "new", "Inactive", "old", "Active", "}");    bson_destroy(reply);    r = mongoc_collection_insert_one(events, event, &opts, reply, error);    if (!r) {       goto abort;    }    r = commit_with_retry(cs, error); abort:    if (!r) {       MONGOC_ERROR("Aborting due to error in transaction: %s", error->message);       mongoc_client_session_abort_transaction(cs, NULL);    } done:    mongoc_collection_destroy(employees);    mongoc_collection_destroy(events);    mongoc_read_concern_destroy(rc);    mongoc_write_concern_destroy(wc);    mongoc_transaction_opts_destroy(txn_opts);    bson_destroy(&opts);    bson_destroy(filter);    bson_destroy(update);    bson_destroy(event);    return r; } void example_func(mongoc_client_t *client) {    mongoc_client_session_t *cs;    bson_error_t error;    bool r;    ASSERT(client);    cs = mongoc_client_start_session(client, NULL, &error);    if (!cs) {       MONGOC_ERROR("Could not start session: %s", error.message);       return;    }    r = run_transaction_with_retry(update_employee_info, cs, &error);    if (!r) {       MONGOC_ERROR("Could not update employee, permanent error: %s", error.message);    }    mongoc_client_session_destroy(cs); } 
g transaction_func = std::function<void(client_session & session)>;  run_transaction_with_retry = [](transaction_func txn_func, client_session& session) { while (true) {     try {         txn_func(session); // performs transaction.         break;     } catch (operation_exception const& oe) {         std::cout << "Transaction aborted. Caught exception during transaction." << std::endl;         // If transient error, retry the whole transaction.         if (oe.has_error_label("TransientTransactionError")) {             std::cout << "TransientTransactionError, retrying transaction ..." << std::endl;             continue;         } else {             throw oe;         }     } }  commit_with_retry = [](client_session& session) { while (true) {     try {         session.commit_transaction(); // Uses write concern set at transaction start.         std::cout << "Transaction committed." << std::endl;         break;     } catch (operation_exception const& oe) {         // Can retry commit         if (oe.has_error_label("UnknownTransactionCommitResult")) {             std::cout << "UnknownTransactionCommitResult, retrying commit operation ..." << std::endl;             continue;         } else {             std::cout << "Error during commit ..." << std::endl;             throw oe;         }     } } pdates two collections in a transaction  update_employee_info = [&](client_session& session) { auto& client = session.client(); auto employees = client["hr"]["employees"]; auto events = client["reporting"]["events"]; options::transaction txn_opts; read_concern rc; rc.acknowledge_level(read_concern::level::k_snapshot); txn_opts.read_concern(rc); write_concern wc; wc.acknowledge_level(write_concern::level::k_majority); txn_opts.write_concern(wc); session.start_transaction(txn_opts); try {     employees.update_one(         make_document(kvp("employee", 3)),         make_document(kvp("$set", make_document(kvp("status", "Inactive")))));     events.insert_one(make_document(         kvp("employee", 3), kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active"))))); } catch (operation_exception const& oe) {     std::cout << "Caught exception during transaction, aborting." << std::endl;     session.abort_transaction();     throw oe; } commit_with_retry(session);  session = client.start_session(); { run_transaction_with_retry(update_employee_info, session); tch (operation_exception const& oe) { // Do something with error. throw oe; 
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session) {     while (true)     {         try         {             txnFunc(client, session); // performs transaction             break;         }         catch (MongoException exception)         {             // if transient error, retry the whole transaction             if (exception.HasErrorLabel("TransientTransactionError"))             {                 Console.WriteLine("TransientTransactionError, retrying transaction.");                 continue;             }             else             {                 throw;             }         }     } } public void CommitWithRetry(IClientSessionHandle session) {     while (true)     {         try         {             session.CommitTransaction();             Console.WriteLine("Transaction committed.");             break;         }         catch (MongoException exception)         {             // can retry commit             if (exception.HasErrorLabel("UnknownTransactionCommitResult"))             {                 Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation");                 continue;             }             else             {                 Console.WriteLine($"Error during commit: {exception.Message}.");                 throw;             }         }     } } // updates two collections in a transaction public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session) {     var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees");     var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events");     session.StartTransaction(new TransactionOptions(         readConcern: ReadConcern.Snapshot,         writeConcern: WriteConcern.WMajority));     try     {         employeesCollection.UpdateOne(             session,             Builders<BsonDocument>.Filter.Eq("employee", 3),             Builders<BsonDocument>.Update.Set("status", "Inactive"));         eventsCollection.InsertOne(             session,             new BsonDocument             {                 { "employee", 3 },                 { "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } }             });     }     catch (Exception exception)     {         Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}.");         session.AbortTransaction();         throw;     }     CommitWithRetry(session); } public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client) {     // start a session     using (var session = client.StartSession())     {         try         {             RunTransactionWithRetry(UpdateEmployeeInfo, client, session);         }         catch (Exception exception)         {             // do something with error             Console.WriteLine($"Non transient exception caught during transaction: ${exception.Message}.");         }     } } 
	runTransactionWithRetry := func(ctx context.Context, txnFn func(context.Context) error) error { 		for { 			err := txnFn(ctx) // Performs transaction. 			if err == nil { 				return nil 			} 			log.Println("Transaction aborted. Caught exception during transaction.") 			// If transient error, retry the whole transaction 			if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") { 				log.Println("TransientTransactionError, retrying transaction...") 				continue 			} 			return err 		} 	} 	commitWithRetry := func(ctx context.Context) error { 		sess := mongo.SessionFromContext(ctx) 		for { 			err := sess.CommitTransaction(ctx) 			switch e := err.(type) { 			case nil: 				log.Println("Transaction committed.") 				return nil 			case mongo.CommandError: 				// Can retry commit 				if e.HasErrorLabel("UnknownTransactionCommitResult") { 					log.Println("UnknownTransactionCommitResult, retrying commit operation...") 					continue 				} 				log.Println("Error during commit...") 				return e 			default: 				log.Println("Error during commit...") 				return e 			} 		} 	} 	// Updates two collections in a transaction. 	updateEmployeeInfo := func(ctx context.Context) error { 		employees := client.Database("hr").Collection("employees") 		events := client.Database("reporting").Collection("events") 		sess := mongo.SessionFromContext(ctx) 		err := sess.StartTransaction(options.Transaction(). 			SetReadConcern(readconcern.Snapshot()). 			SetWriteConcern(writeconcern.Majority()), 		) 		if err != nil { 			return err 		} 		_, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}}) 		if err != nil { 			sess.AbortTransaction(ctx) 			log.Println("caught exception during transaction, aborting.") 			return err 		} 		_, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}}) 		if err != nil { 			sess.AbortTransaction(ctx) 			log.Println("caught exception during transaction, aborting.") 			return err 		} 		return commitWithRetry(ctx) 	} 	txnOpts := options.Transaction().SetReadPreference(readpref.Primary()) 	return client.UseSessionWithOptions( 		ctx, options.Session().SetDefaultTransactionOptions(txnOpts), 		func(ctx context.Context) error { 			return runTransactionWithRetry(ctx, updateEmployeeInfo) 		}, 	) } 
重要
要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。
void runTransactionWithRetry(Runnable transactional) {     while (true) {         try {             transactional.run();             break;         } catch (MongoException e) {             System.out.println("Transaction aborted. Caught exception during transaction.");             if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {                 System.out.println("TransientTransactionError, aborting transaction and retrying ...");                 continue;             } else {                 throw e;             }         }     } } void commitWithRetry(ClientSession clientSession) {     while (true) {         try {             clientSession.commitTransaction();             System.out.println("Transaction committed");             break;         } catch (MongoException e) {             // can retry commit             if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {                 System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");                 continue;             } else {                 System.out.println("Exception during commit ...");                 throw e;             }         }     } } void updateEmployeeInfo() {     MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees");     MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events");     TransactionOptions txnOptions = TransactionOptions.builder()             .readPreference(ReadPreference.primary())             .readConcern(ReadConcern.MAJORITY)             .writeConcern(WriteConcern.MAJORITY)             .build();     try (ClientSession clientSession = client.startSession()) {         clientSession.startTransaction(txnOptions);         employeesCollection.updateOne(clientSession,                 Filters.eq("employee", 3),                 Updates.set("status", "Inactive"));         eventsCollection.insertOne(clientSession,                 new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active")));         commitWithRetry(clientSession);     } } void updateEmployeeInfoWithRetry() {     runTransactionWithRetry(this::updateEmployeeInfo); } 
注意
对于Motor ,请参阅Callback API 。
重要
要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。
async function commitWithRetry(session) {   try {     await session.commitTransaction();     console.log('Transaction committed.');   } catch (error) {     if (error.hasErrorLabel('UnknownTransactionCommitResult')) {       console.log('UnknownTransactionCommitResult, retrying commit operation ...');       await commitWithRetry(session);     } else {       console.log('Error during commit ...');       throw error;     }   } } async function runTransactionWithRetry(txnFunc, client, session) {   try {     await txnFunc(client, session);   } catch (error) {     console.log('Transaction aborted. Caught exception during transaction.');     // If transient error, retry the whole transaction     if (error.hasErrorLabel('TransientTransactionError')) {       console.log('TransientTransactionError, retrying transaction ...');       await runTransactionWithRetry(txnFunc, client, session);     } else {       throw error;     }   } } async function updateEmployeeInfo(client, session) {   session.startTransaction({     readConcern: { level: 'snapshot' },     writeConcern: { w: 'majority' },     readPreference: 'primary'   });   const employeesCollection = client.db('hr').collection('employees');   const eventsCollection = client.db('reporting').collection('events');   await employeesCollection.updateOne(     { employee: 3 },     { $set: { status: 'Inactive' } },     { session }   );   await eventsCollection.insertOne(     {       employee: 3,       status: { new: 'Inactive', old: 'Active' }     },     { session }   );   try {     await commitWithRetry(session);   } catch (error) {     await session.abortTransaction();     throw error;   } } return client.withSession(session =>   runTransactionWithRetry(updateEmployeeInfo, client, session) ); 
重要
要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。
sub runTransactionWithRetry {     my ( $txnFunc, $session ) = @_;     LOOP: {         eval {             $txnFunc->($session); # performs transaction         };         if ( my $error = $@ ) {             print("Transaction aborted-> Caught exception during transaction.\n");             # If transient error, retry the whole transaction             if ( $error->has_error_label("TransientTransactionError") ) {                 print("TransientTransactionError, retrying transaction ->..\n");                 redo LOOP;             }             else {                 die $error;             }         }     }     return; } sub commitWithRetry {     my ($session) = @_;     LOOP: {         eval {             $session->commit_transaction(); # Uses write concern set at transaction start.             print("Transaction committed->\n");         };         if ( my $error = $@ ) {             # Can retry commit             if ( $error->has_error_label("UnknownTransactionCommitResult") ) {                 print("UnknownTransactionCommitResult, retrying commit operation ->..\n");                 redo LOOP;             }             else {                 print("Error during commit ->..\n");                 die $error;             }         }     }     return; } # Updates two collections in a transactions sub updateEmployeeInfo {     my ($session) = @_;     my $employeesCollection = $session->client->ns("hr.employees");     my $eventsCollection    = $session->client->ns("reporting.events");     $session->start_transaction(         {             readConcern  => { level => "snapshot" },             writeConcern => { w     => "majority" },             readPreference => 'primary',         }     );     eval {         $employeesCollection->update_one(             { employee => 3 }, { '$set' => { status => "Inactive" } },             { session => $session},         );         $eventsCollection->insert_one(             { employee => 3, status => { new => "Inactive", old => "Active" } },             { session => $session},         );     };     if ( my $error = $@ ) {         print("Caught exception during transaction, aborting->\n");         $session->abort_transaction();         die $error;     }     commitWithRetry($session); } # Start a session my $session = $client->start_session(); eval {     runTransactionWithRetry(\&updateEmployeeInfo, $session); }; if ( my $error = $@ ) {     # Do something with error } $session->end_session(); 
重要
要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。
private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session): void {     while (true) {         try {             $txnFunc($client, $session);  // performs transaction             break;         } catch (\MongoDB\Driver\Exception\CommandException $error) {             $resultDoc = $error->getResultDocument();             // If transient error, retry the whole transaction             if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {                 continue;             } else {                 throw $error;             }         } catch (\MongoDB\Driver\Exception\Exception $error) {             throw $error;         }     } } private function commitWithRetry3(\MongoDB\Driver\Session $session): void {     while (true) {         try {             $session->commitTransaction();             echo "Transaction committed.\n";             break;         } catch (\MongoDB\Driver\Exception\CommandException $error) {             $resultDoc = $error->getResultDocument();             if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {                 echo "UnknownTransactionCommitResult, retrying commit operation ...\n";                 continue;             } else {                 echo "Error during commit ...\n";                 throw $error;             }         } catch (\MongoDB\Driver\Exception\Exception $error) {             echo "Error during commit ...\n";             throw $error;         }     } } private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session): void {     $session->startTransaction([         'readConcern' => new \MongoDB\Driver\ReadConcern('snapshot'),         'readPrefernece' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::PRIMARY),         'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY),     ]);     try {         $client->hr->employees->updateOne(             ['employee' => 3],             ['$set' => ['status' => 'Inactive']],             ['session' => $session],         );         $client->reporting->events->insertOne(             ['employee' => 3, 'status' => ['new' => 'Inactive', 'old' => 'Active']],             ['session' => $session],         );     } catch (\MongoDB\Driver\Exception\Exception $error) {         echo "Caught exception during transaction, aborting.\n";         $session->abortTransaction();         throw $error;     }     $this->commitWithRetry3($session); } private function doUpdateEmployeeInfo(\MongoDB\Client $client): void {     // Start a session.     $session = $client->startSession();     try {         $this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session);     } catch (\MongoDB\Driver\Exception\Exception) {         // Do something with error     } } 
重要
要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。
def run_transaction_with_retry(txn_func, session):     while True:         try:             txn_func(session)  # performs transaction             break         except (ConnectionFailure, OperationFailure) as exc:             # If transient error, retry the whole transaction             if exc.has_error_label("TransientTransactionError"):                 print("TransientTransactionError, retrying transaction ...")                 continue             else:                 raise def commit_with_retry(session):     while True:         try:             # Commit uses write concern set at transaction start.             session.commit_transaction()             print("Transaction committed.")             break         except (ConnectionFailure, OperationFailure) as exc:             # Can retry commit             if exc.has_error_label("UnknownTransactionCommitResult"):                 print("UnknownTransactionCommitResult, retrying commit operation ...")                 continue             else:                 print("Error during commit ...")                 raise # Updates two collections in a transactions def update_employee_info(session):     employees_coll = session.client.hr.employees     events_coll = session.client.reporting.events     with session.start_transaction(         read_concern=ReadConcern("snapshot"),         write_concern=WriteConcern(w="majority"),         read_preference=ReadPreference.PRIMARY,     ):         employees_coll.update_one(             {"employee": 3}, {"$set": {"status": "Inactive"}}, session=session         )         events_coll.insert_one(             {"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session         )         commit_with_retry(session) # Start a session. with client.start_session() as session:     try:         run_transaction_with_retry(update_employee_info, session)     except Exception:         # Do something with error.         raise 
重要
要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。
def run_transaction_with_retry(session)   begin     yield session # performs transaction   rescue Mongo::Error => e     puts 'Transaction aborted. Caught exception during transaction.'     raise unless e.label?('TransientTransactionError')     puts "TransientTransactionError, retrying transaction ..."     retry   end end def commit_with_retry(session)   begin     session.commit_transaction     puts 'Transaction committed.'   rescue Mongo::Error => e     if e.label?('UnknownTransactionCommitResult')       puts "UnknownTransactionCommitResult, retrying commit operation ..."       retry     else       puts 'Error during commit ...'       raise     end   end end # updates two collections in a transaction def update_employee_info(session)   employees_coll = session.client.use(:hr)[:employees]   events_coll = session.client.use(:reporting)[:events]   session.start_transaction(read_concern: { level: :snapshot },                             write_concern: { w: :majority },                             read: {mode: :primary})   employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive'} },                             session: session)   events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } },                          session: session)   commit_with_retry(session) end session = client.start_session begin   run_transaction_with_retry(session) do     update_employee_info(session)   end rescue StandardError => e   # Do something with error   raise end 
重要
要将读取和写入操作与事务关联,必须将会话传递给事务中的每个操作。
/*  * Copyright 2008-present MongoDB, Inc.  *  * Licensed under the Apache License, Version 2.0 (the "License");  * you may not use this file except in compliance with the License.  * You may obtain a copy of the License at  *  *   http://www.apache.org/licenses/LICENSE-2.0  *  * Unless required by applicable law or agreed to in writing, software  * distributed under the License is distributed on an "AS IS" BASIS,  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  * See the License for the specific language governing permissions and  * limitations under the License.  */ package org.mongodb.scala import org.mongodb.scala.model.{Filters, Updates} import org.mongodb.scala.result.UpdateResult import scala.concurrent.Await import scala.concurrent.duration.Duration //scalastyle:off magic.number class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {   // Implicit functions that execute the Observable and return the results   val waitDuration = Duration(5, "seconds")   implicit class ObservableExecutor[T](observable: Observable[T]) {     def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)   }   implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {     def execute(): T = Await.result(observable.toFuture(), waitDuration)   }   // end implicit functions   "The Scala driver" should "be able to commit a transaction" in withClient { client =>     assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())     client.getDatabase("hr").drop().execute()     client.getDatabase("hr").createCollection("employees").execute()     client.getDatabase("hr").createCollection("events").execute()     updateEmployeeInfoWithRetry(client).execute() should equal(Completed())     client.getDatabase("hr").drop().execute() should equal(Completed())   }   def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {     observable.map(clientSession => {       val employeesCollection = database.getCollection("employees")       val eventsCollection = database.getCollection("events")       val transactionOptions = TransactionOptions.builder()         .readPreference(ReadPreference.primary())         .readConcern(ReadConcern.SNAPSHOT)         .writeConcern(WriteConcern.MAJORITY)         .build()       clientSession.startTransaction(transactionOptions)       employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))         .subscribe((res: UpdateResult) => println(res))       eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))         .subscribe((res: Completed) => println(res))       clientSession     })   }   def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {     observable.recoverWith({       case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {         println("UnknownTransactionCommitResult, retrying commit operation ...")         commitAndRetry(observable)       }       case e: Exception => {         println(s"Exception during commit ...: $e")         throw e       }     })   }   def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {     observable.recoverWith({       case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {         println("TransientTransactionError, aborting transaction and retrying ...")         runTransactionAndRetry(observable)       }     })   }   def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {     val database = client.getDatabase("hr")     val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())     val commitTransactionObservable: SingleObservable[Completed] =       updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())     val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)     runTransactionAndRetry(commitAndRetryObservable)   } } 
驱动程序版本
事务错误处理
无论是何种数据库系统,无论是 MongoDB 还是关系型数据库,应用程序都应采取相应措施来处理事务提交期间的错误,并包含事务的重试逻辑。
"TransientTransactionError"
事务中的单独写入操作不可重试,无论 retryWrites 的值如何。如果操作遇到与标签相关的 "TransientTransactionError" 错误(例如主节点降级时),可以将事务作为一个整体进行重试。
- 回调 API 包含 - "TransientTransactionError"的重试逻辑。
- 核心事务 API 不包含 - "TransientTransactionError"的重试逻辑。要处理- "TransientTransactionError",应用程序应显式包含错误的重试逻辑。要查看包含瞬时错误重试逻辑的示例,请参阅 Core API 示例。
"UnknownTransactionCommitResult"
提交操作是可重试写入操作。如果提交操作遇到错误,那么无论 retryWrites 的值如何,MongoDB 驱动程序都会重试提交。
如果提交操作遇到标记为 "UnknownTransactionCommitResult" 的错误,则可以重试提交。
- 回调 API 包含 - "UnknownTransactionCommitResult"的重试逻辑。
- 核心事务 API 不包含 - "UnknownTransactionCommitResult"的重试逻辑。要处理- "UnknownTransactionCommitResult",应用程序应显式包含错误的重试逻辑。要查看包含未知提交错误的重试逻辑示例,请参阅 Core API 示例。
TransactionTooLargeForCache
5.0.16版本新增。
从MongoDB 5.0.16开始,服务器在收到TransactionTooLargeForCache错误时不会重试ACID 事务。 此错误表示缓存太小,重试可能会失败。
transactionTooLargeForCacheThreshold 阈值的默认值为 0.75。当事务使用超过 75% 的缓存时,服务器会返回 TransactionTooLargeForCache 而不是重试事务。
在 MongoDB 的早期版本中,服务器会返回 TemporarilyUnavailable 或 WriteConflict,而不是 TransactionTooLargeForCache。
使用 setParameter 命令修改错误阈值。
更多信息
mongosh 例子
以下 mongosh 方法适用于事务:
// Create collections: db.getSiblingDB("mydb1").foo.insertOne(     {abc: 0},     { writeConcern: { w: "majority", wtimeout: 2000 } } ) db.getSiblingDB("mydb2").bar.insertOne(    {xyz: 0},    { writeConcern: { w: "majority", wtimeout: 2000 } } ) // Start a session. session = db.getMongo().startSession( { readPreference: { mode: "primary" } } ); coll1 = session.getDatabase("mydb1").foo; coll2 = session.getDatabase("mydb2").bar; // Start a transaction session.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } ); // Operations inside the transaction try {    coll1.insertOne( { abc: 1 } );    coll2.insertOne( { xyz: 999 } ); } catch (error) {    // Abort transaction on error    session.abortTransaction();    throw error; } // Commit the transaction using write concern set at transaction start session.commitTransaction(); session.endSession();