Docs Menu
Docs Home
/ /
/ / /

Realizar Map-Reduce incremental

Nota

Pipeline de agregación como alternativa

A partir de MongoDB 5.0, map-reduce está obsoleto:

Para ejemplos de alternativas de pipeline de agregación a map-reduce, consulte:

Esta sección presenta un ejemplo de canalización de agregación alternativa a map-reduce que no utiliza una función personalizada. Para ver un ejemplo que utiliza una función personalizada, consulte Map-Reduce a canalización de agregación.

Para realizar operaciones de reducción de mapas, MongoDB proporciona el comando y,mapReduce en mongosh, el método db.collection.mapReduce() contenedor.

Si el conjunto de datos de mapa-reducción crece constantemente, es posible que desee realizar una reducción de mapa incremental en lugar de realizar la operación de mapa-reducción en todo el conjunto de datos cada vez.

Para realizar una reducción de mapa incremental:

  1. Ejecute un trabajo de map-reduce sobre la colección actual y envíe el resultado a una colección separada.

  2. Cuando tenga más datos para procesar, ejecute trabajos de map-reduce posteriores con:

    • el query parámetro que especifica las condiciones que coinciden solo con los nuevos documentos.

    • el parámetro out que especifica la acción reduce para fusionar los nuevos resultados en la colección de salida existente.

Considere el siguiente ejemplo donde programa una operación de mapa-reducción en una colección usersessions para que se ejecute al final de cada día.

La colección usersessions contiene documentos que registran las sesiones de los usuarios cada día, por ejemplo:

db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Ejecute la primera operación map-reduce de la siguiente manera:

  1. Define la función de mapa que asigna userid a un objeto que contiene los campos total_time, count y avg_time:

    var mapFunction = function() {
    var key = this.userid;
    var value = { total_time: this.length, count: 1, avg_time: 0 };
    emit( key, value );
    };
  2. Define la función de reducción correspondiente con dos argumentos key y values para calcular el tiempo total y el recuento. El key corresponde al userid, y el values es un arreglo cuyos elementos corresponden a los objetos individuales mapeados al userid en el mapFunction.

    var reduceFunction = function(key, values) {
    var reducedObject = { total_time: 0, count:0, avg_time:0 };
    values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
    });
    return reducedObject;
    };
  3. Define la función de finalización con dos argumentos key y reducedValue. La función modifica el documento reducedValue para añadir otro campo average y devuelve el documento modificado.

    var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
    };
  4. Realiza map-reduce en la colección usersessions usando las funciones mapFunction, reduceFunction y finalizeFunction. Envía los resultados a una colección session_stats. Si la colección session_stats ya existe, la operación reemplazará el contenido:

    db.usersessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
    out: "session_stats",
    finalize: finalizeFunction
    }
    )
  5. Consulta la colección session_stats para verificar los resultados:

    db.session_stats.find().sort( { _id: 1 } )

    La operación devuelve los siguientes documentos:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

Luego, a medida que la colección usersessions crece, puedes ejecutar operaciones de map-reduce adicionales. Por ejemplo, agregar nuevos documentos a la colección usersessions:

db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

Al final del día, ejecute map-reduce incremental en la colección usersessions, pero use el campo query para seleccionar solo los nuevos documentos. Envíe los resultados a la colección session_stats, pero reduce el contenido con los resultados del map-reduce incremental:

db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);

Consulta la colección session_stats para verificar los resultados:

db.session_stats.find().sort( { _id: 1 } )

La operación devuelve los siguientes documentos:

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

Como alternativa a map-reduce, puede utilizar una canalización de agregación que combine $group las $merge etapas y para lograr el mismo resultado en una operación más flexible.

Recrear la colección usersessions:

db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Al utilizar los operadores de canalización de agregación disponibles, puede reescribir el ejemplo de map-reduce sin definir funciones personalizadas:

db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
  1. Los se agrupan por $group los userid y calculan:

    • El total_time usando el $sum operador

    • El count usando el $sum operador

    • El avg_time usando el $avg operador

    La operación devuelve los siguientes documentos:

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. La etapa modifica la estructura del documento de salida para reflejar la salida de map-reduce y tener dos $project campos:_id valuey. Esta etapa es opcional si no necesita reflejar la _id value estructura y.

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. La etapa genera los resultados en $merge una session_stats_agg colección. Si un documento existente tiene el mismo _id valor que el nuevo resultado, la operación aplica la canalización especificada para calcular el tiempo total, el recuento y el tiempo promedio a partir del resultado y del documento existente. Si no existe ningún documento con el mismo _id valor session_stats_agg en, la operación inserta el documento.

  4. Consulta la colección session_stats_agg para verificar los resultados:

    db.session_stats_agg.find().sort( { _id: 1 } )

    La operación devuelve los siguientes documentos:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. Añadir nuevos documentos a la colección usersessions:

    db.usersessions.insertMany([
    { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
    { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
    { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
    { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. Agrega una etapa $match al inicio de la pipeline para especificar el filtro de fecha:

    db.usersessions.aggregate([
    { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ])
  7. Consulta la colección session_stats_agg para verificar los resultados:

    db.session_stats_agg.find().sort( { _id: 1 } )

    La operación devuelve los siguientes documentos:

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. Opcional. Para evitar tener que modificar la $match condición de fecha de la canalización de agregación cada vez que se ejecuta, puede definir la función de envoltura de la agregación en una función auxiliar:

    updateSessionStats = function(startDate) {
    db.usersessions.aggregate([
    { $match: { ts: { $gte: startDate } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ]);
    };

    Luego, para ejecutarlo, simplemente deberá pasar la fecha de inicio a la función updateSessionStats():

    updateSessionStats(ISODate('2020-03-05 00:00:00'))

Tip

Volver

Ejemplos

En esta página