MongoDB Kafka Sink Connector does not update documents

I am new to Kafka Connect. I use a Kafka connector to send “events” to Kafka topics that have the same name as the entity. Once produced, the events are written to MongoDB by the MongoDB Kafka Sink Connector, which stores the state of the entity objects in a collection with the same name as the topic.
Inserting a new object works fine, but when I send an update event with an (_id), the message is added to the topic and creates a new document in MongoDB instead of updating the existing document.

The (_id) of this document is stored as an object (figure 1) instead of an ObjectId (figure 2).
Figure 1: [image]

For the id of the domain object and of the updated event I use ObjectId:


    [BsonElement("_id")]
    [JsonProperty("_id")]
    [BsonId]
    [BsonRepresentation(BsonType.ObjectId)]
    public ObjectId _id { get; set; }

And my connector config:


name=mongo-sink
topics.regex=\\w+$

connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1

key.ignore=true
connection.uri=mongodb://localhost:27017
database=aplicationDB

max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Hello sa_N_A,

If I understand correctly, you want to replace the document if the _id already exists. I can’t tell from the details whether you also want to insert a new document when the _id does not exist.

I would recommend looking into the write strategies available with the connector, which are documented here: https://www.mongodb.com/docs/kafka-connector/current/sink-connector/configuration-properties/write-strategies/
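
As a rough sketch of what that could look like when added to your sink properties: the two settings below are, as far as I know, the ones that control which _id the connector uses and how it writes the document. Picking ProvidedInValueStrategy and ReplaceOneDefaultStrategy is my assumption about what fits your case (it assumes every event value carries the _id field), so please check the class names against the documentation for your connector version.

```properties
# Assumption: every event value contains the _id field to match on.
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy

# Replace the document with the matching _id; insert it if none exists (upsert).
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy
```

Keep in mind that a replace only matches when the _id in the update event has exactly the same type and value as the stored one, so an _id stored as an embedded object will not match an ObjectId.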