MongoDB Atlas Trigger Function Help for Python Developer

Hi,
I am a data scientist/Python developer by background and am trying to create a scheduled Atlas Trigger function that runs at the end of the day to aggregate the floating equity data (from a MetaTrader5 Windows client application) inserted into a MongoDB collection asynchronously at random points in time. I followed some Atlas trigger samples as examples, but I don’t have any JS experience, so I would be very happy to receive any guidance on writing the function correctly. A typical sample document inserted into this FloatingEquity collection is below:

time:2022-03-22T13:55:14.000+00:00
_id:62501c4e1128da601d62a83e
ClosedPositions:1
equity:25001

I just need the aggregation to return the min() and max() of the equity value, along with the time of occurrence, for the entire day. Each day’s aggregation will therefore have just one record or document with four fields (or four columns if you are imagining a dataframe), viz. Equity_day_low, Equity_low_timestamp, Equity_day_high, Equity_high_timestamp. This aggregated data needs to be inserted into a new collection called FloatingEquityDailyReport.

The other important point is the timezone issue. I am in India, so I scheduled the Atlas trigger to execute every day at +3 hrs from GMT using the cron schedule 59 2 * * *, but the timestamps are inserted as per the Europe/Bucharest timezone of the broker, so the trigger function needs to pay attention to the timezone of the MT5 broker server time as well. I would like to know if I am doing this correctly.

The code for the Atlas Trigger is below, along with the screenshots:

exports = function() {
  /*A Scheduled Trigger will always call a function without arguments.Documentation on Triggers: https://docs.mongodb.com/realm/triggers/overview */
  const mongodb = context.services.get('MT-Cluster');
  const FloatingEquityRaw = mongodb.db("MT5_AccountMetrix").collection("FloatingEquity");
  const FloatingEquityAggReport = mongodb.db("MT5_AccountMetrix").collection("FloatingEquityDailyReport");
  const generatedObjectId  = new BSON.ObjectId();
  //Generate Daily Report
  return FloatingEquityRaw.aggregate([
    {
      $match: {
        createdAt:{
          $gte: new Date(new Date().setHours(0,0,0,0)), //This DateTime needs to be in 'Europe/Bucharest' TimeZone of Mt5 broker
          $lt : new Date(new Date().setHours(23,59,59,999)), //This DateTime needs to be in 'Europe/Bucharest' TimeZone of Mt5 broker
        },
      },
    },
    {
      $group:{
        _id: generatedObjectId,
        min: {$min : 1}, //Need to capture the minimum equity value for the day along with the timestamp of the event occurence
        max: {$max : 1} //Need to capture the maximum equity value for the day along with the timestamp of the event occurence
      },
    },
    ]).next()
    .then(dailyReport => {
      FloatingEquityAggReport.insertOne(dailyReport);
    })
    .catch(err => console.error("Failed to Generate FloatingEquityDataReport:", err));
}

This is probably a minor fix for a JavaScript / MongoDB Atlas expert and should not take much time. Looking forward to receiving any gracious help with regard to this issue.

Best Regards,
Dilip

Hi @Dilip_Rajkumar ,

Since the schedule is defined by a cron expression, I don’t see how the inserts should affect it, as long as the schedule makes sense business-wise (e.g. after the full 24h of daily data has been inserted).

The insert times should affect the date filtering inside the function, to make sure you are scanning the correct ranges of data. MongoDB does allow time alignment by using a timezone in your query operators.
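
One way to express this kind of timezone alignment is to let the server compute the day boundaries. The following is a minimal sketch, not a tested trigger: it assumes MongoDB 5.0 or later (for `$dateTrunc` and `$dateAdd`), assumes the documents store a true UTC instant in the `time` field shown in the sample document, and uses a hypothetical helper name, `dayMatchStage`.

```javascript
// Hypothetical helper: builds a $match stage selecting documents whose
// `time` falls inside the calendar day (in `timezone`) that contains the
// moment the pipeline runs ($$NOW).
// Assumes MongoDB 5.0+ ($dateTrunc, $dateAdd) and a BSON date field `time`.
function dayMatchStage(timezone) {
  // Midnight at the start of the current day in the given timezone.
  const dayStart = { $dateTrunc: { date: "$$NOW", unit: "day", timezone } };
  // Midnight at the start of the next day; passing the timezone keeps
  // days with a daylight-saving change at their real length.
  const dayEnd = {
    $dateAdd: { startDate: dayStart, unit: "day", amount: 1, timezone },
  };
  return {
    $match: {
      $expr: {
        $and: [
          { $gte: ["$time", dayStart] },
          { $lt: ["$time", dayEnd] },
        ],
      },
    },
  };
}

const stage = dayMatchStage("Europe/Bucharest");
console.log(JSON.stringify(stage, null, 2));
```

Because the window is derived from `$$NOW`, this sketch only covers the intended day if the trigger fires before the broker's day rolls over; a trigger that fires after midnight broker time would need to subtract a day first.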

Now regarding $max and $min, it sounds like they should be applied to a field like “$equity” or “$time” to get the max value or time per day, but you apply them to a static 1, which does not make sense to me… Can you elaborate?

Grouping on a newly generated _id does not make sense either. The _id in a group should be some meaningful expression, like a day date, or null if you want to find the min and max for the entire date range of the filter…

Additionally, please note that you can use a $merge stage at the end to write the output to the report collection rather than doing a loop of insertOne. This will probably be more efficient.

Let me know what your current hazards are.

Thanks
Pavel

Hi @Pavel_Duchovny ,
Thank you for taking the time to look at my issue. Yes, I don’t have any experience with JS so the Trigger function code was mostly just a copy and paste from this resource. The cron schedule is set to +3 hours in my IST zone, which I think should correspond with midnight of the broker server time in Bucharest.

I made some changes to the code based on your feedback and here is the updated code of the trigger function.

exports = function() {
  /*
    A Scheduled Trigger will always call a function without arguments.
    Documentation on Triggers: https://docs.mongodb.com/realm/triggers/overview/
  */
  const mongodb = context.services.get('mongodb-atlas');
  const db = mongodb.db("MT5_AccountMetrix");
  const FloatingEquity = db.collection("FloatingEquity");
  const test_item = FloatingEquity.find().sort({"equity":-1}).limit(1);
  console.log(test_item);
  const FloatingEquityDailyReport = db.collection("FloatingEquityDailyReport");
  // const generatedObjectId  = new BSON.ObjectId();
  
  //Generate Daily Report
  return FloatingEquity.aggregate([
    {
      $match: {
        createdAt:{
          $gte: new Date(new Date().setHours(0,0,0,0)), //This DateTime needs to be in 'Europe/Bucharest' TimeZone of Mt5 broker. Is this correct?
          $lt : new Date(new Date().setHours(23,59,59,999)), //This DateTime needs to be in 'Europe/Bucharest' TimeZone of Mt5 broker. Is this correct?
        },
      },
    },
    {
      $group:{
        _id: null,
        min_equity: {$min : "$equity"},
        max_equity: {$max : "$equity"},
        timestamp_min_equity : {}, //What should go here?
        timestamp_max_equity: {} //what should go here?
      },
    },
    { 
      $merge : {
        into : "FloatingEquityDailyReport", // Output Collection name
        on: "", //What  should go here?
        }
    },
    console.log("MinEquity:", min_equity),
    ]).next()
    // .then(dailyReport => {FloatingEquityDailyReport.insertOne(dailyReport);})
    .catch(err => console.error("Failed to Generate FloatingEquityDataReport:", err));
}

The current hazards I encounter are as follows:
1.) How do I print variables in the MongoDB Realm function editor? I tried using console.log to print the test_item and the min_equity values but it does not work.
2.) Is my $match condition correctly filtering the documents between the start and end of the day in the Bucharest time zone? How do I set the timezone in the $gte and $lt lines?
3.) How do I retrieve the timestamp_min_equity, which corresponds to the time field in the document that contains the min_equity value? Also the same for timestamp_max_equity?
4.) How do I use the $merge operator to push the aggregated values into the target collection FloatingEquityDailyReport?

To save time for all of us, instead of doing back-and-forth code iterations, may I kindly request that you provide the correct solution by editing my JS code?

Best Regards,
Dilip

Hi @Dilip_Rajkumar ,

I will try to answer your questions:

1.) How do I print variables in the MongoDB Realm function editor? I tried using console.log to print the test_item and the min_equity values but it does not work.
2.) Is my $match condition correctly filtering the documents between the start and end of the day in the Bucharest time zone? How do I set the timezone in the $gte and $lt lines?
3.) How do I retrieve the timestamp_min_equity, which corresponds to the time field in the document that contains the min_equity value? Also the same for timestamp_max_equity?
4.) How do I use the $merge operator to push the aggregated values into the target collection FloatingEquityDailyReport?

The console.log output should show up in the Realm logs of that specific application trigger run… You need to expand a row under the “Logs” tab.

I think the problem is that, the way you wrote it, the find command returns a JS promise and not the actual document. I have tried to follow your logic. Let me know if that works.
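
More precisely, in Atlas Functions `find()` returns a cursor, and the documents only become available after a call such as `toArray()`, which returns a promise. Below is a sketch of logging the highest-equity document. `logTopEquity` is a hypothetical name, and the stub collection exists only so the snippet can run outside Atlas; in a real function the handle would come from `context.services.get(...)`.

```javascript
// Hypothetical helper: fetches the document with the highest equity,
// logs it, and returns it inside an array.
async function logTopEquity(collection) {
  // find() yields a cursor; toArray() resolves to the matching documents.
  const docs = await collection.find().sort({ equity: -1 }).limit(1).toArray();
  console.log(JSON.stringify(docs));
  return docs;
}

// Minimal stand-in for a collection handle, only for running this locally.
// It supports a single-field sort, limit and toArray, nothing else.
function stubCollection(docs) {
  let rows = [...docs];
  const cursor = {
    sort(spec) {
      const [[field, dir]] = Object.entries(spec);
      rows.sort((a, b) => (a[field] - b[field]) * dir);
      return cursor;
    },
    limit(n) {
      rows = rows.slice(0, n);
      return cursor;
    },
    toArray() {
      return Promise.resolve(rows);
    },
  };
  return { find() { return cursor; } };
}

logTopEquity(stubCollection([{ equity: 25001 }, { equity: 25014 }]));
```

Logging the resolved array rather than the cursor itself avoids trying to serialize an object that holds no documents yet.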

This code was never tested so treat it cautiously in a development environment!!!

For the dates you can use a helper function like:

function convertTZ(date, tzString) {
    return new Date((typeof date === "string" ? new Date(date) : date).toLocaleString("en-US", {timeZone: tzString}));   
}

Plant it inside the main function.

I would suggest doing something like the following; please note I haven’t tested the code:

exports = async function() {
  /*
    A Scheduled Trigger will always call a function without arguments.
    Documentation on Triggers: https://docs.mongodb.com/realm/triggers/overview/
  */

  function convertTZ(date, tzString) {
    return new Date((typeof date === "string" ? new Date(date) : date).toLocaleString("en-US", {timeZone: tzString}));
  }

  const mongodb = context.services.get('mongodb-atlas');
  const db = mongodb.db("MT5_AccountMetrix");
  const FloatingEquity = db.collection("FloatingEquity");
  const test_item = await FloatingEquity.find().sort({"equity":-1}).limit(1);
  console.log(EJSON.stringify(test_item));
  const FloatingEquityDailyReport = db.collection("FloatingEquityDailyReport");
  const generatedObjectId = new BSON.ObjectId();

  //Generate Daily Report
  try {
    await FloatingEquity.aggregate([
      {
        $match: {
          createdAt: {
            $gte: convertTZ(new Date().setHours(0,0,0,0), 'Europe/Bucharest'),
            $lt: convertTZ(new Date().setHours(23,59,59,999), 'Europe/Bucharest')
          },
        },
      },
      {
        $group: {
          _id: null,
          min_equity: {$min: "$equity"},
          max_equity: {$max: "$equity"},
          timestamp_min_equity: {$min: "$time"},
          timestamp_max_equity: {$max: "$time"}
        },
      },
      {
        $addFields: {
          _id: generatedObjectId
        }
      },
      {
        $merge: {
          into: "FloatingEquityDailyReport"
        }
      }
    ]);

    var calculatedResult = await FloatingEquityDailyReport.findOne({_id: generatedObjectId});
    console.log(EJSON.stringify(calculatedResult));
  }
  catch (e) {
    console.error(`Error : ${e.message}`)
  }
}

Let me know if this helps you.

Thanks
Pavel

Hi Pavel,
Thank you very much for taking the time to provide the solution. When I run your code I get the following error messages, presented as two cases:
i.) If I run your code as it is, I get the following error message when I click the Run button:

> ran at 1649678299527
> took 322.600276ms
> logs: 
{}
> error logs: 
Error : toString() radix argument must be between 2 and 36
> result: 
{
  "$undefined": true
}
> result (JavaScript): 
EJSON.parse('{"$undefined":true}')

ii.) If I comment out the console.log lines of code, then I don’t get Error:toString() :

> ran at 1649678639238
> took 331.312139ms
> result: 
{
  "$undefined": true
}
> result (JavaScript): 
EJSON.parse('{"$undefined":true}')

I am also not able to see any logs, either in the code editor or via the Logs button in the side panel.

Lastly, I don’t think timestamp_min_equity : { $min : "$time"} is correct, because I need the value of the time field in the same document or record where the min_equity was found. If you do { $min : "$time"}, I think it may return the minimum time value from the entire collection for that particular day.
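
One way to get the timestamp from the same document that holds the extreme value is to sort by `equity` before grouping and then take the first and last documents of the sorted stream. This is a sketch under that assumption, with a hypothetical function name (`extremesWithTimestamps`) and field names taken from the sample document:

```javascript
// Hypothetical helper: returns stages that pick the lowest and highest
// equity together with the `time` of the documents that hold them.
function extremesWithTimestamps() {
  return [
    // Ascending by equity; ties are ordered by time, so the low takes the
    // earliest tied document and the high takes the latest tied document.
    { $sort: { equity: 1, time: 1 } },
    {
      $group: {
        _id: null, // one result for everything that passed the date filter
        min_equity: { $first: "$equity" },
        timestamp_min_equity: { $first: "$time" },
        max_equity: { $last: "$equity" },
        timestamp_max_equity: { $last: "$time" },
      },
    },
  ];
}

console.log(JSON.stringify(extremesWithTimestamps(), null, 2));
```

These stages would go after the `$match` that restricts the input to a single day, so that both the values and their timestamps come from that day's documents only.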

Best Regards,
Dilip

@Dilip_Rajkumar ,

In order to build a full solution, I need sample documents and the desired output as text, not as screenshots, please.

Hi Pavel,
As I am a new user, I cannot upload attachments in this forum post. Please find below the Dropbox URLs for the two JSON files representing the sample input in the collection ‘FloatingEquityData’ and the expected output (see Floating_Equity_DailyReport_Output.json) to be aggregated and stored in the FloatingEquityDailyReport collection.

I hope the above Dropbox links are accessible to you; if not, this is the expected JSON output after the min/max aggregation:

[
  {"date":"22-03-2022","Min_Equity":25000.0,"timestamp_min_equity":"2022-03-22 13:20:57","MaxEquity":25014.0,"timestamp_max_equity":"2022-03-22 13:56:26"},
  {"date":"23-03-2022","Min_Equity":24994.48,"timestamp_min_equity":"2022-03-23 10:47:58","MaxEquity":25342.64,"timestamp_max_equity":"2022-03-23 12:49:04"},
  {"date":"24-03-2022","Min_Equity":25066.84,"timestamp_min_equity":"2022-03-24 08:03:19","MaxEquity":25182.74,"timestamp_max_equity":"2022-03-24 04:14:47"},
  {"date":"25-03-2022","Min_Equity":25103.59,"timestamp_min_equity":"2022-03-25 18:54:09","MaxEquity":25103.59,"timestamp_max_equity":"2022-03-25 18:54:09"},
  {"date":"28-03-2022","Min_Equity":23906.73,"timestamp_min_equity":"2022-03-28 14:57:02","MaxEquity":24754.09,"timestamp_max_equity":"2022-03-28 13:00:20"},
  {"date":"29-03-2022","Min_Equity":24017.34,"timestamp_min_equity":"2022-03-29 08:45:51","MaxEquity":24806.54,"timestamp_max_equity":"2022-03-29 12:08:23"},
  {"date":"30-03-2022","Min_Equity":24004.45,"timestamp_min_equity":"2022-03-30 19:36:45","MaxEquity":24234.45,"timestamp_max_equity":"2022-03-30 19:13:56"},
  {"date":"31-03-2022","Min_Equity":23818.45,"timestamp_min_equity":"2022-03-31 06:49:58","MaxEquity":24007.45,"timestamp_max_equity":"2022-03-31 05:04:51"},
  {"date":"01-04-2022","Min_Equity":23225.45,"timestamp_min_equity":"2022-04-01 13:04:45","MaxEquity":23861.45,"timestamp_max_equity":"2022-04-01 14:01:38"},
  {"date":"05-04-2022","Min_Equity":26208.25,"timestamp_min_equity":"2022-04-05 21:31:20","MaxEquity":26208.25,"timestamp_max_equity":"2022-04-05 21:31:20"},
  {"date":"07-04-2022","Min_Equity":26065.25,"timestamp_min_equity":"2022-04-07 09:10:48","MaxEquity":27080.25,"timestamp_max_equity":"2022-04-07 19:43:03"},
  {"date":"08-04-2022","Min_Equity":26807.73,"timestamp_min_equity":"2022-04-08 11:40:47","MaxEquity":27109.25,"timestamp_max_equity":"2022-04-08 07:28:18"},
  {"date":"10-04-2022","Min_Equity":27010.09,"timestamp_min_equity":"2022-04-10 16:17:42","MaxEquity":27038.05,"timestamp_max_equity":"2022-04-10 07:48:10"}
]
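
To get one report document per broker day, as in the output above, the `$group` key can be a day string computed in the broker's timezone, and `$merge` can upsert on that key. The sketch below assumes `time` holds a true UTC instant (if it already stores broker wall-clock values, the `timezone` option should be dropped), and both helper names are hypothetical.

```javascript
// Hypothetical helper: expression for use as `_id` in a $group stage,
// formatted like the `date` field in the expected output ("22-03-2022").
function dayKey(timezone) {
  return { $dateToString: { format: "%d-%m-%Y", date: "$time", timezone } };
}

// Hypothetical helper: final stage that writes each day's result into the
// report collection. Matching on `_id` means a rerun for the same day
// replaces that day's document instead of inserting a duplicate.
function mergeIntoReport(collectionName) {
  return {
    $merge: {
      into: collectionName,
      on: "_id",
      whenMatched: "replace",
      whenNotMatched: "insert",
    },
  };
}

console.log(JSON.stringify(dayKey("Europe/Bucharest")));
console.log(JSON.stringify(mergeIntoReport("FloatingEquityDailyReport")));
```

With `_id: dayKey("Europe/Bucharest")` as the group key, the same pipeline could also backfill many days in a single run, since each day gets its own group.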

Hope this helps!

Best Regards

Hi @Dilip_Rajkumar ,

So I figured out everything besides the timezone setting. Something is off when setting the format; is that critical?

exports = async function() {
  /*
    A Scheduled Trigger will always call a function without arguments.
    Documentation on Triggers: https://docs.mongodb.com/realm/triggers/overview/
  */

  function convertTZ(date, tzString) {
    return Date(new Date(date).toLocaleString("", {timeZone: tzString}));
  }

  const mongodb = context.services.get('mongodb-atlas');
  const db = mongodb.db("MT5_AccountMetrix");
  const FloatingEquity = db.collection("FloatingEquity");
  const test_item = await FloatingEquity.find().sort({"equity":-1}).limit(1);
  console.log(EJSON.stringify(test_item));
  const FloatingEquityDailyReport = db.collection("FloatingEquityDailyReport");
  const generatedObjectId = new BSON.ObjectId();

  //Generate Daily Report
  try {
    var pipeline = [
      {$match: {
        time: {
          $gt: new Date(new Date().setHours(0,0,0,0)),
          $lt: new Date(new Date().setHours(23,59,59,59))
        }
      }},
      {$group: {
        _id: null,
        min_equity: {
          $min: '$equity'
        },
        max_equity: {
          $max: '$equity'
        }
      }},
      {$addFields: {
        _id: generatedObjectId
      }},
      {$lookup: {
        from: 'FloatingEquity',
        localField: 'min_equity',
        foreignField: 'equity',
        as: 'min_equity_timestamp'
      }},
      {$lookup: {
        from: 'FloatingEquity',
        localField: 'max_equity',
        foreignField: 'equity',
        as: 'max_equity_timestamp'
      }},
      {$addFields: {
        max_equity_timestamp: {$first: "$max_equity_timestamp.time"},
        min_equity_timestamp: {$first: "$min_equity_timestamp.time"},
        "date": "$$NOW"
      }},
      {
        $merge: {
          into: "FloatingEquityDailyReport"
        }
      }
    ];
    var x = await FloatingEquity.aggregate(pipeline).toArray();
    console.log(EJSON.stringify(x));
    console.log(EJSON.stringify(pipeline));
    var calculatedResult = await FloatingEquityDailyReport.findOne({_id: generatedObjectId});
    console.log(EJSON.stringify(calculatedResult));
  }
  catch (e) {
    console.error(`Error : ${e.message}`)
  }
}

Please note that the aggregation does all the logic of calculating the max and min, and then looks up the corresponding date of the first occurrence of each value…

Please note that you have to run the trigger itself, not via Debug, to see logs in the Logs tab. When you run it via Debug, the console output is only shown in the run output of that specific debug run. You have to save and press the blue bar to deploy the trigger’s code properly, otherwise it does not really exist.

The function should also run as “SYSTEM” in the function’s settings; make sure you save and deploy it that way under the Functions tab…

I suggest you run through our tutorials, documentation and other material to get more familiar with Atlas and Realm…

Also, please consider taking JS courses, as this is vital for writing triggers, functions, etc.

I was able to get the desired report added… Make sure you have sufficient data in the current 24h window when you run the code…

Thanks
Pavel

Hi Pavel,
Thank you for all your efforts. But the code is not working on my end. I also see that your output in FloatingEquityDailyReport is not correct and does not match the output I sent in the last message, which for April 07th is {"date":"07-04-2022","Min_Equity":26065.25,"timestamp_min_equity":"2022-04-07 09:10:48","MaxEquity":27080.25,"timestamp_max_equity":"2022-04-07 19:43:03"}.
The min_equity was not 13 and the max_equity was not 19 on April 07th. It is also critical that I get the date right based on the timezone in which this trigger function is called; as you can imagine, April 07th 23:59:59 in Bucharest will be April 08, 02:59:59 in India (depending on Daylight Saving Time).

It took me 15 mins to write the code in Python, and it is sad that MongoDB is not considering the option of allowing users to code Atlas/Realm trigger functions in Python. I hope this will change in the future. Anyway, I think it is much too difficult to write this code in JavaScript, so I will just write a Python function and schedule it using AWS Lambda or Windows Task Scheduler on an AWS VPS.

Best Regards,
Dilip

I did not use your data…

You can shift the data as you please with the aggregation pipeline…

I used my own documents, therefore the data is a bit different… The dates you sent are strings and not dates…

I think that coding with JavaScript is perfectly fine as long as you are used to the language.

Hopefully the AWS code will not get you into resource or security problems when connecting to the Atlas cluster.

Thanks
Pavel