Nota
Pipeline de agregación como alternativa
A partir de MongoDB 5.0, El map-reduce está en desuso:
En lugar de map-reduce, debería usar una canalización de agregación. Las canalizaciones de agregación ofrecen mejor rendimiento y usabilidad que map-reduce.
Puedes reescribir las operaciones de map-reduce usando etapas del pipeline de agregación, como
$group,$mergey otros.Para las operaciones map-reduce que requieren una funcionalidad personalizada, puedes utilizar los operadores de agregación
$accumulatory$function. Puedes utilizar esos operadores para definir expresiones de agregación personalizadas en JavaScript.
Para ejemplos de alternativas de pipeline de agregación a map-reduce, consulte:
Esta sección tiene un ejemplo de alternativa de pipeline de agregación para Map-Reduce que no utiliza una función personalizada. Para un ejemplo que utilice una función personalizada, consulta map-reduce a pipeline de agregación.
Para realizar operaciones de reducción de mapas, MongoDB proporciona el comando y,mapReduce en mongosh, el método contenedor db.collection.mapReduce().
Si el conjunto de datos de map-reduce está creciendo constantemente, puede que desees realizar un map-reduce incremental en lugar de realizar la operación de map-reduce sobre el conjunto de datos completo cada vez.
Para realizar el mapa-reducción incremental:
Ejecute un trabajo de map-reduce sobre la colección actual y envíe el resultado a una colección separada.
Cuando tengas más datos para procesar, ejecuta los siguientes trabajos de map-reduce con:
el parámetro
queryque especifica condiciones que solo coinciden con los nuevos documentos.el parámetro
outque especifica la acciónreducepara fusionar los nuevos resultados en la colección de resultados existente.
Considera el siguiente ejemplo donde programas una operación de map-reduce en una colección usersessions para ejecutarse al final de cada día.
Configurar datos
La colección usersessions contiene documentos que registran las sesiones de los usuarios cada día, por ejemplo:
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
Map-Reduce inicial de la colección actual
Ejecute la primera operación map-reduce de la siguiente manera:
Define la función de mapeo que asigna el
userida un objeto que contiene los campostotal_time,countyavg_time:var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }; Define la función de reducción correspondiente con dos argumentos
keyyvaluespara calcular el tiempo total y el recuento. Elkeycorresponde aluserid, y elvalueses un arreglo cuyos elementos corresponden a los objetos individuales mapeados aluseriden elmapFunction.var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }; Define la función de finalización con dos argumentos
keyyreducedValue. La función modifica el documentoreducedValuepara añadir otro campoaveragey devuelve el documento modificado.var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }; Realiza map-reduce en la colección
usersessionsusando las funcionesmapFunction,reduceFunctionyfinalizeFunction. Envía los resultados a una colecciónsession_stats. Si la colecciónsession_statsya existe, la operación reemplazará el contenido:db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } ) Query la colección
session_statspara verificar los resultados:db.session_stats.find().sort( { _id: 1 } ) La operación devuelve los siguientes documentos:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
Map-Reduce incremental posterior
Luego, a medida que la colección usersessions crece, puedes ejecutar operaciones de map-reduce adicionales. Por ejemplo, agregar nuevos documentos a la colección usersessions:
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
Al final del día, realiza un map-reduce incremental en la colección usersessions, pero utiliza el campo query para seleccionar solo los nuevos documentos. Envie los resultados a la colección session_stats, pero reduce el contenido con los resultados del map-reduce incremental:
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
Query la colección session_stats para verificar los resultados:
db.session_stats.find().sort( { _id: 1 } )
La operación devuelve los siguientes documentos:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
Alternativa de agregación
Como alternativa a map-reduce, puede utilizar un pipeline de agregación que combina las etapas $group y $merge para lograr el mismo resultado en una operación más flexible.
Recrear la colección usersessions:
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
Usando los operadores disponibles del pipeline de agregación, puedes reescribir el ejemplo de map-reduce sin definir funciones personalizadas:
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
El
$groupse agrupa por eluseridy calcula:El
total_timeusando el$sumoperadorEl
countusando el$sumoperadorEl
avg_timeusando el$avgoperador
La operación devuelve los siguientes documentos:
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } La etapa
$projectmodifica el documento de salida para reflejar la salida de map-reduce para tener dos campos_idyvalue. La etapa es opcional si no es necesario reflejar la estructura_idyvalue.{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } La etapa genera los resultados en
$mergeunasession_stats_aggcolección. Si un documento existente tiene el mismo_idvalor que el nuevo resultado, la operación aplica la canalización especificada para calcular el tiempo total, el recuento y el tiempo promedio a partir del resultado y del documento existente. Si no existe ningún documento con el mismo_idvalorsession_stats_aggen, la operación inserta el documento.Query la colección
session_stats_aggpara verificar los resultados:db.session_stats_agg.find().sort( { _id: 1 } ) La operación devuelve los siguientes documentos:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } Añadir nuevos documentos a la colección
usersessions:db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ]) Agrega una etapa
$matchal inicio de la pipeline para especificar el filtro de fecha:db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]) Query la colección
session_stats_aggpara verificar los resultados:db.session_stats_agg.find().sort( { _id: 1 } ) La operación devuelve los siguientes documentos:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } } Opcional. Para evitar tener que modificar la
$matchcondición de fecha de la canalización de agregación cada vez que se ejecuta, puede definir la función de envoltura de la agregación en una función auxiliar:updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); }; Luego, para ejecutarlo, simplemente deberá pasar la fecha de inicio a la función
updateSessionStats():updateSessionStats(ISODate('2020-03-05 00:00:00'))