Nota
Pipeline de agregación como alternativa
A partir de MongoDB 5.0, map-reduce está obsoleto:
En lugar de map-reduce, debería usar una canalización de agregación. Las canalizaciones de agregación ofrecen mejor rendimiento y usabilidad que map-reduce.
Puede reescribir las operaciones de map-reduce utilizando etapas de canalización de agregación, como
$group, y$mergeotros.Para las operaciones map-reduce que requieren una funcionalidad personalizada, puedes utilizar los operadores de agregación
$accumulatory$function. Puedes utilizar esos operadores para definir expresiones de agregación personalizadas en JavaScript.
Para ejemplos de alternativas de pipeline de agregación a map-reduce, consulte:
Esta sección presenta un ejemplo de canalización de agregación alternativa a map-reduce que no utiliza una función personalizada. Para ver un ejemplo que utiliza una función personalizada, consulte Map-Reduce a canalización de agregación.
Para realizar operaciones de reducción de mapas, MongoDB proporciona el comando y,mapReduce en mongosh, el método db.collection.mapReduce() contenedor.
Si el conjunto de datos de mapa-reducción crece constantemente, es posible que desee realizar una reducción de mapa incremental en lugar de realizar la operación de mapa-reducción en todo el conjunto de datos cada vez.
Para realizar una reducción de mapa incremental:
Ejecute un trabajo de map-reduce sobre la colección actual y envíe el resultado a una colección separada.
Cuando tenga más datos para procesar, ejecute trabajos de map-reduce posteriores con:
el
queryparámetro que especifica las condiciones que coinciden solo con los nuevos documentos.el parámetro
outque especifica la acciónreducepara fusionar los nuevos resultados en la colección de salida existente.
Considere el siguiente ejemplo donde programa una operación de mapa-reducción en una colección usersessions para que se ejecute al final de cada día.
Configuración de datos
La colección usersessions contiene documentos que registran las sesiones de los usuarios cada día, por ejemplo:
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
Mapa inicial reducido de la colección actual
Ejecute la primera operación map-reduce de la siguiente manera:
Define la función de mapa que asigna
userida un objeto que contiene los campostotal_time,countyavg_time:var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }; Define la función de reducción correspondiente con dos argumentos
keyyvaluespara calcular el tiempo total y el recuento. Elkeycorresponde aluserid, y elvalueses un arreglo cuyos elementos corresponden a los objetos individuales mapeados aluseriden elmapFunction.var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }; Define la función de finalización con dos argumentos
keyyreducedValue. La función modifica el documentoreducedValuepara añadir otro campoaveragey devuelve el documento modificado.var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }; Realiza map-reduce en la colección
usersessionsusando las funcionesmapFunction,reduceFunctionyfinalizeFunction. Envía los resultados a una colecciónsession_stats. Si la colecciónsession_statsya existe, la operación reemplazará el contenido:db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } ) Consulta la colección
session_statspara verificar los resultados:db.session_stats.find().sort( { _id: 1 } ) La operación devuelve los siguientes documentos:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
Mapa-reducción incremental subsiguiente
Luego, a medida que la colección usersessions crece, puedes ejecutar operaciones de map-reduce adicionales. Por ejemplo, agregar nuevos documentos a la colección usersessions:
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
Al final del día, ejecute map-reduce incremental en la colección usersessions, pero use el campo query para seleccionar solo los nuevos documentos. Envíe los resultados a la colección session_stats, pero reduce el contenido con los resultados del map-reduce incremental:
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
Consulta la colección session_stats para verificar los resultados:
db.session_stats.find().sort( { _id: 1 } )
La operación devuelve los siguientes documentos:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
Alternativa de agregación
Como alternativa a map-reduce, puede utilizar una canalización de agregación que combine $group las $merge etapas y para lograr el mismo resultado en una operación más flexible.
Recrear la colección usersessions:
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
Al utilizar los operadores de canalización de agregación disponibles, puede reescribir el ejemplo de map-reduce sin definir funciones personalizadas:
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
Los se agrupan por
$grouplosuseridy calculan:El
total_timeusando el$sumoperadorEl
countusando el$sumoperadorEl
avg_timeusando el$avgoperador
La operación devuelve los siguientes documentos:
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } La etapa modifica la estructura del documento de salida para reflejar la salida de map-reduce y tener dos
$projectcampos:_idvaluey. Esta etapa es opcional si no necesita reflejar la_idvalueestructura y.{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } La etapa genera los resultados en
$mergeunasession_stats_aggcolección. Si un documento existente tiene el mismo_idvalor que el nuevo resultado, la operación aplica la canalización especificada para calcular el tiempo total, el recuento y el tiempo promedio a partir del resultado y del documento existente. Si no existe ningún documento con el mismo_idvalorsession_stats_aggen, la operación inserta el documento.Consulta la colección
session_stats_aggpara verificar los resultados:db.session_stats_agg.find().sort( { _id: 1 } ) La operación devuelve los siguientes documentos:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } Añadir nuevos documentos a la colección
usersessions:db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ]) Agrega una etapa
$matchal inicio de la pipeline para especificar el filtro de fecha:db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]) Consulta la colección
session_stats_aggpara verificar los resultados:db.session_stats_agg.find().sort( { _id: 1 } ) La operación devuelve los siguientes documentos:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } } Opcional. Para evitar tener que modificar la
$matchcondición de fecha de la canalización de agregación cada vez que se ejecuta, puede definir la función de envoltura de la agregación en una función auxiliar:updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); }; Luego, para ejecutarlo, simplemente deberá pasar la fecha de inicio a la función
updateSessionStats():updateSessionStats(ISODate('2020-03-05 00:00:00'))