Observação
Pipeline de agregação como alternativa
A partir do MongoDB , 5.0, map-reduce está obsoleto:
Em vez de map-reduce, você deve usar um aggregation pipeline. aggregation pipeline fornece melhor desempenho e usabilidade do que a redução de mapa.
Você pode reescrever operações de redução de mapa utilizando estágios do pipeline de agregação, como
$group,$mergee outros.Nas operações de map-reduce que exigem funcionalidade personalizada, você pode usar os operadores de agregação
$accumulatore$function. Você pode usar esses operadores para definir expressões de agregação personalizadas no JavaScript.
Para obter exemplos de alternativas de aggregation pipeline para map-reduce, consulte:
Esta seção tem um exemplo de alternativa de aggregation pipeline para map-reduce que não usa uma função personalizada. Para obter um exemplo que usa uma função personalizada, consulte map-reduce to pipeline de agregação.
Para executar operações de map-reduce , o MongoDB fornece o comando mapReduce e, no mongosh, o método wrapper db.collection.mapReduce() .
Se o conjunto de dados de map-reduce estiver crescendo constantemente, você pode querer executar um map-reduce incremental em vez de executar a operação de map-reduce sobre todo o conjunto de dados a cada vez.
Para executar o map-reduce incremental:
Execute uma tarefa de map-reduce sobre a collection atual e envie o resultado para uma collection separada.
Quando você tiver mais dados para processar, execute trabalhos de redução de mapa subsequentes com:
o parâmetro
queryque especifica condições que correspondem somente aos novos documentos.o parâmetro
outque especifica a açãoreducepara mesclar os novos resultados na coleta de saída existente.
Considere o exemplo a seguir, em que você agenda uma operação de map-reduce em uma collection usersessions para ser executada no final de cada dia.
Configuração de dados
A collection usersessions contém documento que registram as sessões dos usuários todos os dias, por exemplo:
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
map-reduce inicial da collection atual
Execute a primeira operação de map-reduce da seguinte forma:
Defina a função de mapa que mapeia o
useridpara um objeto que contém os campostotal_time,counteavg_time:var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }; Defina a função de redução correspondente com dois argumentos
keyevaluespara calcular o tempo total e a contagem. Okeycorresponde aouserid, e ovaluesé uma array cujos elementos correspondem aos objetos individuais mapeados para ouseridnomapFunction.var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }; Defina a função de finalização com dois argumentos
keyereducedValue. A função modifica o documentoreducedValuepara adicionar outro campoaveragee retorna o documento modificado.var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }; Execute o map-reduce na collection do
usersessionsutilizando as funçõesmapFunction,reduceFunctionefinalizeFunction. Enviar os resultados para uma collectionsession_stats. Se a collectionsession_statsjá existir, a operação substituirá o conteúdo:db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } ) Consulte a coleção
session_statspara verificar os resultados:db.session_stats.find().sort( { _id: 1 } ) A operação retorna os seguintes documentos:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
Mapeamento incremental subsequente-redução
Posteriormente, à medida que a collection usersessions cresce, você poderá executar operações adicionais de map-reduce. Por exemplo, adicione novos documentos à collection usersessions :
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
No final do dia, execute a redução de mapa incremental na coleção do usersessions , mas utilize o campo query para selecionar somente os novos documentos. Envie os resultados para a collection session_stats, mas reduce o conteúdo com os resultados do map-reduce incremental:
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
Consulte a coleção session_stats para verificar os resultados:
db.session_stats.find().sort( { _id: 1 } )
A operação retorna os seguintes documentos:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
Alternativa de aggregation
Como alternativa ao map-reduce, você pode usar um aggregation pipeline que combina os estágios $group e $merge para obter o mesmo resultado em uma operação mais flexível.
Recriar a collection usersessions :
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
Usando os operadores de aggregation pipeline disponíveis, você pode reescrever o exemplo de map-reduce sem definir funções personalizadas:
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
O
$groupagrupa pelouseride calcula:A operação retorna os seguintes documentos:
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } O estágio
$projectremodela o documento de saída para espelhar a saída do map-reduce para ter dois campos_idevalue. O estágio é opcional se você não precisar espelhar a estrutura_idevalue.{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } O estágio
$mergegera resultados para uma collectionsession_stats_agg. Se um documento existente tiver o mesmo_idque o novo resultado, a operação aplicará o pipeline especificado para calcular o total_time, count e avg_time a partir do resultado e do documento existente. Se não houver nenhum documento existente com o mesmo_idnosession_stats_agg, a operação inserirá o documento.Consulte a coleção
session_stats_aggpara verificar os resultados:db.session_stats_agg.find().sort( { _id: 1 } ) A operação retorna os seguintes documentos:
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } Adicionar novo documento à collection
usersessions:db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ]) Adicione um estágio
$matchno início do pipeline para especificar o filtro de data:db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]) Consulte a coleção
session_stats_aggpara verificar os resultados:db.session_stats_agg.find().sort( { _id: 1 } ) A operação retorna os seguintes documentos:
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } } Opcional. Para evitar a necessidade de modificar a condição de data
$matchdo aggregation pipeline toda vez que você executa, você pode definir o encapsulamento do aggregation em uma função auxiliar:updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); }; Em seguida, para executar, você apenas passaria a data de início para a função
updateSessionStats():updateSessionStats(ISODate('2020-03-05 00:00:00'))