참고
대안으로서의 집계 파이프라인
MongoDB 5.0 부터 맵 리듀스 는 더 이상 사용되지 않습니다.
맵 리듀스 대신 집계 파이프라인을 사용해야 합니다. 집계 파이프라인은 맵 리듀스보다 성능과 유용성 측면에서 더 우수합니다.
4}
$group$merge, 등과 같은 집계 파이프라인 단계를 사용하여 맵 축소 연산을 다시 작성할 수 있습니다.사용자 지정 기능이 필요한 맵 리듀스 작업의 경우
$accumulator및$function집계 연산자를 사용할 수 있습니다. 이러한 연산자를 사용하여 JavaScript에서 사용자 지정 집계 표현식을 정의할 수 있습니다.
맵 리듀스 대안으로서의 집계 파이프라인 예시는 다음을 참조하세요.
이 섹션에는 사용자 지정 함수를 사용하지 않는 맵 리듀스 대신 사용할 수 있는 집계 파이프라인 예시가 있습니다. 사용자 지정 함수를 사용하는 예제는 맵-리듀스에서 집계 파이프라인으로의 섹션을 참조하세요.
맵 리듀스 듀스 작업을 수행하기 위해 MongoDB 는 mapReduce 명령과 mongosh 에서 db.collection.mapReduce() 래퍼 메서드를 제공합니다.
맵 리듀스 데이터 세트가 지속적으로 증가하는 경우 매번 전체 데이터 세트에 대해 맵 리듀스 작업을 수행하는 대신 증분 맵 리듀스를 수행하는 것이 좋습니다.
증분 맵 리듀스를 수행하려면 다음을 수행합니다.
현재 collection에 대해 맵 리듀스 작업을 실행하고 결과를 별도의 collection에 출력합니다.
처리할 데이터가 더 많은 경우 다음을 사용하여 후속 맵 리듀스 작업을 실행합니다.
새 문서와 만 일치하는 조건을 지정하는
query매개변수입니다.새 결과를 기존 출력 collection에 병합하는
reduce조치를 지정하는out매개 변수입니다.
매일 하루가 끝날 때까지 usersessions 컬렉션에 대한 맵 리듀스 작업이 실행되도록 예약하는 다음 예시를 생각해 보세요.
데이터 설정
usersessions collection에는 매일 사용자의 세션을 기록하는 문서가 포함되어 있습니다.
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
초기 맵 리듀스 collection
다음과 같이 첫 번째 맵 리듀스 작업을 실행합니다.
userid을total_time,count및avg_time필드가 포함된 객체에 매핑하는 맵 함수를 정의합니다.var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }; 두 개의 인수
key와values을 사용하여 해당 축소 함수를 정의하여 총 시간과 횟수를 계산합니다.key는userid에 해당하고,values는 요소가mapFunction의userid에 매핑된 개별 객체에 해당하는 배열입니다.var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }; 두 개의 인수
key및reducedValue을 사용하여 최종 함수를 정의합니다. 이 함수는reducedValue문서를 수정하여 다른 필드average을 추가하고 수정된 문서를 반환합니다.var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }; mapFunction,reduceFunction및finalizeFunction함수를 사용하여usersessions컬렉션에서 맵 리듀스를 수행합니다. 결과를 collectionsession_stats에 출력합니다.session_statscollection이 이미 존재하는 경우 작업은 해당 콘텐츠를 대체합니다.db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } ) session_stats컬렉션을 쿼리하여 결과를 확인합니다.db.session_stats.find().sort( { _id: 1 } ) 이 작업은 다음 문서를 반환합니다.
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
후속 증분 맵-리듀스
나중에 usersessions collection이 커짐에 따라 추가 맵 리듀스 작업을 실행할 수 있습니다. 예를 들어, usersessions collection에 새 문서를 추가합니다.
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
하루가 끝나면 usersessions collection에 대해 증분 맵 리듀스를 수행하되 query 필드를 사용하여 새 문서만 선택합니다. 결과를 collection session_stats 에 출력하지만 reduce 내용을 증분 맵 리듀스 결과로 출력합니다.
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
session_stats 컬렉션을 쿼리하여 결과를 확인합니다.
db.session_stats.find().sort( { _id: 1 } )
이 작업은 다음 문서를 반환합니다.
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
집계 대안
맵 리듀스 대신 단계와 단계를 결합하는 $group 집계 $merge 파이프라인 을 사용하여 보다 유연한 작업으로 동일한 결과를 얻을 수 있습니다.
usersessions collection을 다시 생성합니다.
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
사용 가능한 집계 파이프라인 연산자를 사용하면 사용자 지정 함수를 정의하지 않고도 맵 리듀스 예제를 다시 작성할 수 있습니다.
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
$group는userid을 기준으로 그룹화하고 다음을 계산합니다.이 작업은 다음 문서를 반환합니다.
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } $project단계에서는 맵 리듀스의 출력을 미러링하여_id와value두 필드를 갖도록 출력 문서를 재구성합니다._id및value구조를 미러링할 필요가 없는 경우 이 단계는 선택 사항입니다.{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } 단계는
$merge결과를session_stats_aggcollection에 출력합니다. 기존 문서에 새 결과와 동일한_id이(가) 있는 경우 작업은 지정된 파이프라인을 적용하여 결과와 기존 문서에서 total_time, 개수, avg_time을 계산합니다.session_stats_agg에 동일한_id를 가진 기존 문서가 없는 경우 작업은 문서를 삽입합니다.session_stats_agg컬렉션을 쿼리하여 결과를 확인합니다.db.session_stats_agg.find().sort( { _id: 1 } ) 이 작업은 다음 문서를 반환합니다.
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } usersessionscollection에 새 문서를 추가합니다.db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ]) 파이프라인 시작 부분에
$match단계를 추가하여 날짜 필터를 지정합니다.db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]) session_stats_agg컬렉션을 쿼리하여 결과를 확인합니다.db.session_stats_agg.find().sort( { _id: 1 } ) 이 작업은 다음 문서를 반환합니다.
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } } 선택 사항. 실행할 때마다 집계 파이프라인의
$match날짜 조건을 수정하지 않으려면 헬퍼 함수에서 집계 래핑을 정의할 수 있습니다.updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); }; 그런 다음 실행하려면
updateSessionStats()함수에 시작 날짜를 전달하기만 하면 됩니다.updateSessionStats(ISODate('2020-03-05 00:00:00'))