BulkWriteError - MongoBulkWriteException

Hi All,

I am just trying to get data from mongodb atlas collection via mongodb-kafka-source-connector to redpanda then back to a different collection in mongodb atlas via mongodb-kafka-sink-connector.

I am managing to consume the data but am then unable to send it to MongoDB Atlas - getting this exception —>

connect  | [2022-07-27 17:51:46,250] ERROR Failed to put into the sink some records, see log entries below for the details (com.mongodb.kafka.connect.sink.MongoSinkTask)
connect  | com.mongodb.MongoBulkWriteException: Bulk write operation error on server cluster0-shard-00-02.ebt7p.mongodb.net:27017. Write errors: [BulkWriteError{index=0, code=2, message='unknown operator: $oid', details={}}]. 
{"name": "mongo-ts-sink",
    "config": {
      "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max":"1",
      "topics": "ChatData.socketIo-MongoDb.chat",
      "connection.uri":"",
      "database":"socketIo-MongoDb",
      "collection":"chatfeed",
      "key.converter":"org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable":false,
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":false,
      "publish.full.document.only": true
    } 
}

Although when i run

docker exec -ti redpanda rpk topic consume ChatData.socketIo-MongoDb.chat

{
  "topic": "ChatData.socketIo-MongoDb.chat",
  "key": "{\"_id\": {\"_data\": \"8262E17ECA000000092B022C0100296E5A1004FC2D06E10EF64CA2967AEFB29F6E510B46645F6964006462E17ECA1F262FEED5EFB6520004\"}}",
  "value": "{\"_id\": {\"$oid\": \"62e17eca1f262feed5efb652\"}, \"name\": \"John Johnson\", \"message\": \"Bonjour\"}",
  "timestamp": 1658945232072,
  "partition": 2,
  "offset": 0
}
Atlas atlas-7j1r6x-shard-0 [primary] socketIo-MongoDb> db.chatfeed.insertOne( { 'test' : 1 } )

– > works fine

Thank you for your help.

try changing the key and value converter to

"key.converter": "org.apache.kafka.connect.storage.StringConverter",

and

"value.converter": "org.apache.kafka.connect.storage.StringConverter",

1 Like

Thank you Robert_Walters,

it's now working; I'm now encountering problems with inserting data into MongoDB.

My watch of the stream is not working, returning nothing, no error displayed and can’t insert data into the chat collection either but can get data from the chatfeed collection.

const express = require('express')
const app = express()
const { MongoClient } = require("mongodb");
const http = require('http');
const server = http.createServer(app);


// Socket.IO server with CORS restricted to the React dev server.
const { Server } = require('socket.io')

const io = new Server(server, {
  cors: {
    origin: "http://localhost:3000" 
  }
});


// Connect to MongoDB.
// NOTE: fill in real credentials. The original placeholder
// "<username/password@" was malformed (missing '>' and the ':' separator)
// and would fail connection-string parsing.
const uri = "mongodb+srv://<username>:<password>@cluster0.ebt7p.mongodb.net/?retryWrites=true&w=majority";

const client = new MongoClient(uri);

// The original logged "MongoDB connected..." here, at module load, before
// any connection attempt — removed because it was misleading; client.connect()
// only happens later inside run().

// Resolves after 1 second; used to space out the demo operations.
const simulateAsyncPause = () =>
  new Promise(resolve => {
    setTimeout(() => resolve(), 1000);
  });


// Holds the open change stream so it can be closed on shutdown.
let changeStream;

// Connects to MongoDB, replays chat history to each new socket, relays
// change-stream events, and persists incoming chat messages.
//
// IMPORTANT: the client and change stream are intentionally left open for the
// lifetime of the server. The original code called changeStream.close() and
// then client.close() in a finally block as soon as run() reached the end,
// which killed the change stream immediately — that is why the watch
// "returned nothing" and later inserts failed on a closed client.
async function run() {

  try {

    // Select DB and collections.
    await client.connect();

    const database = client.db("socketIo-MongoDb");

    const chat = database.collection("chat");

    const chatfeed = database.collection("chatfeed");

    // Load the first 100 stored messages once, using the driver's promise
    // API (the callback form of toArray() is deprecated in driver 4.x).
    const history = await chatfeed.find().limit(100).sort({ _id: 1 }).toArray();

    // Single connection handler. The original registered io.on('connection')
    // twice — once inside the find() callback and once further down — which
    // attached duplicate 'disconnect' listeners to every socket.
    io.on('connection', socket => {

      // Replay stored messages to the newly connected client.
      socket.emit('output', history);

      // Persist each incoming message, then echo it back to the sender.
      socket.on('input', async data => {
        const { name, message } = data;

        // Promise form only — the original passed a callback to an awaited
        // insertOne(), mixing the two driver APIs.
        await chat.insertOne({ name, message });

        socket.emit('output', [data]);
        console.log('OUTPUT', [data]);
      });

      socket.on('disconnect', () => {
        console.log(`disconnect: ${socket.id}`);
      });
    });

    // Open a Change Stream on the "chatfeed" collection and broadcast
    // inserts/updates to all connected sockets.
    // NOTE(review): the demo insert below targets "chat", so it will not
    // trigger this stream; the stream fires when the Kafka sink connector
    // writes into "chatfeed" — confirm that is the intent.
    changeStream = chatfeed.watch();

    changeStream.on("change", async next => {
      // Process each change event.
      console.log("received a change to the collection: \t", next);

      switch (next.operationType) {
        case 'insert':
          await simulateAsyncPause();
          io.emit('output', next.fullDocument.message);
          await simulateAsyncPause();
          console.log('INSERT', next.fullDocument.message);
          break;

        case 'update':
          io.emit('output', next.updateDescription.updatedFields.message);
          console.log('UPDATE', next.updateDescription.updatedFields.message);
      }
    });

    // Demo insert to prove the write path works end-to-end.
    await simulateAsyncPause();

    await chat.insertOne({
      name: "John Johnson",
      message: "No bytes, no problem. Just insert a document, in MongoDB",
    });

    await simulateAsyncPause();

  } catch (err) {

    // Close the client only on failure; on success it must stay open so the
    // change stream and the socket handlers keep working.
    await client.close();
    throw err;

  }
}
run().catch(console.dir);

const PORT = 4000;


server.listen(PORT, console.log('IO RUNNING on PORT 4000')) ```

Thank you for your help

it might be something with how you are leveraging Change Stream, check out this article Change Streams & Triggers with Node.js Tutorial | MongoDB

Hi Robert_Walters, please can you point me to any docs or resources on multiple Kafka source/sink topics - I can’t find any in the community forums, the MongoDB docs, or Stack Overflow.

Thank you

What do you mean by multiple source/sink topics?

I mean how can I add more than one topic in the kafka sink connector?

Is it good design to have more than one sink from the one source?

Yes see the Dynamic namespace and topic mapping described here
MongoDB Connector for Apache Kafka 1.4 Available Now | MongoDB Blog

You can scale the sink by increasing the number of workers — read the “tuning the sink connector” portion of this blog.

1 Like

Thank you Robert_Walters, have tried mapping the topics but not working well for me, maybe i have configured the sink incorrectly →

    "config": {
      "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max":"1",
      "topics": "ChatData.socketIo-MongoDb.chat",
      "connection.uri":"mongodb+srv://username/password@cluster0.ebt7p.mongodb.net/?retryWrites=true",
      "database":"socketIo-MongoDb",
      "collection":"chatfeed",
      "key.converter":"org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable":false,
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":false,
      "publish.full.document.only": true,
      "namespace.mapper":"com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper",
      "namespace.mapper.value.collection.field":{"chat": "ChatData.socketIo-MongoDb.chat", "userProfile": "ChatData.socketIo-MongoDb.userProfile"}
    } 
}```

Thank you