Change Streams and Socketio

Hi All,

My watch of the stream is not working, returning nothing, no error displayed and can’t insert data into the chat collection either but can get data from the chatfeed collection.

const express = require('express')
const app = express()
const { MongoClient } = require("mongodb");
// const { Kafka, KafkaJSBrokerNotFound, CompressionTypes, logLevel, Partitioners } = require('kafkajs')
// const cors = require('cors')
// const dotenv = require('dotenv')
const http = require('http');
const server = http.createServer(app);



const { Server } = require('socket.io')
// const io = require('socket.io')

const io = new Server(server, {
  cors: {
    origin: "http://localhost:3000" 
  }
});


console.log('MongoDB connected...');



//Connect to MongoDB
const uri = "mongodb+srv://<username/password@cluster0.ebt7p.mongodb.net/?retryWrites=true&w=majority";

const client = new MongoClient(uri);


const simulateAsyncPause = () =>
  new Promise(resolve => {
    setTimeout(() => resolve(), 1000);
  });


let changeStream;


async function run() {

  try {

  // Select DB and Collection
    await client.connect();
   
    const database = client.db("socketIo-MongoDb");

    const chat = database.collection("chat");

    const chatfeed= database.collection("chatfeed");

            
             chatfeed.find().limit(100).sort({_id:1}).toArray((err, res) => {
                      if(err){
                          throw err;
                      }

                      
                      io.on('connection', socket => {

                      // Emit the messages
                      socket.emit('output', res);

                      socket.on('disconnect', () => {
                        console.log(`disconnect: ${socket.id}`);
                      });

                    })
              
               })
          

            // open a Change Stream on the "chatfeed" collection
              changeStream = chatfeed.watch();

              // set up a listener when change events are emitted
              changeStream.on("change", async next => {
                // process any change next
                console.log("received a change to the collection: \t", next);

                         // process any change next
                  switch (next.operationType) {
                      case 'insert':


                         await simulateAsyncPause();
                      
                          io.emit('output', next.fullDocument.message);

                           await simulateAsyncPause();


                          console.log( 'INSERT',next.fullDocument.message);


                          break;


                      case 'update':

                       
                          io.emit('output', next.updateDescription.updatedFields.message);

                       

                          console.log('UPDATE', next.updateDescription.updatedFields.message);
                  }
              });


              await simulateAsyncPause();

              await chat.insertOne({
                name: "John Johnson",
                message: "No bytes, no problem. Just insert a document, in MongoDB",
              });

               await simulateAsyncPause();

            

          io.on('connection',  socket => {
            // Handle input events
           socket.on('input',  async data => {

                

                    let name = data.name;
                    let message = data.message;

                   

                     await chat.insertOne({name: name, message: message}, () => {
                      
                           socket.emit('output', [data])

                             console.log('OUTPUT', [data])
                           
                     })

                   

                    });

                     socket.on('disconnect', () => {
                         console.log(`disconnect: ${socket.id}`);
                       });


         
                  })
              
             await changeStream.close();
              
              console.log("closed the change stream");

            
           


         



  } finally {

    // Ensures that the client will close when you finish/error
    await client.close();

  }
}
run().catch(console.dir);

const PORT = 4000;


server.listen(PORT, console.log('IO RUNNING on PORT 4000')) ```

Thank you for your help