I am just trying to get data from mongodb atlas collection via mongodb-kafka-source-connector to redpanda then back to a different collection in mongodb atlas via mongodb-kafka-sink-connector.
I managing to consume the data but then unable to send to mongodb atlas - getting exception —>
connect | [2022-07-27 17:51:46,250] ERROR Failed to put into the sink some records, see log entries below for the details (com.mongodb.kafka.connect.sink.MongoSinkTask)
connect | com.mongodb.MongoBulkWriteException: Bulk write operation error on server cluster0-shard-00-02.ebt7p.mongodb.net:27017. Write errors: [BulkWriteError{index=0, code=2, message='unknown operator: $oid', details={}}].
its now working, now encountering problems with inserting data into mongodb
My watch of the stream is not working, returning nothing, no error displayed and can’t insert data into the chat collection either but can get data from the chatfeed collection.
const express = require('express')
const app = express()
const { MongoClient } = require("mongodb");
// const { Kafka, KafkaJSBrokerNotFound, CompressionTypes, logLevel, Partitioners } = require('kafkajs')
// const cors = require('cors')
// const dotenv = require('dotenv')
const http = require('http');
const server = http.createServer(app);
const { Server } = require('socket.io')
// const io = require('socket.io')
const io = new Server(server, {
cors: {
origin: "http://localhost:3000"
}
});
console.log('MongoDB connected...');
//Connect to MongoDB
const uri = "mongodb+srv://<username/password@cluster0.ebt7p.mongodb.net/?retryWrites=true&w=majority";
const client = new MongoClient(uri);
const simulateAsyncPause = () =>
new Promise(resolve => {
setTimeout(() => resolve(), 1000);
});
let changeStream;
async function run() {
try {
// Select DB and Collection
await client.connect();
const database = client.db("socketIo-MongoDb");
const chat = database.collection("chat");
const chatfeed= database.collection("chatfeed");
chatfeed.find().limit(100).sort({_id:1}).toArray((err, res) => {
if(err){
throw err;
}
io.on('connection', socket => {
// Emit the messages
socket.emit('output', res);
socket.on('disconnect', () => {
console.log(`disconnect: ${socket.id}`);
});
})
})
// open a Change Stream on the "chatfeed" collection
changeStream = chatfeed.watch();
// set up a listener when change events are emitted
changeStream.on("change", async next => {
// process any change next
console.log("received a change to the collection: \t", next);
// process any change next
switch (next.operationType) {
case 'insert':
await simulateAsyncPause();
io.emit('output', next.fullDocument.message);
await simulateAsyncPause();
console.log( 'INSERT',next.fullDocument.message);
break;
case 'update':
io.emit('output', next.updateDescription.updatedFields.message);
console.log('UPDATE', next.updateDescription.updatedFields.message);
}
});
await simulateAsyncPause();
await chat.insertOne({
name: "John Johnson",
message: "No bytes, no problem. Just insert a document, in MongoDB",
});
await simulateAsyncPause();
io.on('connection', socket => {
// Handle input events
socket.on('input', async data => {
let name = data.name;
let message = data.message;
await chat.insertOne({name: name, message: message}, () => {
socket.emit('output', [data])
console.log('OUTPUT', [data])
})
});
socket.on('disconnect', () => {
console.log(`disconnect: ${socket.id}`);
});
})
await changeStream.close();
console.log("closed the change stream");
} finally {
// Ensures that the client will close when you finish/error
await client.close();
}
}
run().catch(console.dir);
const PORT = 4000;
server.listen(PORT, console.log('IO RUNNING on PORT 4000')) ```
Thank you for your help
Hi Ribert_Walters, please can you point me to any docs or resources on multiple kafka source/sink topics - can’t find any from the community or mongodb docs or stackoverflow.