MongoDB sync to Elasticsearch

Hi,

I am trying to set up a sync from MongoDB to Elasticsearch using the MongoDBSourceConnector and the Elasticsearch sink connector. I am able to run each connector on its own, but running them together produces an exception on the Elasticsearch side.

MongoDb-connect-source.properties

name=MongoDBSourceConnector
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=test
collection=inventory
pipeline=[{"$match": {"$or": [{"operationType": "insert"}, {"operationType": "update"}, {"operationType": "delete"}]}}]
initial.sync.max.threads=1
tasks.max=1
change.stream.full.document=updateLookup
publish.full.document.only=true
topic.namespace.map={"*": "connect24-topic"}
errors.log.enable=true

elasticsearch-connect.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=connect24-topic
topic.index.map=logs:transactions
connection.url=http://localhost:9200
type.name=log
key.ignore=true
schema.ignore=true
errors.tolerance=all
behavior.on.malformed.documents=ignore

connect-standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets

When I insert a document into MongoDB, the topic receives the data as shown below:
"{"_id": {"$oid": "6257e54bc35c3a5edfdbb364"}, "value": "Kainth"}"

Because of the extra " at the start and end of the message, Elasticsearch fails with the following exception:
Failed to execute bulk request due to 'org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
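
To illustrate what I suspect is happening, here is a minimal sketch in Python. It is not part of my setup. It assumes the source connector hands the document over as a JSON string and that a JSON serializer then encodes that string a second time, which would explain the surrounding quotes. The variable names are made up for illustration.

```python
import json

# Assumption: the document leaves the source connector as a JSON *string*,
# not as a structured object.
doc_as_string = '{"_id": {"$oid": "6257e54bc35c3a5edfdbb364"}, "value": "Kainth"}'

# Serializing a string with a JSON serializer wraps it in quotes and
# escapes the inner quotes, i.e. the payload is now double-encoded.
double_encoded = json.dumps(doc_as_string)

# A consumer that decodes once gets back a plain string, not an object.
decoded_once = json.loads(double_encoded)

# Only a second decode yields the document itself.
decoded_twice = json.loads(decoded_once)

print(double_encoded)
print(type(decoded_once).__name__)
print(decoded_twice["value"])
```

If this is right, the sink sees a JSON string where it expects a JSON object, which would match the symptom above.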

Which configs do I need to set, and in which properties file, so that the message arrives in the topic as below?

{"_id": {"$oid": "6257e54bc35c3a5edfdbb364"}, "value": "Kainth"}

Regards,
Harinder Singh