Docs Menu
Docs Home
/ /

API de drivers

El API de devolución de llamada:

Core API:

  • Requiere llamar explícitamente para iniciar la transacción y la confirmación de la transacción.

  • No incorpora lógica de manejo de errores para TransientTransactionError y UnknownTransactionCommitResult, y en su lugar proporciona la flexibilidad para incorporar un manejo de errores personalizado para estos errores.

La Callback API incorpora lógica:

A partir de MongoDB 6.2, el servidor no vuelve a intentar la transacción si recibe un error TransactionTooLargeForCache.


➤ Usar el menú desplegable Seleccionar lenguaje en la parte superior derecha para establecer el lenguaje de los ejemplos en esta página.


Importante

  • Utilice el controlador de MongoDB para su versión de MongoDB.

  • Al usar drivers, cada operación en la transacción debe pasar la sesión a cada operación.

  • Las operaciones en una transacción utilizan preocupación de lectura a nivel de transacción,preocupación deescritura a nivel de transacción y preferenciade lectura a nivel de transacción.

  • Puedes crear colecciones en transacciones de forma implícita o explícita. Consulta Crear colecciones e índices en una transacción.

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

static bool
with_transaction_example(bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_write_concern_t *wc = NULL;
mongoc_collection_t *coll = NULL;
bool success = false;
bool ret = false;
bson_t *doc = NULL;
bson_t *insert_opts = NULL;
mongoc_client_session_t *session = NULL;
mongoc_transaction_opt_t *txn_opts = NULL;
/* For a replica set, include the replica set name and a seedlist of the
* members in the URI string; e.g.
* uri_repl = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:" \
* "27017/?replicaSet=myRepl";
* client = mongoc_client_new (uri_repl);
* For a sharded cluster, connect to the mongos instances; e.g.
* uri_sharded =
* "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
* client = mongoc_client_new (uri_sharded);
*/
client = get_client();
/* Prereq: Create collections. Note Atlas connection strings include a majority write
* concern by default.
*/
wc = mongoc_write_concern_new();
mongoc_write_concern_set_wmajority(wc, 1000);
insert_opts = bson_new();
mongoc_write_concern_append(wc, insert_opts);
coll = mongoc_client_get_collection(client, "mydb1", "foo");
doc = BCON_NEW("abc", BCON_INT32(0));
ret = mongoc_collection_insert_one(coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
bson_destroy(doc);
mongoc_collection_destroy(coll);
coll = mongoc_client_get_collection(client, "mydb2", "bar");
doc = BCON_NEW("xyz", BCON_INT32(0));
ret = mongoc_collection_insert_one(coll, doc, insert_opts, NULL /* reply */, error);
if (!ret) {
goto fail;
}
/* Step 1: Start a client session. */
session = mongoc_client_start_session(client, NULL /* opts */, error);
if (!session) {
goto fail;
}
/* Step 2: Optional. Define options to use for the transaction. */
txn_opts = mongoc_transaction_opts_new();
mongoc_transaction_opts_set_write_concern(txn_opts, wc);
/* Step 3: Use mongoc_client_session_with_transaction to start a transaction,
* execute the callback, and commit (or abort on error). */
ret = mongoc_client_session_with_transaction(session, callback, txn_opts, NULL /* ctx */, NULL /* reply */, error);
if (!ret) {
goto fail;
}
success = true;
fail:
bson_destroy(doc);
mongoc_collection_destroy(coll);
bson_destroy(insert_opts);
mongoc_write_concern_destroy(wc);
mongoc_transaction_opts_destroy(txn_opts);
mongoc_client_session_destroy(session);
mongoc_client_destroy(client);
return success;
}
/* Define the callback that specifies the sequence of operations to perform
* inside the transactions. */
static bool
callback(mongoc_client_session_t *session, void *ctx, bson_t **reply, bson_error_t *error)
{
mongoc_client_t *client = NULL;
mongoc_collection_t *coll = NULL;
bson_t *doc = NULL;
bson_t *opts = bson_new();
bool success = false;
bool ret = false;
BSON_UNUSED(ctx);
BSON_UNUSED(reply);
// Important:: The logical session ID MUST be applied to all associated operations.
ret = mongoc_client_session_append(session, opts, error);
if (!ret) {
goto fail;
}
client = mongoc_client_session_get_client(session);
coll = mongoc_client_get_collection(client, "mydb1", "foo");
doc = BCON_NEW("abc", BCON_INT32(1));
ret = mongoc_collection_insert_one(coll, doc, opts, NULL, error);
if (!ret) {
goto fail;
}
bson_destroy(doc);
mongoc_collection_destroy(coll);
coll = mongoc_client_get_collection(client, "mydb2", "bar");
doc = BCON_NEW("xyz", BCON_INT32(999));
ret = mongoc_collection_insert_one(coll, doc, opts, NULL, error);
if (!ret) {
goto fail;
}
success = true;
fail:
mongoc_collection_destroy(coll);
bson_destroy(opts);
bson_destroy(doc);
return success;
}

Importante

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

// The mongocxx::instance constructor and destructor initialize and shut down the driver,
// respectively. Therefore, a mongocxx::instance must be created before using the driver and
// must remain alive for as long as the driver is in use.
mongocxx::instance inst{};
// For a replica set, include the replica set name and a seedlist of the members in the URI
// string; e.g.
// uriString =
// 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
mongocxx::client client{mongocxx::uri{}};
// Prepare to set majority write explicitly. Note: on Atlas deployments this won't always be
// needed. The suggested Atlas connection string includes majority write concern by default.
write_concern wc_majority{};
wc_majority.acknowledge_level(write_concern::level::k_majority);
// Prereq: Create collections.
auto foo = client["mydb1"]["foo"];
auto bar = client["mydb2"]["bar"];
try {
options::insert opts;
opts.write_concern(wc_majority);
foo.insert_one(make_document(kvp("abc", 0)), opts);
bar.insert_one(make_document(kvp("xyz", 0)), opts);
} catch (mongocxx::exception const& e) {
std::cout << "An exception occurred while inserting: " << e.what() << std::endl;
return EXIT_FAILURE;
}
// Step 1: Define the callback that specifies the sequence of operations to perform inside the
// transactions.
client_session::with_transaction_cb callback = [&](client_session* session) {
// Important:: You must pass the session to the operations.
foo.insert_one(*session, make_document(kvp("abc", 1)));
bar.insert_one(*session, make_document(kvp("xyz", 999)));
};
// Step 2: Start a client session
auto session = client.start_session();
// Step 3: Use with_transaction to start a transaction, execute the callback,
// and commit (or abort on error).
try {
options::transaction opts;
opts.write_concern(wc_majority);
session.with_transaction(callback, opts);
} catch (mongocxx::exception const& e) {
std::cout << "An exception occurred: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;

Importante

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// string uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl";
// For a sharded cluster, connect to the mongos instances; e.g.
// string uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/";
var client = new MongoClient(connectionString);
// Prereq: Create collections.
var database1 = client.GetDatabase("mydb1");
var collection1 = database1.GetCollection<BsonDocument>("foo").WithWriteConcern(WriteConcern.WMajority);
collection1.InsertOne(new BsonDocument("abc", 0));
var database2 = client.GetDatabase("mydb2");
var collection2 = database2.GetCollection<BsonDocument>("bar").WithWriteConcern(WriteConcern.WMajority);
collection2.InsertOne(new BsonDocument("xyz", 0));
// Step 1: Start a client session.
using (var session = client.StartSession())
{
// Step 2: Optional. Define options to use for the transaction.
var transactionOptions = new TransactionOptions(
writeConcern: WriteConcern.WMajority);
// Step 3: Define the sequence of operations to perform inside the transactions
var cancellationToken = CancellationToken.None; // normally a real token would be used
result = session.WithTransaction(
(s, ct) =>
{
try
{
collection1.InsertOne(s, new BsonDocument("abc", 1), cancellationToken: ct);
collection2.InsertOne(s, new BsonDocument("xyz", 999), cancellationToken: ct);
}
catch (MongoWriteException)
{
// Do something in response to the exception
throw; // NOTE: You must rethrow the exception otherwise an infinite loop can occur.
}
return "Inserted into collections in different databases";
},
transactionOptions,
cancellationToken);
}

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

Importante

// WithTransactionExample is an example of using the Session.WithTransaction function.
func WithTransactionExample(ctx context.Context) error {
// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"
// For a sharded cluster, connect to the mongos instances; e.g.
// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"
uri := mtest.ClusterURI()
clientOpts := options.Client().ApplyURI(uri)
client, err := mongo.Connect(clientOpts)
if err != nil {
return err
}
defer func() { _ = client.Disconnect(ctx) }()
// Prereq: Create collections.
wcMajority := writeconcern.Majority()
wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)
fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)
barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.
callback := func(sesctx context.Context) (any, error) {
// Important: You must pass sesctx as the Context parameter to the operations for them to be executed in the
// transaction.
if _, err := fooColl.InsertOne(sesctx, bson.D{{"abc", 1}}); err != nil {
return nil, err
}
if _, err := barColl.InsertOne(sesctx, bson.D{{"xyz", 999}}); err != nil {
return nil, err
}
return nil, nil
}
// Step 2: Start a session and run the callback using WithTransaction.
session, err := client.StartSession()
if err != nil {
return err
}
defer session.EndSession(ctx)
result, err := session.WithTransaction(ctx, callback)
if err != nil {
return err
}
log.Printf("result: %v\n", result)
return nil
}

Importante

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

/*
For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
String uri = "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/admin?replicaSet=myRepl";
For a sharded cluster, connect to the mongos instances; e.g.
String uri = "mongodb://mongos0.example.com:27017,mongos1.example.com:27017:27017/admin";
*/
final MongoClient client = MongoClients.create(uri);
/*
Create collections.
*/
client.getDatabase("mydb1").getCollection("foo")
.withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("abc", 0));
client.getDatabase("mydb2").getCollection("bar")
.withWriteConcern(WriteConcern.MAJORITY).insertOne( new Document("xyz", 0));
/* Step 1: Start a client session. */
final ClientSession clientSession = client.startSession();
/* Step 2: Optional. Define options to use for the transaction. */
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.LOCAL)
.writeConcern(WriteConcern.MAJORITY)
.build();
/* Step 3: Define the sequence of operations to perform inside the transactions. */
TransactionBody txnBody = new TransactionBody<String>() {
public String execute() {
MongoCollection<Document> coll1 = client.getDatabase("mydb1").getCollection("foo");
MongoCollection<Document> coll2 = client.getDatabase("mydb2").getCollection("bar");
/*
Important:: You must pass the session to the operations..
*/
coll1.insertOne(clientSession, new Document("abc", 1));
coll2.insertOne(clientSession, new Document("xyz", 999));
return "Inserted into collections in different databases";
}
};
try {
/*
Step 4: Use .withTransaction() to start a transaction,
execute the callback, and commit (or abort on error).
*/
clientSession.withTransaction(txnBody, txnOptions);
} catch (RuntimeException e) {
// some error handling
} finally {
clientSession.close();
}

Importante

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = AsyncIOMotorClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
await client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
await client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
async def callback(my_session):
collection_one = my_session.client.mydb1.foo
collection_two = my_session.client.mydb2.bar
# Important:: You must pass the session to the operations.
await collection_one.insert_one({"abc": 1}, session=my_session)
await collection_two.insert_one({"xyz": 999}, session=my_session)
# Step 2: Start a client session.
async with await client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
await session.with_transaction(
callback,
read_concern=ReadConcern("local"),
write_concern=wc_majority,
read_preference=ReadPreference.PRIMARY,
)

Importante

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
// const uri = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
// For a sharded cluster, connect to the mongos instances; e.g.
// const uri = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
const client = new MongoClient(uri);
await client.connect();
// Prereq: Create collections.
await client
.db('mydb1')
.collection('foo')
.insertOne({ abc: 0 }, { writeConcern: { w: 'majority' } });
await client
.db('mydb2')
.collection('bar')
.insertOne({ xyz: 0 }, { writeConcern: { w: 'majority' } });
// Step 1: Start a Client Session
const session = client.startSession();
// Step 2: Optional. Define options to use for the transaction
const transactionOptions = {
readPreference: 'primary',
readConcern: { level: 'local' },
writeConcern: { w: 'majority' }
};
// Step 3: Use withTransaction to start a transaction, execute the callback, and commit (or abort on error)
// Note: The callback for withTransaction MUST be async and/or return a Promise.
try {
await session.withTransaction(async () => {
const coll1 = client.db('mydb1').collection('foo');
const coll2 = client.db('mydb2').collection('bar');
// Important:: You must pass the session to the operations
await coll1.insertOne({ abc: 1 }, { session });
await coll2.insertOne({ xyz: 999 }, { session });
}, transactionOptions);
} finally {
await session.endSession();
await client.close();
}

Nota

Para el driver Perl, se puede ver en cambio el ejemplo de uso de Core API.

Importante

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

/*
* For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
* uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
* For a sharded cluster, connect to the mongos instances; e.g.
* uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
*/
$client = new \MongoDB\Client($uriString);
// Prerequisite: Create collections.
$client->selectCollection(
'mydb1',
'foo',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['abc' => 0]);
$client->selectCollection(
'mydb2',
'bar',
[
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->insertOne(['xyz' => 0]);
// Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
$callback = function (\MongoDB\Driver\Session $session) use ($client): void {
$client
->selectCollection('mydb1', 'foo')
->insertOne(['abc' => 1], ['session' => $session]);
$client
->selectCollection('mydb2', 'bar')
->insertOne(['xyz' => 999], ['session' => $session]);
};
// Step 2: Start a client session.
$session = $client->startSession();
// Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
\MongoDB\with_transaction($session, $callback);

Importante

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uriString = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = MongoClient(uriString)
wc_majority = WriteConcern("majority", wtimeout=1000)
# Prereq: Create collections.
client.get_database("mydb1", write_concern=wc_majority).foo.insert_one({"abc": 0})
client.get_database("mydb2", write_concern=wc_majority).bar.insert_one({"xyz": 0})
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
def callback(session):
collection_one = session.client.mydb1.foo
collection_two = session.client.mydb2.bar
# Important:: You must pass the session to the operations.
collection_one.insert_one({"abc": 1}, session=session)
collection_two.insert_one({"xyz": 999}, session=session)
# Step 2: Start a client session.
with client.start_session() as session:
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(callback)

Importante

En el ejemplo, se utiliza la nueva Callback API para trabajar con transacciones, que inicia una transacción, ejecuta las operaciones especificadas y realiza una confirmación (o anula en caso de error). La nueva Callback API incorpora lógica de reintento para errores de confirmación TransientTransactionError o UnknownTransactionCommitResult.

# For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.
# uriString = 'mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl'
# For a sharded cluster, connect to the mongos instances; e.g.
# uri_string = 'mongodb://mongos0.example.com:27017,mongos1.example.com:27017/'
client = Mongo::Client.new(uri_string, write_concern: {w: :majority, wtimeout: 1000})
# Prereq: Create collections.
client.use('mydb1')['foo'].insert_one(abc: 0)
client.use('mydb2')['bar'].insert_one(xyz: 0)
# Step 1: Define the callback that specifies the sequence of operations to perform inside the transactions.
callback = Proc.new do |my_session|
collection_one = client.use('mydb1')['foo']
collection_two = client.use('mydb2')['bar']
# Important: You must pass the session to the operations.
collection_one.insert_one({'abc': 1}, session: my_session)
collection_two.insert_one({'xyz': 999}, session: my_session)
end
#. Step 2: Start a client session.
session = client.start_session
# Step 3: Use with_transaction to start a transaction, execute the callback, and commit (or abort on error).
session.with_transaction(
read_concern: {level: :local},
write_concern: {w: :majority, wtimeout: 1000},
read: {mode: :primary},
&callback)

Nota

Para el driver de Scala, ver el ejemplo de uso Core API en su lugar.

La API de transacción principal no incorpora lógica de reintento para los errores etiquetados:


➤ Usar el menú desplegable Seleccionar lenguaje en la parte superior derecha para establecer el lenguaje de los ejemplos en esta página.


En el siguiente ejemplo, se incorpora la lógica para reintentar la transacción cuando se producen errores transitorios y para reintentar la confirmación cuando ocurre un error de confirmación desconocido:

/* takes a session, an out-param for server reply, and out-param for error. */
typedef bool (*txn_func_t)(mongoc_client_session_t *, bson_t *, bson_error_t *);
/* runs transactions with retry logic */
bool
run_transaction_with_retry(txn_func_t txn_func, mongoc_client_session_t *cs, bson_error_t *error)
{
bson_t reply;
bool r;
while (true) {
/* perform transaction */
r = txn_func(cs, &reply, error);
if (r) {
/* success */
bson_destroy(&reply);
return true;
}
MONGOC_WARNING("Transaction aborted: %s", error->message);
if (mongoc_error_has_label(&reply, "TransientTransactionError")) {
/* on transient error, retry the whole transaction */
MONGOC_WARNING("TransientTransactionError, retrying transaction...");
bson_destroy(&reply);
} else {
/* non-transient error */
break;
}
}
bson_destroy(&reply);
return false;
}
/* commit transactions with retry logic */
bool
commit_with_retry(mongoc_client_session_t *cs, bson_error_t *error)
{
bson_t reply;
bool r;
while (true) {
/* commit uses write concern set at transaction start, see
* mongoc_transaction_opts_set_write_concern */
r = mongoc_client_session_commit_transaction(cs, &reply, error);
if (r) {
MONGOC_DEBUG("Transaction committed");
break;
}
if (mongoc_error_has_label(&reply, "UnknownTransactionCommitResult")) {
MONGOC_WARNING("UnknownTransactionCommitResult, retrying commit ...");
bson_destroy(&reply);
} else {
/* commit failed, cannot retry */
break;
}
}
bson_destroy(&reply);
return r;
}
/* updates two collections in a transaction and calls commit_with_retry */
bool
update_employee_info(mongoc_client_session_t *cs, bson_t *reply, bson_error_t *error)
{
mongoc_client_t *client;
mongoc_collection_t *employees;
mongoc_collection_t *events;
mongoc_read_concern_t *rc;
mongoc_write_concern_t *wc;
mongoc_transaction_opt_t *txn_opts;
bson_t opts = BSON_INITIALIZER;
bson_t *filter = NULL;
bson_t *update = NULL;
bson_t *event = NULL;
bool r;
bson_init(reply);
client = mongoc_client_session_get_client(cs);
employees = mongoc_client_get_collection(client, "hr", "employees");
events = mongoc_client_get_collection(client, "reporting", "events");
rc = mongoc_read_concern_new();
mongoc_read_concern_set_level(rc, MONGOC_READ_CONCERN_LEVEL_SNAPSHOT);
wc = mongoc_write_concern_new();
mongoc_write_concern_set_w(
wc, MONGOC_WRITE_CONCERN_W_MAJORITY); /* Atlas connection strings include majority by default*/
txn_opts = mongoc_transaction_opts_new();
mongoc_transaction_opts_set_read_concern(txn_opts, rc);
mongoc_transaction_opts_set_write_concern(txn_opts, wc);
r = mongoc_client_session_start_transaction(cs, txn_opts, error);
if (!r) {
goto done;
}
r = mongoc_client_session_append(cs, &opts, error);
if (!r) {
goto done;
}
filter = BCON_NEW("employee", BCON_INT32(3));
update = BCON_NEW("$set", "{", "status", "Inactive", "}");
/* mongoc_collection_update_one will reinitialize reply */
bson_destroy(reply);
r = mongoc_collection_update_one(employees, filter, update, &opts, reply, error);
if (!r) {
goto abort;
}
event = BCON_NEW("employee", BCON_INT32(3));
BCON_APPEND(event, "status", "{", "new", "Inactive", "old", "Active", "}");
bson_destroy(reply);
r = mongoc_collection_insert_one(events, event, &opts, reply, error);
if (!r) {
goto abort;
}
r = commit_with_retry(cs, error);
abort:
if (!r) {
MONGOC_ERROR("Aborting due to error in transaction: %s", error->message);
mongoc_client_session_abort_transaction(cs, NULL);
}
done:
mongoc_collection_destroy(employees);
mongoc_collection_destroy(events);
mongoc_read_concern_destroy(rc);
mongoc_write_concern_destroy(wc);
mongoc_transaction_opts_destroy(txn_opts);
bson_destroy(&opts);
bson_destroy(filter);
bson_destroy(update);
bson_destroy(event);
return r;
}
void
example_func(mongoc_client_t *client)
{
mongoc_client_session_t *cs;
bson_error_t error;
bool r;
ASSERT(client);
cs = mongoc_client_start_session(client, NULL, &error);
if (!cs) {
MONGOC_ERROR("Could not start session: %s", error.message);
return;
}
r = run_transaction_with_retry(update_employee_info, cs, &error);
if (!r) {
MONGOC_ERROR("Could not update employee, permanent error: %s", error.message);
}
mongoc_client_session_destroy(cs);
}
g transaction_func = std::function<void(client_session & session)>;
run_transaction_with_retry = [](transaction_func txn_func, client_session& session) {
while (true) {
try {
txn_func(session); // performs transaction.
break;
} catch (operation_exception const& oe) {
std::cout << "Transaction aborted. Caught exception during transaction." << std::endl;
// If transient error, retry the whole transaction.
if (oe.has_error_label("TransientTransactionError")) {
std::cout << "TransientTransactionError, retrying transaction ..." << std::endl;
continue;
} else {
throw oe;
}
}
}
commit_with_retry = [](client_session& session) {
while (true) {
try {
session.commit_transaction(); // Uses write concern set at transaction start.
std::cout << "Transaction committed." << std::endl;
break;
} catch (operation_exception const& oe) {
// Can retry commit
if (oe.has_error_label("UnknownTransactionCommitResult")) {
std::cout << "UnknownTransactionCommitResult, retrying commit operation ..." << std::endl;
continue;
} else {
std::cout << "Error during commit ..." << std::endl;
throw oe;
}
}
}
pdates two collections in a transaction
update_employee_info = [&](client_session& session) {
auto& client = session.client();
auto employees = client["hr"]["employees"];
auto events = client["reporting"]["events"];
options::transaction txn_opts;
read_concern rc;
rc.acknowledge_level(read_concern::level::k_snapshot);
txn_opts.read_concern(rc);
write_concern wc;
wc.acknowledge_level(write_concern::level::k_majority);
txn_opts.write_concern(wc);
session.start_transaction(txn_opts);
try {
employees.update_one(
make_document(kvp("employee", 3)),
make_document(kvp("$set", make_document(kvp("status", "Inactive")))));
events.insert_one(make_document(
kvp("employee", 3), kvp("status", make_document(kvp("new", "Inactive"), kvp("old", "Active")))));
} catch (operation_exception const& oe) {
std::cout << "Caught exception during transaction, aborting." << std::endl;
session.abort_transaction();
throw oe;
}
commit_with_retry(session);
session = client.start_session();
{
run_transaction_with_retry(update_employee_info, session);
tch (operation_exception const& oe) {
// Do something with error.
throw oe;
public void RunTransactionWithRetry(Action<IMongoClient, IClientSessionHandle> txnFunc, IMongoClient client, IClientSessionHandle session)
{
while (true)
{
try
{
txnFunc(client, session); // performs transaction
break;
}
catch (MongoException exception)
{
// if transient error, retry the whole transaction
if (exception.HasErrorLabel("TransientTransactionError"))
{
Console.WriteLine("TransientTransactionError, retrying transaction.");
continue;
}
else
{
throw;
}
}
}
}
public void CommitWithRetry(IClientSessionHandle session)
{
while (true)
{
try
{
session.CommitTransaction();
Console.WriteLine("Transaction committed.");
break;
}
catch (MongoException exception)
{
// can retry commit
if (exception.HasErrorLabel("UnknownTransactionCommitResult"))
{
Console.WriteLine("UnknownTransactionCommitResult, retrying commit operation");
continue;
}
else
{
Console.WriteLine($"Error during commit: {exception.Message}.");
throw;
}
}
}
}
// updates two collections in a transaction
public void UpdateEmployeeInfo(IMongoClient client, IClientSessionHandle session)
{
var employeesCollection = client.GetDatabase("hr").GetCollection<BsonDocument>("employees");
var eventsCollection = client.GetDatabase("reporting").GetCollection<BsonDocument>("events");
session.StartTransaction(new TransactionOptions(
readConcern: ReadConcern.Snapshot,
writeConcern: WriteConcern.WMajority));
try
{
employeesCollection.UpdateOne(
session,
Builders<BsonDocument>.Filter.Eq("employee", 3),
Builders<BsonDocument>.Update.Set("status", "Inactive"));
eventsCollection.InsertOne(
session,
new BsonDocument
{
{ "employee", 3 },
{ "status", new BsonDocument { { "new", "Inactive" }, { "old", "Active" } } }
});
}
catch (Exception exception)
{
Console.WriteLine($"Caught exception during transaction, aborting: {exception.Message}.");
session.AbortTransaction();
throw;
}
CommitWithRetry(session);
}
public void UpdateEmployeeInfoWithTransactionRetry(IMongoClient client)
{
// start a session
using (var session = client.StartSession())
{
try
{
RunTransactionWithRetry(UpdateEmployeeInfo, client, session);
}
catch (Exception exception)
{
// do something with error
Console.WriteLine($"Non transient exception caught during transaction: ${exception.Message}.");
}
}
}
runTransactionWithRetry := func(ctx context.Context, txnFn func(context.Context) error) error {
for {
err := txnFn(ctx) // Performs transaction.
if err == nil {
return nil
}
log.Println("Transaction aborted. Caught exception during transaction.")
// If transient error, retry the whole transaction
if cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel("TransientTransactionError") {
log.Println("TransientTransactionError, retrying transaction...")
continue
}
return err
}
}
commitWithRetry := func(ctx context.Context) error {
sess := mongo.SessionFromContext(ctx)
for {
err := sess.CommitTransaction(ctx)
switch e := err.(type) {
case nil:
log.Println("Transaction committed.")
return nil
case mongo.CommandError:
// Can retry commit
if e.HasErrorLabel("UnknownTransactionCommitResult") {
log.Println("UnknownTransactionCommitResult, retrying commit operation...")
continue
}
log.Println("Error during commit...")
return e
default:
log.Println("Error during commit...")
return e
}
}
}
// Updates two collections in a transaction.
updateEmployeeInfo := func(ctx context.Context) error {
employees := client.Database("hr").Collection("employees")
events := client.Database("reporting").Collection("events")
sess := mongo.SessionFromContext(ctx)
err := sess.StartTransaction(options.Transaction().
SetReadConcern(readconcern.Snapshot()).
SetWriteConcern(writeconcern.Majority()),
)
if err != nil {
return err
}
_, err = employees.UpdateOne(ctx, bson.D{{"employee", 3}}, bson.D{{"$set", bson.D{{"status", "Inactive"}}}})
if err != nil {
sess.AbortTransaction(ctx)
log.Println("caught exception during transaction, aborting.")
return err
}
_, err = events.InsertOne(ctx, bson.D{{"employee", 3}, {"status", bson.D{{"new", "Inactive"}, {"old", "Active"}}}})
if err != nil {
sess.AbortTransaction(ctx)
log.Println("caught exception during transaction, aborting.")
return err
}
return commitWithRetry(ctx)
}
txnOpts := options.Transaction().SetReadPreference(readpref.Primary())
return client.UseSessionWithOptions(
ctx, options.Session().SetDefaultTransactionOptions(txnOpts),
func(ctx context.Context) error {
return runTransactionWithRetry(ctx, updateEmployeeInfo)
},
)
}

Importante

Para asociar las operaciones de lectura y escritura a una transacción, debes pasar la sesión a cada operación de la transacción.

void runTransactionWithRetry(Runnable transactional) {
while (true) {
try {
transactional.run();
break;
} catch (MongoException e) {
System.out.println("Transaction aborted. Caught exception during transaction.");
if (e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)) {
System.out.println("TransientTransactionError, aborting transaction and retrying ...");
continue;
} else {
throw e;
}
}
}
}
void commitWithRetry(ClientSession clientSession) {
while (true) {
try {
clientSession.commitTransaction();
System.out.println("Transaction committed");
break;
} catch (MongoException e) {
// can retry commit
if (e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
System.out.println("UnknownTransactionCommitResult, retrying commit operation ...");
continue;
} else {
System.out.println("Exception during commit ...");
throw e;
}
}
}
}
void updateEmployeeInfo() {
MongoCollection<Document> employeesCollection = client.getDatabase("hr").getCollection("employees");
MongoCollection<Document> eventsCollection = client.getDatabase("reporting").getCollection("events");
TransactionOptions txnOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.MAJORITY)
.writeConcern(WriteConcern.MAJORITY)
.build();
try (ClientSession clientSession = client.startSession()) {
clientSession.startTransaction(txnOptions);
employeesCollection.updateOne(clientSession,
Filters.eq("employee", 3),
Updates.set("status", "Inactive"));
eventsCollection.insertOne(clientSession,
new Document("employee", 3).append("status", new Document("new", "Inactive").append("old", "Active")));
commitWithRetry(clientSession);
}
}
void updateEmployeeInfoWithRetry() {
runTransactionWithRetry(this::updateEmployeeInfo);
}

Nota

Para Motor, ver Callback API en su lugar.

Importante

Para asociar las operaciones de lectura y escritura a una transacción, debes pasar la sesión a cada operación de la transacción.

async function commitWithRetry(session) {
try {
await session.commitTransaction();
console.log('Transaction committed.');
} catch (error) {
if (error.hasErrorLabel('UnknownTransactionCommitResult')) {
console.log('UnknownTransactionCommitResult, retrying commit operation ...');
await commitWithRetry(session);
} else {
console.log('Error during commit ...');
throw error;
}
}
}
async function runTransactionWithRetry(txnFunc, client, session) {
try {
await txnFunc(client, session);
} catch (error) {
console.log('Transaction aborted. Caught exception during transaction.');
// If transient error, retry the whole transaction
if (error.hasErrorLabel('TransientTransactionError')) {
console.log('TransientTransactionError, retrying transaction ...');
await runTransactionWithRetry(txnFunc, client, session);
} else {
throw error;
}
}
}
async function updateEmployeeInfo(client, session) {
session.startTransaction({
readConcern: { level: 'snapshot' },
writeConcern: { w: 'majority' },
readPreference: 'primary'
});
const employeesCollection = client.db('hr').collection('employees');
const eventsCollection = client.db('reporting').collection('events');
await employeesCollection.updateOne(
{ employee: 3 },
{ $set: { status: 'Inactive' } },
{ session }
);
await eventsCollection.insertOne(
{
employee: 3,
status: { new: 'Inactive', old: 'Active' }
},
{ session }
);
try {
await commitWithRetry(session);
} catch (error) {
await session.abortTransaction();
throw error;
}
}
return client.withSession(session =>
runTransactionWithRetry(updateEmployeeInfo, client, session)
);

Importante

Para asociar las operaciones de lectura y escritura a una transacción, debes pasar la sesión a cada operación de la transacción.

sub runTransactionWithRetry {
my ( $txnFunc, $session ) = @_;
LOOP: {
eval {
$txnFunc->($session); # performs transaction
};
if ( my $error = $@ ) {
print("Transaction aborted-> Caught exception during transaction.\n");
# If transient error, retry the whole transaction
if ( $error->has_error_label("TransientTransactionError") ) {
print("TransientTransactionError, retrying transaction ->..\n");
redo LOOP;
}
else {
die $error;
}
}
}
return;
}
sub commitWithRetry {
my ($session) = @_;
LOOP: {
eval {
$session->commit_transaction(); # Uses write concern set at transaction start.
print("Transaction committed->\n");
};
if ( my $error = $@ ) {
# Can retry commit
if ( $error->has_error_label("UnknownTransactionCommitResult") ) {
print("UnknownTransactionCommitResult, retrying commit operation ->..\n");
redo LOOP;
}
else {
print("Error during commit ->..\n");
die $error;
}
}
}
return;
}
# Updates two collections in a transactions
sub updateEmployeeInfo {
my ($session) = @_;
my $employeesCollection = $session->client->ns("hr.employees");
my $eventsCollection = $session->client->ns("reporting.events");
$session->start_transaction(
{
readConcern => { level => "snapshot" },
writeConcern => { w => "majority" },
readPreference => 'primary',
}
);
eval {
$employeesCollection->update_one(
{ employee => 3 }, { '$set' => { status => "Inactive" } },
{ session => $session},
);
$eventsCollection->insert_one(
{ employee => 3, status => { new => "Inactive", old => "Active" } },
{ session => $session},
);
};
if ( my $error = $@ ) {
print("Caught exception during transaction, aborting->\n");
$session->abort_transaction();
die $error;
}
commitWithRetry($session);
}
# Start a session
my $session = $client->start_session();
eval {
runTransactionWithRetry(\&updateEmployeeInfo, $session);
};
if ( my $error = $@ ) {
# Do something with error
}
$session->end_session();

Importante

Para asociar las operaciones de lectura y escritura a una transacción, debes pasar la sesión a cada operación de la transacción.

private function runTransactionWithRetry3(callable $txnFunc, \MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
while (true) {
try {
$txnFunc($client, $session); // performs transaction
break;
} catch (\MongoDB\Driver\Exception\CommandException $error) {
$resultDoc = $error->getResultDocument();
// If transient error, retry the whole transaction
if (isset($resultDoc->errorLabels) && in_array('TransientTransactionError', $resultDoc->errorLabels)) {
continue;
} else {
throw $error;
}
} catch (\MongoDB\Driver\Exception\Exception $error) {
throw $error;
}
}
}
private function commitWithRetry3(\MongoDB\Driver\Session $session): void
{
while (true) {
try {
$session->commitTransaction();
echo "Transaction committed.\n";
break;
} catch (\MongoDB\Driver\Exception\CommandException $error) {
$resultDoc = $error->getResultDocument();
if (isset($resultDoc->errorLabels) && in_array('UnknownTransactionCommitResult', $resultDoc->errorLabels)) {
echo "UnknownTransactionCommitResult, retrying commit operation ...\n";
continue;
} else {
echo "Error during commit ...\n";
throw $error;
}
} catch (\MongoDB\Driver\Exception\Exception $error) {
echo "Error during commit ...\n";
throw $error;
}
}
}
private function updateEmployeeInfo3(\MongoDB\Client $client, \MongoDB\Driver\Session $session): void
{
$session->startTransaction([
'readConcern' => new \MongoDB\Driver\ReadConcern('snapshot'),
'readPrefernece' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::PRIMARY),
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY),
]);
try {
$client->hr->employees->updateOne(
['employee' => 3],
['$set' => ['status' => 'Inactive']],
['session' => $session],
);
$client->reporting->events->insertOne(
['employee' => 3, 'status' => ['new' => 'Inactive', 'old' => 'Active']],
['session' => $session],
);
} catch (\MongoDB\Driver\Exception\Exception $error) {
echo "Caught exception during transaction, aborting.\n";
$session->abortTransaction();
throw $error;
}
$this->commitWithRetry3($session);
}
private function doUpdateEmployeeInfo(\MongoDB\Client $client): void
{
// Start a session.
$session = $client->startSession();
try {
$this->runTransactionWithRetry3([$this, 'updateEmployeeInfo3'], $client, $session);
} catch (\MongoDB\Driver\Exception\Exception) {
// Do something with error
}
}

Importante

Para asociar las operaciones de lectura y escritura a una transacción, debes pasar la sesión a cada operación de la transacción.

def run_transaction_with_retry(txn_func, session):
while True:
try:
txn_func(session) # performs transaction
break
except (ConnectionFailure, OperationFailure) as exc:
# If transient error, retry the whole transaction
if exc.has_error_label("TransientTransactionError"):
print("TransientTransactionError, retrying transaction ...")
continue
else:
raise
def commit_with_retry(session):
while True:
try:
# Commit uses write concern set at transaction start.
session.commit_transaction()
print("Transaction committed.")
break
except (ConnectionFailure, OperationFailure) as exc:
# Can retry commit
if exc.has_error_label("UnknownTransactionCommitResult"):
print("UnknownTransactionCommitResult, retrying commit operation ...")
continue
else:
print("Error during commit ...")
raise
# Updates two collections in a transactions
def update_employee_info(session):
employees_coll = session.client.hr.employees
events_coll = session.client.reporting.events
with session.start_transaction(
read_concern=ReadConcern("snapshot"),
write_concern=WriteConcern(w="majority"),
read_preference=ReadPreference.PRIMARY,
):
employees_coll.update_one(
{"employee": 3}, {"$set": {"status": "Inactive"}}, session=session
)
events_coll.insert_one(
{"employee": 3, "status": {"new": "Inactive", "old": "Active"}}, session=session
)
commit_with_retry(session)
# Start a session.
with client.start_session() as session:
try:
run_transaction_with_retry(update_employee_info, session)
except Exception:
# Do something with error.
raise

Importante

Para asociar las operaciones de lectura y escritura a una transacción, debes pasar la sesión a cada operación de la transacción.

def run_transaction_with_retry(session)
begin
yield session # performs transaction
rescue Mongo::Error => e
puts 'Transaction aborted. Caught exception during transaction.'
raise unless e.label?('TransientTransactionError')
puts "TransientTransactionError, retrying transaction ..."
retry
end
end
def commit_with_retry(session)
begin
session.commit_transaction
puts 'Transaction committed.'
rescue Mongo::Error => e
if e.label?('UnknownTransactionCommitResult')
puts "UnknownTransactionCommitResult, retrying commit operation ..."
retry
else
puts 'Error during commit ...'
raise
end
end
end
# updates two collections in a transaction
def update_employee_info(session)
employees_coll = session.client.use(:hr)[:employees]
events_coll = session.client.use(:reporting)[:events]
session.start_transaction(read_concern: { level: :snapshot },
write_concern: { w: :majority },
read: {mode: :primary})
employees_coll.update_one({ employee: 3 }, { '$set' => { status: 'Inactive'} },
session: session)
events_coll.insert_one({ employee: 3, status: { new: 'Inactive', old: 'Active' } },
session: session)
commit_with_retry(session)
end
session = client.start_session
begin
run_transaction_with_retry(session) do
update_employee_info(session)
end
rescue StandardError => e
# Do something with error
raise
end

Importante

Para asociar las operaciones de lectura y escritura a una transacción, debes pasar la sesión a cada operación de la transacción.

/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mongodb.scala
import org.mongodb.scala.model.{Filters, Updates}
import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Await
import scala.concurrent.duration.Duration
//scalastyle:off magic.number
class DocumentationTransactionsExampleSpec extends RequiresMongoDBISpec {
// Implicit functions that execute the Observable and return the results
val waitDuration = Duration(5, "seconds")
implicit class ObservableExecutor[T](observable: Observable[T]) {
def execute(): Seq[T] = Await.result(observable.toFuture(), waitDuration)
}
implicit class SingleObservableExecutor[T](observable: SingleObservable[T]) {
def execute(): T = Await.result(observable.toFuture(), waitDuration)
}
// end implicit functions
"The Scala driver" should "be able to commit a transaction" in withClient { client =>
assume(serverVersionAtLeast(List(4, 0, 0)) && !hasSingleHost())
client.getDatabase("hr").drop().execute()
client.getDatabase("hr").createCollection("employees").execute()
client.getDatabase("hr").createCollection("events").execute()
updateEmployeeInfoWithRetry(client).execute() should equal(Completed())
client.getDatabase("hr").drop().execute() should equal(Completed())
}
def updateEmployeeInfo(database: MongoDatabase, observable: SingleObservable[ClientSession]): SingleObservable[ClientSession] = {
observable.map(clientSession => {
val employeesCollection = database.getCollection("employees")
val eventsCollection = database.getCollection("events")
val transactionOptions = TransactionOptions.builder()
.readPreference(ReadPreference.primary())
.readConcern(ReadConcern.SNAPSHOT)
.writeConcern(WriteConcern.MAJORITY)
.build()
clientSession.startTransaction(transactionOptions)
employeesCollection.updateOne(clientSession, Filters.eq("employee", 3), Updates.set("status", "Inactive"))
.subscribe((res: UpdateResult) => println(res))
eventsCollection.insertOne(clientSession, Document("employee" -> 3, "status" -> Document("new" -> "Inactive", "old" -> "Active")))
.subscribe((res: Completed) => println(res))
clientSession
})
}
def commitAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) => {
println("UnknownTransactionCommitResult, retrying commit operation ...")
commitAndRetry(observable)
}
case e: Exception => {
println(s"Exception during commit ...: $e")
throw e
}
})
}
def runTransactionAndRetry(observable: SingleObservable[Completed]): SingleObservable[Completed] = {
observable.recoverWith({
case e: MongoException if e.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL) => {
println("TransientTransactionError, aborting transaction and retrying ...")
runTransactionAndRetry(observable)
}
})
}
def updateEmployeeInfoWithRetry(client: MongoClient): SingleObservable[Completed] = {
val database = client.getDatabase("hr")
val updateEmployeeInfoObservable: Observable[ClientSession] = updateEmployeeInfo(database, client.startSession())
val commitTransactionObservable: SingleObservable[Completed] =
updateEmployeeInfoObservable.flatMap(clientSession => clientSession.commitTransaction())
val commitAndRetryObservable: SingleObservable[Completed] = commitAndRetry(commitTransactionObservable)
runTransactionAndRetry(commitAndRetryObservable)
}
}

Independientemente del sistema de base de datos, ya sea MongoDB o bases de datos relacionales, las aplicaciones deben tomar medidas para gestionar los errores durante la confirmación de transacciones e incorporar una lógica de reintento para las transacciones.

Las operaciones de guardado individuales dentro de la transacción no se pueden volver a intentar, independientemente del valor de retryWrites. Si en una operación, se produce un error asociado con la etiqueta "TransientTransactionError", por ejemplo, cuando el primario se degrada, se puede volver a intentar la transacción en su totalidad.

  • La callback API incorpora lógica de reintento para "TransientTransactionError".

  • La API central de transacción no incorpora lógica de reintentos para "TransientTransactionError". Para gestionar "TransientTransactionError", las aplicaciones deben incorporar explícitamente la lógica de reintentos para el error. Para ver un ejemplo que incorpora lógica de reintentos para errores transitorios, consulta Ejemplo de Core API.

Las operaciones de confirmación son operaciones de escritura reintentable. Si se produce un error en la operación de confirmación, los drivers de MongoDB vuelven a intentar la confirmación independientemente del valor de retryWrites.

Si en la operación de confirmación, se produce un error etiquetado "UnknownTransactionCommitResult", se puede reintentar la confirmación.

  • La callback API incorpora lógica de reintento para "UnknownTransactionCommitResult".

  • La API central de transacción no incorpora lógica de reintentos para "UnknownTransactionCommitResult". Para gestionar "UnknownTransactionCommitResult", las aplicaciones deben incorporar explícitamente la lógica de reintentos para el error. Para ver un ejemplo que incorpora la lógica de reintento para errores de confirmación desconocidos, consulta Ejemplo de Core API.

Nuevo en la versión 6.2.

A partir de MongoDB 6.2, el servidor no reintenta la transacción si recibe un error TransactionTooLargeForCache. Este error significa que la caché es demasiado pequeña y es probable que falle el reintento.

El valor por defecto para el umbral transactionTooLargeForCacheThreshold es 0.75. El servidor devuelve TransactionTooLargeForCache en lugar de reintentar la transacción cuando esta usa más del 75 % de la caché.

En versiones anteriores de MongoDB, el servidor devolvía TemporarilyUnavailable o WriteConflict en lugar de TransactionTooLargeForCache.

Utiliza el comando setParameter para modificar el umbral de error.

Los siguientes métodos mongosh están disponibles para las transacciones:

Nota

El ejemplo mongosh omite la lógica de reintento y el manejo robusto de errores para simplificarlo. Para ver un ejemplo más práctico de cómo incorporar transacciones en las aplicaciones, consulta Manejo de errores de las transacciones.

// Create collections:
db.getSiblingDB("mydb1").foo.insertOne(
{abc: 0},
{ writeConcern: { w: "majority", wtimeout: 2000 } }
)
db.getSiblingDB("mydb2").bar.insertOne(
{xyz: 0},
{ writeConcern: { w: "majority", wtimeout: 2000 } }
)
// Start a session.
session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );
coll1 = session.getDatabase("mydb1").foo;
coll2 = session.getDatabase("mydb2").bar;
// Start a transaction
session.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } );
// Operations inside the transaction
try {
coll1.insertOne( { abc: 1 } );
coll2.insertOne( { xyz: 999 } );
} catch (error) {
// Abort transaction on error
session.abortTransaction();
throw error;
}
// Commit the transaction using write concern set at transaction start
session.commitTransaction();
session.endSession();

Volver

Transacciones

En esta página