Greg_Olson
(Greg Olson)
January 12, 2023, 11:35pm
1
I am trying to write a function for a scheduled trigger that sends all documents from a collection that have been updated since the trigger was last executed. Is there a system variable or somewhere in the context object that would return this timestamp? I’ve searched extensively and found no examples. Another option may be to write the execution time to a collection at the end of the function, then retrieve that date at the beginning of each function call. If anyone has any similar examples, I would appreciate it. My pipeline definition looks like the following. I just want to be able to set start_date=‘trigger last execution date/time’:
// Aggregation pipeline: select documents modified after `start_date`
// (defined earlier in the enclosing function) and write them to S3 as
// parquet via Atlas Data Federation's $out stage.
const pipeline = [
{
$match: {
// NOTE(review): assumes `updatedAt` is a Date (or otherwise $gt-comparable)
// field maintained on every document — confirm against the schema.
"updatedAt": {
$gt: start_date
}
}
}, {
// Federated-query $out to S3: parquet output, split into files of at most
// 10GB with 100MB row groups. `filename` is built by the enclosing function.
"$out": {
"s3": {
"bucket": "bucket_name",
"region": "us-west-2",
"filename": filename,
"format": {
"name": "parquet",
"maxFileSize": "10GB",
"maxRowGroupSize": "100MB"
}
}
}
}
];
// Kick off the export; `events` is the federated collection handle.
// NOTE(review): the returned promise is handed back to the caller un-awaited.
return events.aggregate(pipeline);
};
Greg_Olson
(Greg Olson)
January 20, 2023, 12:01am
2
Was able to get this working with the following solution, tracking each execution time in a collection:
exports = async function () {
  // Scheduled-trigger function: export every document updated since the
  // previous successful execution to S3 as parquet, then record this run's
  // timestamp so the next run picks up exactly where this one left off.

  // Federated (Data Federation) connection used for the $out-to-S3 pipeline.
  const service = context.services.get("my_federated_db_name");
  const db = service.db("VirtualDatabase0");
  const events = db.collection("VirtualCollection0");

  // Capture the execution time up front so documents updated while this
  // function is running are caught by the NEXT execution, not skipped.
  const now = new Date();
  const current_timestamp = now.toISOString();
  const date_str = current_timestamp.split("T")[0];

  // Unique, partitioned output path, e.g.
  //   customers/file_date=2023-01-20/customers_1674172800.123
  const epoch_date_str = String(now.getTime() / 1000);
  const filename = `customers/file_date=${date_str}/customers_${epoch_date_str}`;
  console.log("filename: ", filename);

  // Regular cluster connection holding one document per trigger execution.
  const event_service = context.services.get("change-stream-poc");
  const event_db = event_service.db("trigger_execution");
  const event_collection = event_db.collection("events");

  // Most recent recorded execution time. Fall back to the Unix epoch on the
  // very first run — previously this threw a TypeError on an empty collection.
  const last_run = await event_collection
    .find({ type: "execution" })
    .sort({ timestamp: -1 })
    .limit(1)
    .toArray();
  const start_date =
    last_run.length > 0 ? new Date(last_run[0].timestamp) : new Date(0);
  console.log("exporting documents updated after: ", start_date.toISOString());

  const pipeline = [
    {
      $match: {
        updatedAt: {
          $gt: start_date,
        },
      },
    },
    {
      $out: {
        s3: {
          bucket: "mybucket",
          region: "us-west-2",
          filename: filename,
          format: {
            name: "parquet",
            maxFileSize: "10GB",
            maxRowGroupSize: "100MB",
          },
        },
      },
    },
  ];

  // Await the export so a failure aborts the function BEFORE the watermark
  // below is advanced — otherwise missed documents would never be re-exported.
  const response = await events.aggregate(pipeline);

  // Record this execution. Awaited so the write is guaranteed to complete
  // before the function returns (Atlas Functions may not flush promises that
  // are still pending at return time). `current_timestamp` is already an
  // ISO-8601 string, so no further conversion is needed.
  await event_collection.insertOne({
    type: "execution",
    timestamp: current_timestamp,
  });

  return response;
};