Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /
Query

Realice consultas de snapshots de larga duración

Las queries snapshot permiten leer datos tal como aparecieron en un único punto en el tiempo, en el pasado reciente.

A partir de MongoDB 5.0, puede usar la preocupación de lectura "snapshot" para query datos en nodos secundarios. Esta funcionalidad incrementa la versatilidad y resiliencia de las lecturas de tu aplicación. No es necesario crear una copia estática de tus datos, moverlos a un sistema aparte, y aislar manualmente estas queries de larga duración para que no interfieran en tu carga de trabajo operativa. En vez de eso, puedes ejecutar consultas de larga duración contra una base de datos transaccional en vivo mientras lees desde un estado coherente de los datos.

Utilizar el nivel de consistencia de lectura "snapshot" en nodos secundarios no afecta la carga de trabajo de escritura de la aplicación. Solo las lecturas de aplicaciones se benefician de que las consultas de larga duración estén aisladas a los secundarios.

Utiliza consultas por snapshot cuando se quiera:

  • Realizar múltiples queries relacionadas y asegurarse de que cada query lea datos desde el mismo punto en el tiempo.

  • Asegúrate de que estás leyendo desde un estado coherente de los datos desde algún punto en el pasado.

Cuando MongoDB realiza queries de larga duración usando el nivel de consistencia de lectura "local" por defecto, los resultados de la query pueden contener datos de guardados que ocurren al mismo tiempo que la query. Como resultado, la query puede devolver resultados inesperados o inconsistentes.

Para evitar este escenario, crea una sesión y especifica el nivel de consistencia de lectura "snapshot". Con nivel de consistencia de lectura "snapshot", MongoDB ejecuta tu consulta con aislamiento de instantáneas, lo que significa que tu consulta lee datos tal como aparecieron en un solo punto en el pasado reciente.

Los ejemplos de esta página muestran cómo se pueden usar snapshot queries para:

Nivel de consistencia de lectura "snapshot" te permite ejecutar varias queries relacionadas en una sesión y garantizar que cada query lea datos del mismo punto en el tiempo.

Un refugio de animales tiene una base de datos pets que contiene colecciones para cada tipo de mascota. La base de datos pets tiene estas colecciones:

  • cats

  • dogs

Cada documento en cada colección contiene un campo adoptable, que indica si la mascota está disponible para adopción. Por ejemplo, un documento en la colección cats se ve así:

{
"name": "Whiskers",
"color": "white",
"age": 10,
"adoptable": true
}

Quieres ejecutar una query para ver el número total de mascotas disponibles para adopción en todas las colecciones. Para proporcionar una visión coherente de los datos, debes asegurarte de que los datos devueltos por cada colección correspondan a un único punto en el tiempo.

Para lograr este objetivo, utilice la preocupación "snapshot" de lectura dentro de una sesión:

mongoc_client_session_t *cs = NULL;
mongoc_collection_t *cats_collection = NULL;
mongoc_collection_t *dogs_collection = NULL;
int64_t adoptable_pets_count = 0;
bson_error_t error;
mongoc_session_opt_t *session_opts;
cats_collection = mongoc_client_get_collection(client, "pets", "cats");
dogs_collection = mongoc_client_get_collection(client, "pets", "dogs");
/* Seed 'pets.cats' and 'pets.dogs' with example data */
if (!pet_setup(cats_collection, dogs_collection)) {
goto cleanup;
}
/* start a snapshot session */
session_opts = mongoc_session_opts_new();
mongoc_session_opts_set_snapshot(session_opts, true);
cs = mongoc_client_start_session(client, session_opts, &error);
mongoc_session_opts_destroy(session_opts);
if (!cs) {
MONGOC_ERROR("Could not start session: %s", error.message);
goto cleanup;
}
/*
* Perform the following aggregation pipeline, and accumulate the count in
* `adoptable_pets_count`.
*
* adoptablePetsCount = db.cats.aggregate(
* [ { "$match": { "adoptable": true } },
* { "$count": "adoptableCatsCount" } ], session=s
* ).next()["adoptableCatsCount"]
*
* adoptablePetsCount += db.dogs.aggregate(
* [ { "$match": { "adoptable": True} },
* { "$count": "adoptableDogsCount" } ], session=s
* ).next()["adoptableDogsCount"]
*
* Remember in order to apply the client session to
* this operation, you must append the client session to the options passed
* to `mongoc_collection_aggregate`, i.e.,
*
* mongoc_client_session_append (cs, &opts, &error);
* cursor = mongoc_collection_aggregate (
* collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL);
*/
accumulate_adoptable_count(cs, cats_collection, &adoptable_pets_count);
accumulate_adoptable_count(cs, dogs_collection, &adoptable_pets_count);
printf("there are %" PRId64 " adoptable pets\n", adoptable_pets_count);
using namespace mongocxx;
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;
auto db = client["pets"];
int64_t adoptable_pets_count = 0;
auto opts = mongocxx::options::client_session{};
opts.snapshot(true);
auto session = client.start_session(opts);
{
pipeline p;
p.match(make_document(kvp("adoptable", true))).count("adoptableCatsCount");
auto cursor = db["cats"].aggregate(session, p);
for (auto doc : cursor) {
adoptable_pets_count += doc.find("adoptableCatsCount")->get_int32();
}
}
{
pipeline p;
p.match(make_document(kvp("adoptable", true))).count("adoptableDogsCount");
auto cursor = db["dogs"].aggregate(session, p);
for (auto doc : cursor) {
adoptable_pets_count += doc.find("adoptableDogsCount")->get_int32();
}
}
ctx := context.TODO()
sess, err := client.StartSession(options.Session().SetSnapshot(true))
if err != nil {
return err
}
defer sess.EndSession(ctx)
var adoptablePetsCount int32
err = mongo.WithSession(ctx, sess, func(ctx context.Context) error {
// Count the adoptable cats
const adoptableCatsOutput = "adoptableCatsCount"
cursor, err := db.Collection("cats").Aggregate(ctx, mongo.Pipeline{
bson.D{{"$match", bson.D{{"adoptable", true}}}},
bson.D{{"$count", adoptableCatsOutput}},
})
if err != nil {
return err
}
if !cursor.Next(ctx) {
return fmt.Errorf("expected aggregate to return a document, but got none")
}
resp := cursor.Current.Lookup(adoptableCatsOutput)
adoptableCatsCount, ok := resp.Int32OK()
if !ok {
return fmt.Errorf("failed to find int32 field %q in document %v", adoptableCatsOutput, cursor.Current)
}
adoptablePetsCount += adoptableCatsCount
// Count the adoptable dogs
const adoptableDogsOutput = "adoptableDogsCount"
cursor, err = db.Collection("dogs").Aggregate(ctx, mongo.Pipeline{
bson.D{{"$match", bson.D{{"adoptable", true}}}},
bson.D{{"$count", adoptableDogsOutput}},
})
if err != nil {
return err
}
if !cursor.Next(ctx) {
return fmt.Errorf("expected aggregate to return a document, but got none")
}
resp = cursor.Current.Lookup(adoptableDogsOutput)
adoptableDogsCount, ok := resp.Int32OK()
if !ok {
return fmt.Errorf("failed to find int32 field %q in document %v", adoptableDogsOutput, cursor.Current)
}
adoptablePetsCount += adoptableDogsCount
return nil
})
if err != nil {
return err
}
db = client.pets
async with await client.start_session(snapshot=True) as s:
adoptablePetsCount = 0
docs = await db.cats.aggregate(
[{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}], session=s
).to_list(None)
adoptablePetsCount = docs[0]["adoptableCatsCount"]
docs = await db.dogs.aggregate(
[{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}], session=s
).to_list(None)
adoptablePetsCount += docs[0]["adoptableDogsCount"]
print(adoptablePetsCount)
$catsCollection = $client->selectCollection('pets', 'cats');
$dogsCollection = $client->selectCollection('pets', 'dogs');
$session = $client->startSession(['snapshot' => true]);
$adoptablePetsCount = $catsCollection->aggregate(
[
['$match' => ['adoptable' => true]],
['$count' => 'adoptableCatsCount'],
],
['session' => $session],
)->toArray()[0]->adoptableCatsCount;
$adoptablePetsCount += $dogsCollection->aggregate(
[
['$match' => ['adoptable' => true]],
['$count' => 'adoptableDogsCount'],
],
['session' => $session],
)->toArray()[0]->adoptableDogsCount;
var_dump($adoptablePetsCount);
db = client.pets
with client.start_session(snapshot=True) as s:
adoptablePetsCount = (
(
db.cats.aggregate(
[{"$match": {"adoptable": True}}, {"$count": "adoptableCatsCount"}],
session=s,
)
).next()
)["adoptableCatsCount"]
adoptablePetsCount += (
(
db.dogs.aggregate(
[{"$match": {"adoptable": True}}, {"$count": "adoptableDogsCount"}],
session=s,
)
).next()
)["adoptableDogsCount"]
print(adoptablePetsCount)
client = Mongo::Client.new(uri_string, database: 'pets')
client.start_session(snapshot: true) do |session|
adoptable_pets_count = client['cats'].aggregate([
{ '$match': { adoptable: true } },
{ '$count': 'adoptable_cats_count' }
], session: session).first['adoptable_cats_count']
adoptable_pets_count += client['dogs'].aggregate([
{ '$match': { adoptable: true } },
{ '$count': 'adoptable_dogs_count' }
], session: session).first['adoptable_dogs_count']
puts adoptable_pets_count
end

La serie previa de comandos:

  • Utiliza MongoClient() para establecer una conexión con la implementación de MongoDB.

  • Cambia a la base de datos pets.

  • Establece una sesión. El comando especifica snapshot=True, por lo que la sesión utiliza el nivel de consistencia de lectura "snapshot".

  • Realiza estas acciones para cada colección en la base de datos pets:

    • Utiliza $match para filtrar los documentos donde el campo adoptable es True.

    • Utiliza $count para devolver un recuento de los documentos filtrados.

    • Incrementa la variable adoptablePetsCount con el recuento de la base de datos.

  • Imprime la variable adoptablePetsCount.

Todas las queries dentro de la sesión leen los datos tal y como se encontraban en el mismo punto. Como resultado, el recuento final refleja una snapshot coherente de los datos.

Nota

Si la sesión dura más que el periodo de retención del historial de WiredTiger (300 segundos, por defecto), la consulta genera un SnapshotTooOld error. Para saber cómo configurar la retención de instantáneas y habilitar consultas de mayor duración, consulte Configurar la retención de instantáneas.

El nivel de consistencia de lectura "snapshot" garantiza que su query lea los datos tal como aparecieron en un solo punto en el tiempo en el pasado reciente.

Una tienda de zapatos en línea tiene una colección sales que contiene datos de cada artículo vendido. Por ejemplo, un documento de la colección sales se ve así:

{
"shoeType": "boot",
"price": 30,
"saleDate": ISODate("2022-02-02T06:01:17.171Z")
}

Cada día a la medianoche, se ejecuta una query para ver cuántos pares de zapatos se vendieron ese día. La query de ventas diarias se ve así:

mongoc_client_session_t *cs = NULL;
mongoc_collection_t *sales_collection = NULL;
bson_error_t error;
mongoc_session_opt_t *session_opts;
bson_t *pipeline = NULL;
bson_t opts = BSON_INITIALIZER;
mongoc_cursor_t *cursor = NULL;
const bson_t *doc = NULL;
bool ok = true;
bson_iter_t iter;
int64_t total_sales = 0;
sales_collection = mongoc_client_get_collection(client, "retail", "sales");
/* seed 'retail.sales' with example data */
if (!retail_setup(sales_collection)) {
goto cleanup;
}
/* start a snapshot session */
session_opts = mongoc_session_opts_new();
mongoc_session_opts_set_snapshot(session_opts, true);
cs = mongoc_client_start_session(client, session_opts, &error);
mongoc_session_opts_destroy(session_opts);
if (!cs) {
MONGOC_ERROR("Could not start session: %s", error.message);
goto cleanup;
}
if (!mongoc_client_session_append(cs, &opts, &error)) {
MONGOC_ERROR("could not apply session options: %s", error.message);
goto cleanup;
}
pipeline = BCON_NEW("pipeline",
"[",
"{",
"$match",
"{",
"$expr",
"{",
"$gt",
"[",
"$saleDate",
"{",
"$dateSubtract",
"{",
"startDate",
"$$NOW",
"unit",
BCON_UTF8("day"),
"amount",
BCON_INT64(1),
"}",
"}",
"]",
"}",
"}",
"}",
"{",
"$count",
BCON_UTF8("totalDailySales"),
"}",
"]");
cursor = mongoc_collection_aggregate(sales_collection, MONGOC_QUERY_NONE, pipeline, &opts, NULL);
bson_destroy(&opts);
ok = mongoc_cursor_next(cursor, &doc);
if (mongoc_cursor_error(cursor, &error)) {
MONGOC_ERROR("could not get totalDailySales: %s", error.message);
goto cleanup;
}
if (!ok) {
MONGOC_ERROR("%s", "cursor has no results");
goto cleanup;
}
ok = bson_iter_init_find(&iter, doc, "totalDailySales");
if (ok) {
total_sales = bson_iter_as_int64(&iter);
} else {
MONGOC_ERROR("%s", "missing key: 'totalDailySales'");
goto cleanup;
}
using namespace mongocxx;
using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_array;
using bsoncxx::builder::basic::make_document;
auto opts = mongocxx::options::client_session{};
opts.snapshot(true);
auto session = client.start_session(opts);
auto db = client["retail"];
pipeline p;
p
.match(make_document(kvp(
"$expr",
make_document(kvp(
"$gt",
make_array(
"$saleDate", make_document(kvp("startDate", "$$NOW"), kvp("unit", "day"), kvp("amount", 1))))))))
.count("totalDailySales");
auto cursor = db["sales"].aggregate(session, p);
auto doc = *cursor.begin();
auto total_daily_sales = doc.find("totalDailySales")->get_int32();
ctx := context.TODO()
sess, err := client.StartSession(options.Session().SetSnapshot(true))
if err != nil {
return err
}
defer sess.EndSession(ctx)
var totalDailySales int32
err = mongo.WithSession(ctx, sess, func(ctx context.Context) error {
// Count the total daily sales
const totalDailySalesOutput = "totalDailySales"
cursor, err := db.Collection("sales").Aggregate(ctx, mongo.Pipeline{
bson.D{{
"$match",
bson.D{{
"$expr",
bson.D{{
"$gt",
bson.A{
"$saleDate",
bson.D{{
"$dateSubtract",
bson.D{
{"startDate", "$$NOW"},
{"unit", "day"},
{"amount", 1},
},
}},
},
}},
}},
}},
bson.D{{"$count", totalDailySalesOutput}},
})
if err != nil {
return err
}
if !cursor.Next(ctx) {
return fmt.Errorf("expected aggregate to return a document, but got none")
}
resp := cursor.Current.Lookup(totalDailySalesOutput)
var ok bool
totalDailySales, ok = resp.Int32OK()
if !ok {
return fmt.Errorf("failed to find int32 field %q in document %v", totalDailySalesOutput, cursor.Current)
}
return nil
})
if err != nil {
return err
}
db = client.retail
async with await client.start_session(snapshot=True) as s:
docs = await db.sales.aggregate(
[
{
"$match": {
"$expr": {
"$gt": [
"$saleDate",
{
"$dateSubtract": {
"startDate": "$$NOW",
"unit": "day",
"amount": 1,
}
},
]
}
}
},
{"$count": "totalDailySales"},
],
session=s,
).to_list(None)
total = docs[0]["totalDailySales"]
print(total)
$salesCollection = $client->selectCollection('retail', 'sales');
$session = $client->startSession(['snapshot' => true]);
$totalDailySales = $salesCollection->aggregate(
[
[
'$match' => [
'$expr' => [
'$gt' => ['$saleDate', [
'$dateSubtract' => [
'startDate' => '$$NOW',
'unit' => 'day',
'amount' => 1,
],
],
],
],
],
],
['$count' => 'totalDailySales'],
],
['session' => $session],
)->toArray()[0]->totalDailySales;
db = client.retail
with client.start_session(snapshot=True) as s:
_ = (
(
db.sales.aggregate(
[
{
"$match": {
"$expr": {
"$gt": [
"$saleDate",
{
"$dateSubtract": {
"startDate": "$$NOW",
"unit": "day",
"amount": 1,
}
},
]
}
}
},
{"$count": "totalDailySales"},
],
session=s,
)
).next()
)["totalDailySales"]
client = Mongo::Client.new(uri_string, database: 'retail')
client.start_session(snapshot: true) do |session|
total = client['sales'].aggregate([
{
'$match': {
'$expr': {
'$gt': [
'$saleDate',
{
'$dateSubtract': {
startDate: '$$NOW',
unit: 'day',
amount: 1
}
}
]
}
}
},
{ '$count': 'total_daily_sales' }
], session: session).first['total_daily_sales']
end

La consulta anterior:

  • Utiliza $match con $expr para especificar un filtro en el campo saleDate.

  • Utiliza el $gt operador y $dateSubtract la expresión para devolver documentos donde saleDate es mayor que un día antes del momento en que se ejecuta la consulta.

  • Utiliza $count para devolver un recuento de los documentos coincidentes. El recuento se almacena en la variable totalDailySales.

  • Especifica el nivel de consistencia de lectura "snapshot" para garantizar que la query se lleve a cabo en un solo punto en el tiempo.

La colección sales es bastante grande y, como resultado, puede tomar algunos minutos ejecutar esta query. Debido a que la tienda está en linea, las ventas pueden ocurrir en cualquier momento del día.

Por ejemplo, considere lo siguiente:

  • La consulta comienza a ejecutarse a las 12:00 AM.

  • Un cliente compra tres pares de zapatos a las 12:02 AM.

  • La consulta termina de ejecutarse a las 12:04 AM.

Si la consulta no utiliza el nivel de consistencia de lectura "snapshot", las ventas que ocurren entre el inicio y el término de la consulta pueden incluirse en el recuento de la consulta, aunque no ocurran en el día del reporte. Esto podría resultar en reportes inexactos con algunas ventas siendo contabilizadas dos veces.

Al especificar nivel de consistencia de lectura "snapshot"), la consulta solo devuelve datos que estaban presentes en la base de datos en un momento poco antes de que la consulta empezara a ejecutarse.

Nota

Si la query tarda más que el período de retención del historial de WiredTiger (300 segundos, por defecto), la query se produce un error de con un error SnapshotTooOld. Para aprender a configurar la retención de capturas instantáneas y habilitar consultas de mayor duración, consulte Configurar la retención de capturas instantáneas.

Por defecto, el motor de almacenamiento WiredTiger conserva el historial durante 300 segundos. Puedes usar una sesión con snapshot=true durante un total de 300 segundos desde la primera operación en la sesión hasta la última. Si utilizas la sesión durante un período de tiempo más prolongado, la sesión falla con un error SnapshotTooOld. De manera similar, si consultas datos utilizando nivel de consistencia de lectura "snapshot" y tu consulta dura más de 300 segundos, la consulta falla.

Si tu query o sesión se ejecuta durante más de 300 segundos, considera aumentar el período de retención de snapshots. Para aumentar el periodo de retención, modifique el parámetro minSnapshotHistoryWindowInSeconds.

Por ejemplo, este comando configura el valor de minSnapshotHistoryWindowInSeconds a 600 segundos:

db.adminCommand( { setParameter: 1, minSnapshotHistoryWindowInSeconds: 600 } )

Importante

Para modificar minSnapshotHistoryWindowInSeconds para un clúster de MongoDB Atlas, debes ponerte en contacto con Soporte de Atlas.

Incrementar el valor de minSnapshotHistoryWindowInSeconds incrementa el uso del disco porque el servidor debe mantener el historial de valores modificados anteriores dentro de la ventana de tiempo especificada. La cantidad de espacio en disco utilizado depende de la carga de trabajo, y las cargas de trabajo de mayor volumen requieren más espacio en disco.

Volver

Campos nulos o faltantes

En esta página