Hi,
I am trying to set up a sync from MongoDBSourceConnector to Elasticsearch. I am able to run them independently, but running them concurrently generates an exception on the Elasticsearch side.
mongodb-connect-source.properties
name=MongoDBSourceConnector
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://localhost:27017
database=test
collection=inventory
pipeline=[{"$match": {"$or": [{"operationType": "insert"}, {"operationType": "update"}, {"operationType": "delete"}]}}]
initial.sync.max.threads=1
tasks.max=1
change.stream.full.document=updateLookup
publish.full.document.only=true
topic.namespace.map={"*": "connect24-topic"}
errors.log.enable=true
elasticsearch-connect.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=connect24-topic
topic.index.map=logs:transactions
connection.url=http://localhost:9200
type.name=log
key.ignore=true
schema.ignore=true
errors.tolerance=all
behavior.on.malformed.documents=ignore
connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
When I insert a document into MongoDB, the topic receives the data as below:
"{"_id": {"$oid": "6257e54bc35c3a5edfdbb364"}, "value": "Kainth"}"
Because of the extra " present at the start and end, Elasticsearch fails with the exception below:
Failed to execute bulk request due to 'org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
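For illustration (a standalone Python sketch, not part of my pipeline), the payload looks as if the document was JSON-serialized twice — the value on the topic is a JSON *string* containing JSON, rather than a JSON object, which is what produces the surrounding quotes:

```python
import json

doc = {"_id": {"$oid": "6257e54bc35c3a5edfdbb364"}, "value": "Kainth"}

# What I expect on the topic: the document encoded once.
single_encoded = json.dumps(doc)

# What I actually see: the already-encoded string encoded again,
# which wraps the whole payload in an extra pair of quotes.
double_encoded = json.dumps(single_encoded)

print(single_encoded)   # a JSON object
print(double_encoded)   # a JSON string starting and ending with "
```

Decoding `double_encoded` once yields a plain string, not an object, which would explain why the sink cannot index it as a document.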
What configs do I need to provide, and in which properties file, to get the outcome below?
{"_id": {"$oid": "6257e54bc35c3a5edfdbb364"}, "value": "Kainth"}
Regards,
Harinder Singh