I am currently storing the tick data of about 5000 stocks in a single collection, and I am trying to find the most recently inserted document for each stock. The collection holds about 6 million documents on average, and every document has the following format:
{
htsc_code: 'AAPL',
time: ISODate('2023-08-09T12:34:56Z'),
trading_phase_code: "1",
exchange: 'NASDAQ',
security_type: 'stock',
price_max: '100',
price_min: '1',
prev_close: '12',
num_trades: "100",
volume: 100,
value: 10000,
last: 20,
open: 2,
high: 10,
low: 1,
close: 20
}
Each document also has extra fields named by the pattern {bid/ask}{price/size}{i} for i in range(1, 11), each holding a float value.
I need only the last value of the ‘last’ field and the ‘time’ field. I did the following:
(1) First create the UNIQUE index htsc_code_1_time_-1, then run the following code:
# Collect the newest tick (by 'time') for every distinct stock code.
# Assumes `db` is a pymongo Database and `collection` is the collection name.
stocks = db[collection].distinct('htsc_code')
doc_ls = []
for stock in stocks:
    # find_one() runs the query immediately and returns a document (or None),
    # so the list holds real results rather than unexecuted lazy cursors.
    # The explicit sort is required: without it the server may return any
    # matching document. With the htsc_code_1_time_-1 index this is a
    # single index seek per stock.
    latest = db[collection].find_one(
        {'htsc_code': stock},
        {'last': 1, 'time': 1, 'htsc_code': 1, '_id': 0},
        sort=[('time', -1)],
    )
    if latest is not None:
        doc_ls.append(latest)
It took 1.3 seconds to generate the list
(2) Do not create any index, perform the following aggregation:
[
    {'$group': {
        '_id': '$htsc_code',
        'price_last': {'$last': '$last'},
        'time_last': {'$last': '$time'}
    }}
]
The query took 5 seconds. (The explain output for this run is not included here.)
(3) Similar to (2), but create the UNIQUE index htsc_code_1_time_-1 in advance, and perform the aggregation in (2). The query took 10 seconds. The explain result is:
{
"explainVersion": "2",
"queryPlanner": {
"namespace": "htsc.tick_stock",
"indexFilterSet": false,
"parsedQuery": {},
"queryHash": "AACE9B53",
"planCacheKey": "AACE9B53",
"optimizedPipeline": true,
"maxIndexedOrSolutionsReached": false,
"maxIndexedAndSolutionsReached": false,
"maxScansToExplodeReached": false,
"winningPlan": {
"queryPlan": {
"stage": "GROUP",
"planNodeId": 2,
"inputStage": {
"stage": "COLLSCAN",
"planNodeId": 1,
"filter": {},
"direction": "forward"
}
},
"slotBasedPlan": {
"slots": "$$RESULT=s13 env: { s1 = TimeZoneDatabase(Etc/GMT-1...Europe/Kyiv) (timeZoneDB), s3 = Timestamp(1691633168, 163) (CLUSTER_TIME), s2 = Nothing (SEARCH_META), s4 = 1691633168828 (NOW) }",
"stages": "[2] mkbson s13 [_id = s8, price_last = s10, time_last = s12] true false \n[2] group [s8] [s10 = last (fillEmpty (s9, null)), s12 = last (fillEmpty (s11, null))] \n[2] project [s11 = getField (s5, \"time\")] \n[2] project [s9 = getField (s5, \"last\")] \n[2] project [s8 = fillEmpty (s7, null)] \n[2] project [s7 = getField (s5, \"htsc_code\")] \n[1] scan s5 s6 none none none none [] @\"2fc8cb4f-41e2-4005-9f02-738da2081a4f\" true false "
}
},
"rejectedPlans": []
},
"executionStats": {
"executionSuccess": true,
"nReturned": 5381,
"executionTimeMillis": 10428,
"totalKeysExamined": 0,
"totalDocsExamined": 6901554,
"executionStages": {
"stage": "mkbson",
"planNodeId": 2,
"nReturned": 5381,
"executionTimeMillisEstimate": 10427,
"opens": 1,
"closes": 1,
"saveState": 6901,
"restoreState": 6901,
"isEOF": 1,
"objSlot": 13,
"fields": [],
"projectFields": [
"_id",
"price_last",
"time_last"
],
"projectSlots": [
8,
10,
12
],
"forceNewObject": true,
"returnOldObject": false,
"inputStage": {
"stage": "group",
"planNodeId": 2,
"nReturned": 5381,
"executionTimeMillisEstimate": 10426,
"opens": 1,
"closes": 1,
"saveState": 6901,
"restoreState": 6901,
"isEOF": 1,
"groupBySlots": [
8
],
"expressions": {
"10": "last (fillEmpty (s9, null)) ",
"12": "last (fillEmpty (s11, null)) "
},
"usedDisk": false,
"spilledRecords": 0,
"spilledBytesApprox": 0,
"inputStage": {
"stage": "project",
"planNodeId": 2,
"nReturned": 6901554,
"executionTimeMillisEstimate": 8903,
"opens": 1,
"closes": 1,
"saveState": 6901,
"restoreState": 6901,
"isEOF": 1,
"projections": {
"11": "getField (s5, \"time\") "
},
"inputStage": {
"stage": "project",
"planNodeId": 2,
"nReturned": 6901554,
"executionTimeMillisEstimate": 8674,
"opens": 1,
"closes": 1,
"saveState": 6901,
"restoreState": 6901,
"isEOF": 1,
"projections": {
"9": "getField (s5, \"last\") "
},
"inputStage": {
"stage": "project",
"planNodeId": 2,
"nReturned": 6901554,
"executionTimeMillisEstimate": 8174,
"opens": 1,
"closes": 1,
"saveState": 6901,
"restoreState": 6901,
"isEOF": 1,
"projections": {
"8": "fillEmpty (s7, null) "
},
"inputStage": {
"stage": "project",
"planNodeId": 2,
"nReturned": 6901554,
"executionTimeMillisEstimate": 8010,
"opens": 1,
"closes": 1,
"saveState": 6901,
"restoreState": 6901,
"isEOF": 1,
"projections": {
"7": "getField (s5, \"htsc_code\") "
},
"inputStage": {
"stage": "scan",
"planNodeId": 1,
"nReturned": 6901554,
"executionTimeMillisEstimate": 7627,
"opens": 1,
"closes": 1,
"saveState": 6901,
"restoreState": 6901,
"isEOF": 1,
"numReads": 6901554,
"recordSlot": 5,
"recordIdSlot": 6,
"fields": [],
"outputSlots": []
}
}
}
}
}
}
},
"allPlansExecution": []
},
"command": {
"aggregate": "tick_stock",
"pipeline": [
{
"$group": {
"_id": "$htsc_code",
"price_last": {
"$last": "$last"
},
"time_last": {
"$last": "$time"
}
}
}
],
"allowDiskUse": true,
"cursor": {},
"maxTimeMS": 60000,
"$db": "htsc"
},
"serverInfo": {
"host": "IT-3",
"port": 27017,
"version": "6.0.3",
"gitVersion": "f803681c3ae19817d31958965850193de067c516"
},
"serverParameters": {
"internalQueryFacetBufferSizeBytes": 104857600,
"internalQueryFacetMaxOutputDocSizeBytes": 104857600,
"internalLookupStageIntermediateDocumentMaxSizeBytes": 104857600,
"internalDocumentSourceGroupMaxMemoryBytes": 104857600,
"internalQueryMaxBlockingSortMemoryUsageBytes": 104857600,
"internalQueryProhibitBlockingMergeOnMongoS": 0,
"internalQueryMaxAddToSetBytes": 104857600,
"internalDocumentSourceSetWindowFieldsMaxMemoryBytes": 104857600
},
"ok": 1,
"$clusterTime": {
"clusterTime": {
"$timestamp": {
"t": 1691633179,
"i": 137
}
},
"signature": {
"hash": {
"$binary": {
"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=",
"subType": "00"
}
},
"keyId": 0
}
},
"operationTime": {
"$timestamp": {
"t": 1691633179,
"i": 137
}
}
}
Note that no index is used: the winning plan is a COLLSCAN feeding the GROUP stage.
(4) Similar to (2), but add the unique index mentioned and sort by the fields in advance, i.e. add a stage:
{
'$sort': {
'htsc_code': 1, 'time': 1, 'last': 1
}
}
It took 45 seconds. The explain result is:
{
"explainVersion": "2",
"queryPlanner": {
"namespace": "htsc.tick_stock",
"indexFilterSet": false,
"parsedQuery": {},
"queryHash": "9B48956E",
"planCacheKey": "9B48956E",
"optimizedPipeline": true,
"maxIndexedOrSolutionsReached": false,
"maxIndexedAndSolutionsReached": false,
"maxScansToExplodeReached": false,
"winningPlan": {
"queryPlan": {
"stage": "GROUP",
"planNodeId": 4,
"inputStage": {
"stage": "SORT",
"planNodeId": 3,
"sortPattern": {
"htsc_code": 1,
"time": 1,
"last": 1
},
"memLimit": 104857600,
"type": "simple",
"inputStage": {
"stage": "PROJECTION_SIMPLE",
"planNodeId": 2,
"transformBy": {
"htsc_code": true,
"last": true,
"time": true,
"_id": false
},
"inputStage": {
"stage": "COLLSCAN",
"planNodeId": 1,
"filter": {},
"direction": "forward"
}
}
}
},
"slotBasedPlan": {
"slots": "$$RESULT=s27 env: { s1 = TimeZoneDatabase(Etc/GMT-1...Europe/Kyiv) (timeZoneDB), s3 = Timestamp(1691633545, 439) (CLUSTER_TIME), s2 = Nothing (SEARCH_META), s4 = 1691633545927 (NOW) }",
"stages": "[4] mkbson s27 [_id = s22, price_last = s24, time_last = s26] true false \n[4] group [s22] [s24 = last (fillEmpty (s23, null)), s26 = last (fillEmpty (s25, null))] \n[4] project [s25 = getField (s7, \"time\")] \n[4] project [s23 = getField (s7, \"last\")] \n[4] project [s22 = fillEmpty (s21, null)] \n[4] project [s21 = getField (s7, \"htsc_code\")] \n[3] sort [s14, s17, s20] [asc, asc, asc] [s7] \n[3] project [s20 = fillEmpty (s19, undefined)] \n[3] traverse s19 s18 s10 {if (s18 <=> s19 < 0, s18, s19)} {} \nfrom \n [3] project [s17 = fillEmpty (s16, undefined)] \n [3] traverse s16 s15 s9 {if (s15 <=> s16 < 0, s15, s16)} {} \n from \n [3] project [s14 = fillEmpty (s13, undefined)] \n [3] traverse s13 s12 s8 {if (s12 <=> s13 < 0, s12, s13)} {} \n from \n [3] project [s11 = isArray (s8) <=> false + isArray (s9) <=> false + isArray (s10) <=> false <= 1 || fail ( 2 ,cannot sort with keys that are parallel arrays)] \n [3] project [s8 = fillEmpty (getField (s7, \"htsc_code\"), null), s9 = fillEmpty (getField (s7, \"time\"), null), s10 = fillEmpty (getField (s7, \"last\"), null)] \n [2] mkbson s7 s5 [htsc_code, last, time] keep [] true false \n [1] scan s5 s6 none none none none [] @\"2fc8cb4f-41e2-4005-9f02-738da2081a4f\" true false \n in \n [3] project [s12 = s8] \n [3] limit 1 \n [3] coscan \n \n in \n [3] project [s15 = s9] \n [3] limit 1 \n [3] coscan \n \nin \n [3] project [s18 = s10] \n [3] limit 1 \n [3] coscan \n"
}
},
"rejectedPlans": []
},
"executionStats": {
"executionSuccess": true,
"nReturned": 5381,
"executionTimeMillis": 45345,
"totalKeysExamined": 0,
"totalDocsExamined": 7054503,
"executionStages": {
"stage": "mkbson",
"planNodeId": 4,
"nReturned": 5381,
"executionTimeMillisEstimate": 45343,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"objSlot": 27,
"fields": [],
"projectFields": [
"_id",
"price_last",
"time_last"
],
"projectSlots": [
22,
24,
26
],
"forceNewObject": true,
"returnOldObject": false,
"inputStage": {
"stage": "group",
"planNodeId": 4,
"nReturned": 5381,
"executionTimeMillisEstimate": 45341,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"groupBySlots": [
22
],
"expressions": {
"24": "last (fillEmpty (s23, null)) ",
"26": "last (fillEmpty (s25, null)) "
},
"usedDisk": false,
"spilledRecords": 0,
"spilledBytesApprox": 0,
"inputStage": {
"stage": "project",
"planNodeId": 4,
"nReturned": 7054503,
"executionTimeMillisEstimate": 43614,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"projections": {
"25": "getField (s7, \"time\") "
},
"inputStage": {
"stage": "project",
"planNodeId": 4,
"nReturned": 7054503,
"executionTimeMillisEstimate": 43235,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"projections": {
"23": "getField (s7, \"last\") "
},
"inputStage": {
"stage": "project",
"planNodeId": 4,
"nReturned": 7054503,
"executionTimeMillisEstimate": 42795,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"projections": {
"22": "fillEmpty (s21, null) "
},
"inputStage": {
"stage": "project",
"planNodeId": 4,
"nReturned": 7054503,
"executionTimeMillisEstimate": 42429,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"projections": {
"21": "getField (s7, \"htsc_code\") "
},
"inputStage": {
"stage": "sort",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 41871,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"memLimit": 104857600,
"totalDataSizeSorted": 988911916,
"usedDisk": true,
"spills": 10,
"orderBySlots": {
"14": "asc",
"17": "asc",
"20": "asc"
},
"outputSlots": [
7
],
"inputStage": {
"stage": "project",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 14191,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"projections": {
"20": "fillEmpty (s19, undefined) "
},
"inputStage": {
"stage": "traverse",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 13967,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"innerOpens": 0,
"innerCloses": 0,
"inputSlot": 10,
"outputSlot": 19,
"outputSlotInner": 18,
"correlatedSlots": [],
"nestedArraysDepth": 1,
"fold": "if (s18 <=> s19 < 0, s18, s19) ",
"outerStage": {
"stage": "project",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 13326,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"projections": {
"17": "fillEmpty (s16, undefined) "
},
"inputStage": {
"stage": "traverse",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 13138,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"innerOpens": 0,
"innerCloses": 0,
"inputSlot": 9,
"outputSlot": 16,
"outputSlotInner": 15,
"correlatedSlots": [],
"nestedArraysDepth": 1,
"fold": "if (s15 <=> s16 < 0, s15, s16) ",
"outerStage": {
"stage": "project",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 12561,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"projections": {
"14": "fillEmpty (s13, undefined) "
},
"inputStage": {
"stage": "traverse",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 12351,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"innerOpens": 0,
"innerCloses": 0,
"inputSlot": 8,
"outputSlot": 13,
"outputSlotInner": 12,
"correlatedSlots": [],
"nestedArraysDepth": 1,
"fold": "if (s12 <=> s13 < 0, s12, s13) ",
"outerStage": {
"stage": "project",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 11430,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"projections": {
"11": "isArray (s8) <=> false + isArray (s9) <=> false + isArray (s10) <=> false <= 1 || fail ( 2 ,cannot sort with keys that are parallel arrays) "
},
"inputStage": {
"stage": "project",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 10261,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"projections": {
"8": "fillEmpty (getField (s7, \"htsc_code\"), null) ",
"9": "fillEmpty (getField (s7, \"time\"), null) ",
"10": "fillEmpty (getField (s7, \"last\"), null) "
},
"inputStage": {
"stage": "mkbson",
"planNodeId": 2,
"nReturned": 7054503,
"executionTimeMillisEstimate": 9114,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"objSlot": 7,
"rootSlot": 5,
"fieldBehavior": "keep",
"fields": [
"htsc_code",
"last",
"time"
],
"projectFields": [],
"projectSlots": [],
"forceNewObject": true,
"returnOldObject": false,
"inputStage": {
"stage": "scan",
"planNodeId": 1,
"nReturned": 7054503,
"executionTimeMillisEstimate": 6932,
"opens": 1,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 1,
"numReads": 7054503,
"recordSlot": 5,
"recordIdSlot": 6,
"fields": [],
"outputSlots": []
}
}
}
},
"innerStage": {
"stage": "project",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 634,
"opens": 7054503,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 0,
"projections": {
"12": "s8 "
},
"inputStage": {
"stage": "limit",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 301,
"opens": 7054503,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 0,
"limit": 1,
"inputStage": {
"stage": "coscan",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 132,
"opens": 7054503,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 0
}
}
}
}
},
"innerStage": {
"stage": "project",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 456,
"opens": 7054503,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 0,
"projections": {
"15": "s9 "
},
"inputStage": {
"stage": "limit",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 239,
"opens": 7054503,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 0,
"limit": 1,
"inputStage": {
"stage": "coscan",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 122,
"opens": 7054503,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 0
}
}
}
}
},
"innerStage": {
"stage": "project",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 481,
"opens": 7054503,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 0,
"projections": {
"18": "s10 "
},
"inputStage": {
"stage": "limit",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 227,
"opens": 7054503,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 0,
"limit": 1,
"inputStage": {
"stage": "coscan",
"planNodeId": 3,
"nReturned": 7054503,
"executionTimeMillisEstimate": 142,
"opens": 7054503,
"closes": 1,
"saveState": 7055,
"restoreState": 7055,
"isEOF": 0
}
}
}
}
}
}
}
}
}
}
}
},
"allPlansExecution": []
},
"command": {
"aggregate": "tick_stock",
"pipeline": [
{
"$sort": {
"htsc_code": 1,
"time": 1,
"last": 1
}
},
{
"$group": {
"_id": "$htsc_code",
"price_last": {
"$last": "$last"
},
"time_last": {
"$last": "$time"
}
}
}
],
"allowDiskUse": true,
"cursor": {},
"maxTimeMS": 60000,
"$db": "htsc"
},
"serverInfo": {
"host": "IT-3",
"port": 27017,
"version": "6.0.3",
"gitVersion": "f803681c3ae19817d31958965850193de067c516"
},
"serverParameters": {
"internalQueryFacetBufferSizeBytes": 104857600,
"internalQueryFacetMaxOutputDocSizeBytes": 104857600,
"internalLookupStageIntermediateDocumentMaxSizeBytes": 104857600,
"internalDocumentSourceGroupMaxMemoryBytes": 104857600,
"internalQueryMaxBlockingSortMemoryUsageBytes": 104857600,
"internalQueryProhibitBlockingMergeOnMongoS": 0,
"internalQueryMaxAddToSetBytes": 104857600,
"internalDocumentSourceSetWindowFieldsMaxMemoryBytes": 104857600
},
"ok": 1,
"$clusterTime": {
"clusterTime": {
"$timestamp": {
"t": 1691633591,
"i": 78
}
},
"signature": {
"hash": {
"$binary": {
"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=",
"subType": "00"
}
},
"keyId": 0
}
},
"operationTime": {
"$timestamp": {
"t": 1691633591,
"i": 78
}
}
}
The database is running on a PC with an i5-13600K and 64GB of RAM. If I manage to make the query faster, it can be run on a server with 48 cores and 500GB of RAM. Are there any other ways I can try to make the $group faster than querying with a for-loop in Python? Thank you!