Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /
/ / /

Realizar Map-Reduce incremental

Nota

Pipeline de agregación como alternativa

A partir de MongoDB 5.0, El map-reduce está en desuso:

Para ejemplos de alternativas de pipeline de agregación a map-reduce, consulte:

Esta sección tiene un ejemplo de alternativa de pipeline de agregación para Map-Reduce que no utiliza una función personalizada. Para un ejemplo que utilice una función personalizada, consulta map-reduce a pipeline de agregación.

Para realizar operaciones de reducción de mapas, MongoDB proporciona el comando y,mapReduce en mongosh, el método contenedor db.collection.mapReduce().

Si el conjunto de datos de map-reduce está creciendo constantemente, puede que desees realizar un map-reduce incremental en lugar de realizar la operación de map-reduce sobre el conjunto de datos completo cada vez.

Para realizar el mapa-reducción incremental:

  1. Ejecute un trabajo de map-reduce sobre la colección actual y envíe el resultado a una colección separada.

  2. Cuando tengas más datos para procesar, ejecuta los siguientes trabajos de map-reduce con:

    • el parámetro query que especifica condiciones que solo coinciden con los nuevos documentos.

    • el parámetro out que especifica la acción reduce para fusionar los nuevos resultados en la colección de resultados existente.

Considera el siguiente ejemplo donde programas una operación de map-reduce en una colección usersessions para ejecutarse al final de cada día.

La colección usersessions contiene documentos que registran las sesiones de los usuarios cada día, por ejemplo:

db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Ejecute la primera operación map-reduce de la siguiente manera:

  1. Define la función de mapeo que asigna el userid a un objeto que contiene los campos total_time, count y avg_time:

    var mapFunction = function() {
    var key = this.userid;
    var value = { total_time: this.length, count: 1, avg_time: 0 };
    emit( key, value );
    };
  2. Define la función de reducción correspondiente con dos argumentos key y values para calcular el tiempo total y el recuento. El key corresponde al userid, y el values es un arreglo cuyos elementos corresponden a los objetos individuales mapeados al userid en el mapFunction.

    var reduceFunction = function(key, values) {
    var reducedObject = { total_time: 0, count:0, avg_time:0 };
    values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
    });
    return reducedObject;
    };
  3. Define la función de finalización con dos argumentos key y reducedValue. La función modifica el documento reducedValue para añadir otro campo average y devuelve el documento modificado.

    var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
    };
  4. Realiza map-reduce en la colección usersessions usando las funciones mapFunction, reduceFunction y finalizeFunction. Envía los resultados a una colección session_stats. Si la colección session_stats ya existe, la operación reemplazará el contenido:

    db.usersessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
    out: "session_stats",
    finalize: finalizeFunction
    }
    )
  5. Query la colección session_stats para verificar los resultados:

    db.session_stats.find().sort( { _id: 1 } )

    La operación devuelve los siguientes documentos:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

Luego, a medida que la colección usersessions crece, puedes ejecutar operaciones de map-reduce adicionales. Por ejemplo, agregar nuevos documentos a la colección usersessions:

db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

Al final del día, realiza un map-reduce incremental en la colección usersessions, pero utiliza el campo query para seleccionar solo los nuevos documentos. Envie los resultados a la colección session_stats, pero reduce el contenido con los resultados del map-reduce incremental:

db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);

Query la colección session_stats para verificar los resultados:

db.session_stats.find().sort( { _id: 1 } )

La operación devuelve los siguientes documentos:

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

Como alternativa a map-reduce, puede utilizar un pipeline de agregación que combina las etapas $group y $merge para lograr el mismo resultado en una operación más flexible.

Recrear la colección usersessions:

db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Usando los operadores disponibles del pipeline de agregación, puedes reescribir el ejemplo de map-reduce sin definir funciones personalizadas:

db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
  1. El $group se agrupa por el userid y calcula:

    • El total_time usando el $sum operador

    • El count usando el $sum operador

    • El avg_time usando el $avg operador

    La operación devuelve los siguientes documentos:

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. La etapa $project modifica el documento de salida para reflejar la salida de map-reduce para tener dos campos _id y value. La etapa es opcional si no es necesario reflejar la estructura _id y value.

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. La etapa genera los resultados en $merge una session_stats_agg colección. Si un documento existente tiene el mismo _id valor que el nuevo resultado, la operación aplica la canalización especificada para calcular el tiempo total, el recuento y el tiempo promedio a partir del resultado y del documento existente. Si no existe ningún documento con el mismo _id valor session_stats_agg en, la operación inserta el documento.

  4. Query la colección session_stats_agg para verificar los resultados:

    db.session_stats_agg.find().sort( { _id: 1 } )

    La operación devuelve los siguientes documentos:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. Añadir nuevos documentos a la colección usersessions:

    db.usersessions.insertMany([
    { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
    { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
    { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
    { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. Agrega una etapa $match al inicio de la pipeline para especificar el filtro de fecha:

    db.usersessions.aggregate([
    { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ])
  7. Query la colección session_stats_agg para verificar los resultados:

    db.session_stats_agg.find().sort( { _id: 1 } )

    La operación devuelve los siguientes documentos:

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. Opcional. Para evitar tener que modificar la $match condición de fecha de la canalización de agregación cada vez que se ejecuta, puede definir la función de envoltura de la agregación en una función auxiliar:

    updateSessionStats = function(startDate) {
    db.usersessions.aggregate([
    { $match: { ts: { $gte: startDate } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ]);
    };

    Luego, para ejecutarlo, simplemente deberá pasar la fecha de inicio a la función updateSessionStats():

    updateSessionStats(ISODate('2020-03-05 00:00:00'))

Tip

Volver

Ejemplos

En esta página