Mongo-Kafka source connector change stream return string?

Hi,

I’m using MongoSourceConnector, to connect a Kafka I’m getting the message on the topic without a problem, but when I wanna try to do a schema-registry from this I’m getting this:

{“schema”:{“type”:“string”,“optional”:false}

On the schema registry:
curl -X GET http://schema-registry:8081/subjects/mongo.test.pageviews-value/versions/1/schema
“string”

Looks if the change stream returns a JSON string, any way to change this to return a JSON?

Driver connector:

    curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
	 "name": "sr-connector-test",
	 "config": {
		"key.converter":"io.confluent.connect.avro.AvroConverter",
		"key.converter.schema.registry.url":"http://schema-registry:8081",
		"value.converter":"io.confluent.connect.avro.AvroConverter",
		"value.converter.schema.registry.url":"http://schema-registry:8081",
		"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
		"key.converter.schemas.enable":"true",
		"value.converter.schemas.enable":"true",
		"connection.uri":"mongodb://uri:H666jcPS@mongo-shard-00-00.mongodb.net:27017,mongo-shard-00-01.mongodb.net:27017,mongo-shard-00-02.mongodb.net:27017/db.Collection?ssl=true&authSource=admin&replicaSet=mongo--shard-0",
		"database":"databa-Name",
		"collection":"Collection",
		"topic.prefix": "sr_topic",
		"publish.full.document.one": "true"
	}}'

Connector Status:

{"name":"sr-connector-test","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

Topic returned:
//Always return the msg with double quotes " "
"{"_id": {"_data": {"$binary": "gl5O1i8AARmVwXaNDYq7CWduUosquBA==", "$type": "00"}}, "operationType": "update", "ns": {"db": "name", "coll": "tx"}, "documentKey": {"_id": {"$oid": "5e4de1f01b8406"}}, "updateDescription": {"updatedFields": {"updated": {"$date": 15824943562}}, "removedFields": []}}"

Thanks @Ross_Lawley for you answer, was very helpful.

1 Like

Hi @Ross_Lawley,

I’m currently having similar problems, however I can’t find the issue you refer to.

Do you have any updates regarding this?

Thanks,
Miguel

My Apologies, please check out KAFKA-124 instead.

Ross

Thank you very much @Ross_Lawley, it seems to be exactly what we need to achieve our goal.

Do you have any idea or roadmap on when this would be put into a release? (To be accessed via Confluent Hub for example)

Miguel

I am creating a demo that leverages this enhancement in addition to the other stuff that will be in 1.3. Here is a link to the demo, https://github.com/RWaltersMA/kafka1.3. The JAR file in the github is just a snapshot build and still in development so don’t use in production. If you have any questions LMK, also if it fits your needs or have suggestions we’d love to know too.