可以使用聚合管道操作符(例如 $group、$merge 等)重写 map-reduce 操作。
对于需要自定义功能的 map-reduce 操作,MongoDB 提供了 $accumulator 和 $function 聚合操作符。可以使用这些操作符在 JavaScript 中定义自定义聚合表达式。
您可以重写 map-reduce 表达式,如以下各部分所示。
Map-reduce 到聚合管道转换表
该表仅为一个近似转换。例如,该表显示了使用 $project 对 mapFunction 进行近似转换。
- 但是,mapFunction 逻辑可能需要其他阶段,例如当此逻辑包含数组迭代时: function() { this.items.forEach(function(item){ emit(item.sku, 1); }); } 然后,该聚合管道会包括 $unwind 和 $project: { $unwind: "$items" }, { $project: { emits: { key: "$items.sku", value: 1 } } },
- $project 中的 emits 字段可能被命名为其他名称。为进行直观比较,我们选择了字段名称 emits。
| Map-Reduce | 聚合管道 (Aggregation Pipeline) | 
|---|---|
| db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: <collection> } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } }, { $out: <collection> } ] ) | 
| db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { replace: <collection>, db:<db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } }, { $out: { db: <db>, coll: <collection> } } ] ) | 
| db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } }, { $merge: { into: { db: <db>, coll: <collection> }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }, ] ) | 
| db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { reduce: <collection>, db: <db> } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } }, { $merge: { into: { db: <db>, coll: <collection> }, on: "_id", whenMatched: [ { $project: { value: { $function: { body: <reduceFunction>, args: [ "$_id", [ "$value", "$$new.value" ] ], lang: "js" } } } } ], whenNotMatched: "insert" } }, ] ) | 
| db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { inline: 1 } } ) | db.collection.aggregate( [ { $match: <queryFilter> }, { $sort: <sortOrder> }, { $limit: <number> }, { $project: { emits: { k: <expression>, v: <expression> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <initCode>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } } ] ) | 
示例
可以使用聚合管道操作符(例如 $group、$merge 等)重写各种 map-reduce 表达式,而无需使用自定义函数。但是,为便于说明,以下示例提供了两种替代方案。
示例 1
针对 orders 集合的以下 map-reduce 操作会按 cust_id 进行分组,并为每个 cust_id 计算 price 之和:
var mapFunction1 = function() {    emit(this.cust_id, this.price); }; var reduceFunction1 = function(keyCustId, valuesPrices) {    return Array.sum(valuesPrices); }; db.orders.mapReduce(    mapFunction1,    reduceFunction1,    { out: "map_reduce_example" } ) 
替代方案 1:(推荐)您可将此操作重写为聚合管道,而无需将 map-reduce 函数转换为等效的管道阶段:
db.orders.aggregate([    { $group: { _id: "$cust_id", value: { $sum: "$price" } } },    { $out: "agg_alternative_1" } ]) 
替代方案 2:(仅用于说明)通过使用 $accumulator 来定义自定义函数,以下聚合管道提供对各种 map-reduce 函数的转换:
db.orders.aggregate( [    { $project: { emit: { key: "$cust_id", value: "$price" } } },  // equivalent to the map function    { $group: {                                                    // equivalent to the reduce function          _id: "$emit.key",          valuesPrices: { $accumulator: {                      init: function() { return 0; },                      initArgs: [],                      accumulate: function(state, value) { return state + value; },                      accumulateArgs: [ "$emit.value" ],                      merge: function(state1, state2) { return state1 + state2; },                      lang: "js"          } }    } },    { $out: "agg_alternative_2" } ] ) 
- 首先,$project 阶段会输出包含 emit 字段的文档。emit 字段是包含以下字段的文档: key 包含该文档的 cust_id 值
- value 包含该文档的 price 值
 - { "_id" : 1, "emit" : { "key" : "Ant O. Knee", "value" : 25 } } - { "_id" : 2, "emit" : { "key" : "Ant O. Knee", "value" : 70 } } - { "_id" : 3, "emit" : { "key" : "Busby Bee", "value" : 50 } } - { "_id" : 4, "emit" : { "key" : "Busby Bee", "value" : 25 } } - { "_id" : 5, "emit" : { "key" : "Busby Bee", "value" : 50 } } - { "_id" : 6, "emit" : { "key" : "Cam Elot", "value" : 35 } } - { "_id" : 7, "emit" : { "key" : "Cam Elot", "value" : 25 } } - { "_id" : 8, "emit" : { "key" : "Don Quis", "value" : 75 } } - { "_id" : 9, "emit" : { "key" : "Don Quis", "value" : 55 } } - { "_id" : 10, "emit" : { "key" : "Don Quis", "value" : 25 } } 
- 然后,$group 会使用 $accumulator 运算符对发出的值求和: - { "_id" : "Don Quis", "valuesPrices" : 155 } - { "_id" : "Cam Elot", "valuesPrices" : 60 } - { "_id" : "Ant O. Knee", "valuesPrices" : 95 } - { "_id" : "Busby Bee", "valuesPrices" : 125 } 
- 最后,$out 在集合 agg_alternative_2 中写入输出结果。或者,您可以使用 $merge 代替 $out。
示例 2
针对 orders 集合的以下 map-reduce 操作会按 items.sku 字段进行分组,并为每个 sku 计算订单数和订购总量。然后,此操作会为每个 sku 值计算每个订单的平均数量,并将计算结果合并到输出集合中。
var mapFunction2 = function() {       for (var idx = 0; idx < this.items.length; idx++) {          var key = this.items[idx].sku;          var value = { count: 1, qty: this.items[idx].qty };          emit(key, value);       } }; var reduceFunction2 = function(keySKU, countObjVals) {    reducedVal = { count: 0, qty: 0 };    for (var idx = 0; idx < countObjVals.length; idx++) {          reducedVal.count += countObjVals[idx].count;          reducedVal.qty += countObjVals[idx].qty;    }    return reducedVal; }; var finalizeFunction2 = function (key, reducedVal) {    reducedVal.avg = reducedVal.qty/reducedVal.count;    return reducedVal; }; db.orders.mapReduce(    mapFunction2,    reduceFunction2,    {       out: { merge: "map_reduce_example2" },       query: { ord_date: { $gte: new Date("2020-03-01") } },       finalize: finalizeFunction2    }    ); 
替代方案 1:(推荐)您可将此操作重写为聚合管道,而无需将 map-reduce 函数转换为等效的管道阶段:
db.orders.aggregate( [    { $match: { ord_date: { $gte: new Date("2020-03-01") } } },    { $unwind: "$items" },    { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } }  },    { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },    { $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace",  whenNotMatched: "insert" } } ] ) 
替代方案 2:(仅用于说明)通过使用 $accumulator 来定义自定义函数,以下聚合管道提供对各种 map-reduce 函数的转换:
db.orders.aggregate( [       { $match: { ord_date: {$gte: new Date("2020-03-01") } } },       { $unwind: "$items" },       { $project: { emit: { key: "$items.sku", value: { count: { $literal: 1 }, qty: "$items.qty" } } } },       { $group: {             _id: "$emit.key",             value: { $accumulator: {                init: function() { return { count: 0, qty: 0 }; },                initArgs: [],                accumulate: function(state, value) {                   state.count += value.count;                   state.qty += value.qty;                   return state;                },                accumulateArgs: [ "$emit.value" ],                merge: function(state1, state2) {                   return { count: state1.count + state2.count, qty: state1.qty + state2.qty };                },                finalize: function(state) {                   state.avg = state.qty / state.count;                   return state;                },                lang: "js"}             }       } },       { $merge: {          into: "agg_alternative_4",          on: "_id",          whenMatched: "replace",          whenNotMatched: "insert"       } } ] ) 
- $match 阶段仅选择 ord_date 大于等于 new Date("2020-03-01") 的文档。
- $unwind 阶段按 items 数组字段对文档进行分解,为每个数组元素输出一个文档。例如: - { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" } - { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" } - { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" } - { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } - { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } - { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" } - { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } - { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } - ... 
- $project 阶段会输出包含 emit 字段的文档。emit 字段是包含以下字段的文档: key 包含 items.sku 值
- value 包含具有 qty 值和 count 值的文档
 - { "_id" : 1, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 5 } } } - { "_id" : 1, "emit" : { "key" : "apples", "value" : { "count" : 1, "qty" : 5 } } } - { "_id" : 2, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 8 } } } - { "_id" : 2, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } } - { "_id" : 3, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } } - { "_id" : 3, "emit" : { "key" : "pears", "value" : { "count" : 1, "qty" : 10 } } } - { "_id" : 4, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } } - { "_id" : 5, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } } - ... 
- $group 使用 $accumulator 运算符将发出的 count 和 qty 相加,并计算 avg 字段: - { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } } - { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } } - { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } } - { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } } - { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } } 
- 最后,$merge 将输出写入集合 agg_alternative_4。如果现有文档的键 _id 与新结果相同,则操作会覆盖现有文档。如果没有具有相同键的现有文档,操作将插入该文档。