Docs Menu
Docs Home
/ /
Map-Reduce
/ / /

Map-Reduce al pipeline de agregación

An La canalización de agregación proporciona un mejor rendimiento y facilidad de uso que una operación de mapa-reducción.

Las operaciones de mapa-reducción se pueden reescribir utilizando operadores de canalización de agregación, como $group, y$merge otros.

Para operaciones de map-reduce que requieren funcionalidad personalizada, MongoDB proporciona los $accumulator operadores de agregación y. Úselos para definir expresiones de agregación personalizadas en JavaScript.$function

Las expresiones de mapa-reducción se pueden reescribir como se muestra en las siguientes secciones.

La tabla es solo una traducción aproximada. Por ejemplo, la tabla muestra una traducción aproximada de mapFunction $projectusando.

  • Sin embargo, la lógica mapFunction puede requerir etapas adicionales, como si la lógica incluye iteración sobre una matriz:

    function() {
    this.items.forEach(function(item){ emit(item.sku, 1); });
    }

    Luego, la canalización de agregación incluye un $unwind y $project un:

    { $unwind: "$items "},
    { $project: { emits: { key: { "$items.sku" }, value: 1 } } },
  • El emits campo en podría tener $project otro nombre. Para una comparación visual,emits se eligió el nombre de campo.

Map-Reduce
Pipeline de agregación
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: <collection>
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $out: <collection> }
] )
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { replace: <collection>, db:<db> }
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $out: { db: <db>, coll: <collection> } }
] )
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { merge: <collection>, db: <db> }
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $merge: {
into: { db: <db>, coll: <collection>},
on: "_id"
whenMatched: "replace",
whenNotMatched: "insert"
} },
] )
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { reduce: <collection>, db: <db> }
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} },
{ $merge: {
into: { db: <db>, coll: <collection> },
on: "_id"
whenMatched: [
{ $project: {
value: { $function: {
body: <reduceFunction>,
args: [
"$_id",
[ "$value", "$$new.value" ]
],
lang: "js"
} }
} }
]
whenNotMatched: "insert"
} },
] )
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { inline: 1 }
}
)
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k"},
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emit.v"],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }}
} }
] )

Varias expresiones de map-reduce se pueden reescribir mediante operadores de canalización de agregación,$group $mergecomo, y ​​otros, sin necesidad de funciones personalizadas. Sin embargo, a modo de ejemplo, los siguientes ejemplos ofrecen ambas alternativas.

La siguiente operación de reducción de mapas en la colección orders se agrupa por cust_id y calcula la suma de price para cada cust_id:

var mapFunction1 = function() {
emit(this.cust_id, this.price);
};
var reduceFunction1 = function(keyCustId, valuesPrices) {
return Array.sum(valuesPrices);
};
db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)

1Alternativa: (recomendada) Puede reescribir la operación en una canalización de agregación sin traducir la función map-reduce a etapas de canalización equivalentes:

db.orders.aggregate([
{ $group: { _id: "$cust_id", value: { $sum: "$price" } } },
{ $out: "agg_alternative_1" }
])

2Alternativa: (solo con fines ilustrativos) La siguiente secuencia de agregación proporciona una traducción de las distintas funciones de map-reduce, utilizando para definir funciones $accumulator personalizadas:

db.orders.aggregate( [
{ $project: { emit: { key: "$cust_id", value: "$price" } } }, // equivalent to the map function
{ $group: { // equivalent to the reduce function
_id: "$emit.key",
valuesPrices: { $accumulator: {
init: function() { return 0; },
initArgs: [],
accumulate: function(state, value) { return state + value; },
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) { return state1 + state2; },
lang: "js"
} }
} },
{ $out: "agg_alternative_2" }
] )
  1. Primero, la etapa genera documentos con $project un emit campo. El emit campo es un documento con los siguientes campos:

    • key que contiene el valor cust_id para el documento

    • value que contiene el valor price para el documento

    { "_id" : 1, "emit" : { "key" : "Ant O. Knee", "value" : 25 } }
    { "_id" : 2, "emit" : { "key" : "Ant O. Knee", "value" : 70 } }
    { "_id" : 3, "emit" : { "key" : "Busby Bee", "value" : 50 } }
    { "_id" : 4, "emit" : { "key" : "Busby Bee", "value" : 25 } }
    { "_id" : 5, "emit" : { "key" : "Busby Bee", "value" : 50 } }
    { "_id" : 6, "emit" : { "key" : "Cam Elot", "value" : 35 } }
    { "_id" : 7, "emit" : { "key" : "Cam Elot", "value" : 25 } }
    { "_id" : 8, "emit" : { "key" : "Don Quis", "value" : 75 } }
    { "_id" : 9, "emit" : { "key" : "Don Quis", "value" : 55 } }
    { "_id" : 10, "emit" : { "key" : "Don Quis", "value" : 25 } }
  2. Luego,$group usa el operador para sumar los valores $accumulator emitidos:

    { "_id" : "Don Quis", "valuesPrices" : 155 }
    { "_id" : "Cam Elot", "valuesPrices" : 60 }
    { "_id" : "Ant O. Knee", "valuesPrices" : 95 }
    { "_id" : "Busby Bee", "valuesPrices" : 125 }
  3. Finalmente, escribe la salida en $out la agg_alternative_2 colección. Como alternativa, podría usar en $merge lugar $out de.

La siguiente operación de map-reduce en la colección orders agrupa por el campo item.sku y calcula el número de pedidos y la cantidad total solicitada para cada SKU. A continuación, calcula la cantidad promedio por pedido para cada valor de SKU y fusiona los resultados en la colección de salida.

var mapFunction2 = function() {
for (var idx = 0; idx < this.items.length; idx++) {
var key = this.items[idx].sku;
var value = { count: 1, qty: this.items[idx].qty };
emit(key, value);
}
};
var reduceFunction2 = function(keySKU, countObjVals) {
reducedVal = { count: 0, qty: 0 };
for (var idx = 0; idx < countObjVals.length; idx++) {
reducedVal.count += countObjVals[idx].count;
reducedVal.qty += countObjVals[idx].qty;
}
return reducedVal;
};
var finalizeFunction2 = function (key, reducedVal) {
reducedVal.avg = reducedVal.qty/reducedVal.count;
return reducedVal;
};
db.orders.mapReduce(
mapFunction2,
reduceFunction2,
{
out: { merge: "map_reduce_example2" },
query: { ord_date: { $gte: new Date("2020-03-01") } },
finalize: finalizeFunction2
}
);

1Alternativa: (recomendada) Puede reescribir la operación en una canalización de agregación sin traducir la función map-reduce a etapas de canalización equivalentes:

db.orders.aggregate( [
{ $match: { ord_date: { $gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
{ $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
{ $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )

2Alternativa: (solo con fines ilustrativos) La siguiente secuencia de agregación proporciona una traducción de las distintas funciones de map-reduce, utilizando para definir funciones $accumulator personalizadas:

db.orders.aggregate( [
{ $match: { ord_date: {$gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $project: { emit: { key: "$items.sku", value: { count: { $literal: 1 }, qty: "$items.qty" } } } },
{ $group: {
_id: "$emit.key",
value: { $accumulator: {
init: function() { return { count: 0, qty: 0 }; },
initArgs: [],
accumulate: function(state, value) {
state.count += value.count;
state.qty += value.qty;
return state;
},
accumulateArgs: [ "$emit.value" ],
merge: function(state1, state2) {
return { count: state1.count + state2.count, qty: state1.qty + state2.qty };
},
finalize: function(state) {
state.avg = state.qty / state.count;
return state;
},
lang: "js"}
}
} },
{ $merge: {
into: "agg_alternative_4",
on: "_id",
whenMatched: "replace",
whenNotMatched: "insert"
} }
] )
  1. La etapa selecciona solo aquellos documentos $match con ord_date mayor o igual new Date("2020-03-01") a.

  2. La etapa desglosa el documento por $unwind el items campo de la matriz para generar un documento por cada elemento de la matriz. Por ejemplo:

    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    ...
  3. La etapa genera documentos con $project un emit campo. El emit campo es un documento con los siguientes campos:

    • key que contiene el valor items.sku

    • value que contiene un documento con el valor qty y un valor count

    { "_id" : 1, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 5 } } }
    { "_id" : 1, "emit" : { "key" : "apples", "value" : { "count" : 1, "qty" : 5 } } }
    { "_id" : 2, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 8 } } }
    { "_id" : 2, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
    { "_id" : 3, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
    { "_id" : 3, "emit" : { "key" : "pears", "value" : { "count" : 1, "qty" : 10 } } }
    { "_id" : 4, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } }
    { "_id" : 5, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } }
    ...
  4. El $group utiliza el operador para sumar $accumulator los count y emitidos qty y calcular el avg campo:

    { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
    { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
    { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
    { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
    { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
  5. Finalmente, escribe la salida en $merge la agg_alternative_4 colección. Si un documento existente tiene la misma clave _id que el nuevo resultado, la operación sobrescribe el documento existente. Si no existe ningún documento con la misma clave, la operación inserta el documento.

Tip

Volver

Solucionar problemas de reducción