I want to send data from one collection in mongodb to another collection using kafka.
The source connector is debezium, and the sink connector is mongodb sink connector.
I used handlers for update and delete, but an error [ Error: operationType
field is doc is missing.] occurs.
When sending from the debezium source connector to kafka, the operationType field seems to be sent to ‘op’ field, and update and delete are sent to u and d respectively.
.......
"source":{"version":"1.9.2.Final","connector":"mongodb","name":"metaDB","ts_ms":1660575739000,"snapshot":"false","db":"metaDB","sequence":null,"rs":"rs01","collection":"conntest","ord":1,"h":null,"tord":null,"stxnid":null,"lsid":null,"txnNumber":null},"op":"d","ts_ms":1660575739313,"transaction":null}}
.......
Is there an option to make this information known to the mongodb sink connector?
The options I used for the connector are:
[source]
{
"name":"mongosrc02",
"config":{
"connector.class":"io.debezium.connector.mongodb.MongoDbConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schema.enable" : "true",
"value.converter.schema.enable" : "true",
"mongodb.hosts":"rs01/10.20.19.172:27017",
"mongodb.name":"metadbtest",
"mongodb.user" : "mongoadm",
"mongodb.password" : "mongoadm",
"collection.include.list": "metaDB.conntest",
"tombstones.on.delete" : "false"
}
}
[sink]
{
"name": "mongo-sk-002",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://10.20.19.172:27017",
"database":"metaDB",
"collection":"newconntesthdr",
"topics":"metaDB.metaDB.conntest",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schema.enable" : "true",
"value.converter.schema.enable" : "true",
"mongo.errors.tolerance": "all",
"mongo.errors.log.enable": "true",
"mongodb.user": "mongoadm",
"mongodb.password": "mongoadm",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}