Is it OK to use change streams inside a server API?

app.get('/msg/:id', (req, res) => {

  const pipeline = [
    { $match: { 'fullDocument.parent': req.params.id } }
  ];
  const changeStream = db.collection('message').watch(pipeline);
  changeStream.on('change', (result) => {
    res.write('event: test\n');
    res.write('data: ' + JSON.stringify([result.fullDocument]) + '\n\n');
  });
});

I’m using express / mongodb official driver to make APIs for a real time app.
When users make requests to /msg/:id, will each request spawn a new changeStream listener, so that performance eventually drops?
Or is it OK to use it like this?

What’s the variable scope of this change stream object? Can it be reused later by a new request?

I’m a bit concerned about a resource leak, yeah, as you said: something is allocated, then the reference is lost and it never gets released.

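// One shared change stream for the whole collection. req does not exist
// at this scope, so the per-request filter moves into the handler.
const changeStream = db.collection('message').watch()

app.get('/msg/:id', (req, res) => {

  changeStream.on('change', (result) => {
    // Forward only documents that belong to the requested parent.
    const doc = result.fullDocument;
    if (!doc || String(doc.parent) !== req.params.id) return;
    res.write('event: test\n');
    res.write('data: ' + JSON.stringify([doc]) + '\n\n');
  });
});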

Oh yeah, moving the changeStream to a global variable would be better.
But I’m still wondering whether using changeStream.on() inside a server API is safe or not.

Have you tried your code?

As far as I understand, on() simply sets a listener and returns right away. So, I see

1 - res will never get a value from res.write unless you are lucky and a message is updated at just the right moment
2 - if you can set many listeners on the same stream, then you will be susceptible to DoS

That is why I ask: have you tried your code?
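
To make point 2 concrete, here is a minimal sketch of one way to keep listeners from piling up: a single 'change' listener on a shared stream, plus a registry of connected clients with a cap. The names createChangeHub, subscribe, size and maxSubscribers are made up for illustration, and I am assuming the stream behaves like an EventEmitter (as the object returned by collection.watch() does). Not tested against a real deployment.

```javascript
// Hypothetical helper: fan out one shared change stream to many SSE clients.
// `stream` is assumed to be an EventEmitter that emits 'change' events,
// such as the object returned by collection.watch().
function createChangeHub(stream, maxSubscribers = 100) {
  const subscribers = new Map(); // res -> parent id requested by that client

  // A single listener on the stream, no matter how many clients connect.
  stream.on('change', (change) => {
    const doc = change.fullDocument;
    if (!doc) return; // e.g. delete events carry no fullDocument
    for (const [res, parentId] of subscribers) {
      if (String(doc.parent) === parentId) {
        res.write('event: test\n');
        res.write('data: ' + JSON.stringify([doc]) + '\n\n');
      }
    }
  });

  return {
    // Returns false when the cap is reached so the caller can refuse the request.
    subscribe(req, res, parentId) {
      if (subscribers.size >= maxSubscribers) return false;
      subscribers.set(res, parentId);
      // Forget the client as soon as its connection closes.
      req.on('close', () => subscribers.delete(res));
      return true;
    },
    size: () => subscribers.size,
  };
}
```

In the route you would call hub.subscribe(req, res, req.params.id) after writing the SSE headers and answer with an error status when it returns false. The cap only bounds the damage; it does not replace real rate limiting.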

app.get('/msg/:id', (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Content-Type": "text/event-stream",
    "Cache-Control": "no-cache",
  });

  const pipeline = [
    { $match: { 'fullDocument.parent': req.params.id } }
  ];
  const changeStream = db.collection('message').watch(pipeline);
  changeStream.on('change', (result) => {
    res.write('event: test\n');
    res.write('data: ' + JSON.stringify([result.fullDocument]) + '\n\n');
  });
});

Yeah, res.write works fine because I used SSE, which keeps the connection alive.
About number 2, I should think more about it, thanks.
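
On the resource leak concern raised earlier: if you do keep one change stream per request, a rough sketch like the following closes the stream when the client disconnects, so nothing stays allocated after the connection is gone. streamChanges is a made-up helper name, and collection is assumed to expose watch(pipeline) returning an object with on() and close(), as the official driver's change stream does.

```javascript
// Hypothetical helper: one change stream per request, closed on disconnect.
// `collection` is assumed to expose watch(pipeline) like a driver Collection.
function streamChanges(collection, req, res, parentId) {
  res.writeHead(200, {
    'Connection': 'keep-alive',
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
  });

  const pipeline = [{ $match: { 'fullDocument.parent': parentId } }];
  const changeStream = collection.watch(pipeline);

  changeStream.on('change', (result) => {
    res.write('event: test\n');
    res.write('data: ' + JSON.stringify([result.fullDocument]) + '\n\n');
  });

  // If the stream fails, end the response instead of leaving it hanging.
  changeStream.on('error', () => res.end());

  // Release the change stream when the client goes away.
  req.on('close', () => {
    // close() may return a promise; swallow a rejection during teardown.
    Promise.resolve(changeStream.close()).catch(() => {});
  });

  return changeStream;
}
```

The route then becomes app.get('/msg/:id', (req, res) => streamChanges(db.collection('message'), req, res, req.params.id)). This still opens one stream per connected client, so it addresses the leak but not the many-connections problem.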
