MongoDB Kafka Connector Sink Fails, Source works

Hi all.

I got the connector installed, and the source side works, which tells me the installation itself is correct.

When I try to create a sink, the connector fails on the Confluent side.

I'm posting both my source and sink configs below as examples. The Confluent logs are not very helpful.

This is a local MongoDB running in a container and a locally deployed Confluent stack (the one outside of containers).

I've also tried posting directly to a Kafka topic and sinking that into a collection, same error/result: the sink fails.

G
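
(Side note on diagnostics: the Connect REST status endpoint is usually more informative than the worker logs; something along these lines shows the task state and the first error for each connector.)

# list registered connectors
curl -s http://localhost:8083/connectors -w "\n"

# connector + task state, including the first error message if a task failed
curl -s http://localhost:8083/connectors/mongo-sink/status -w "\n"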

curl -X POST \
-H "Content-Type: application/json" \
--data '
     {"name": "mongo-source",
      "config": {
         "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
         "connection.uri":"mongodb://localhost:27017/?directConnection=true",
         "database":"quickstart",
         "collection":"sampleData",
         "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
         }
     }
     ' \
http://localhost:8083/connectors -w "\n"

curl -X POST \
-H "Content-Type: application/json" \
--data '
     {"name": "mongo-sink",
      "config": {
         "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
         "connection.uri":"mongodb://localhost:27017/?directConnection=true",
         "database":"quickstart",
         "collection":"topicData",
         "topics":"quickstart.sampleData",
         "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
         }
     }
     ' \
http://localhost:8083/connectors -w "\n"
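
(For anyone reproducing this: the round trip I'm aiming for is an insert into quickstart.sampleData landing, via the source connector and then the sink connector, in quickstart.topicData. The test document below is just a made-up example.)

# insert a test document into the source collection
mongosh "mongodb://localhost:27017/?directConnection=true" --eval \
  'db.getSiblingDB("quickstart").sampleData.insertOne({ travel: "hello" })'

# once the sink works, the document should show up here
mongosh "mongodb://localhost:27017/?directConnection=true" --eval \
  'db.getSiblingDB("quickstart").topicData.find()'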

Below is the sink connector that takes messages posted directly onto a Kafka topic (not sourced via a source connector); it fails in the same way as above.

curl -X POST \
-H "Content-Type: application/json" \
--data '
     {"name": "mongo-creator-sink",
      "config": {
         "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
         "connection.uri":"mongodb://localhost:27017/?directConnection=true",
         "database":"MongoCom0",
         "collection":"creator_salesbasket",
         "topics":"mongocreator_basket",
         "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
         }
     }
     ' \
http://localhost:8083/connectors -w "\n"
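
(For context, the messages on mongocreator_basket are plain JSON, no schema and no Schema Registry involved. The payload below is just a made-up example of how one could be produced.)

echo '{"basketId": 1, "items": ["apples", "coffee"], "total": 12.50}' | \
  kafka-console-producer --bootstrap-server localhost:9092 --topic mongocreator_basket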

I increased the Connect log level and got the line below…
Not sure why it's trying to convert the data to Avro?

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mongocreator_basket to Avro:
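
(A quick way to see what is actually sitting on the topic is to consume it both ways; the plain console consumer prints the raw record, while the Avro console consumer will fail the same way if the records are not Schema Registry-framed Avro. Sketch:)

# plain consumer: readable output means the records are not Avro-framed
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic mongocreator_basket --from-beginning --max-messages 1

# Avro consumer: expects Schema Registry-framed records
kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic mongocreator_basket --from-beginning --max-messages 1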

I modified the sink to the config below and re-created it… still getting the error shown further down.

curl -X POST \
-H "Content-Type: application/json" \
--data '
     {"name": "mongo-creator-sink",
      "config": {
         "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
         "connection.uri":"mongodb://localhost:27017/?directConnection=true",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
         "database":"MongoCom0",
         "collection":"creator_salesbasket",
         "topics":"mongocreator_basket",
         "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
         }
     }
     ' \
http://localhost:8083/connectors -w "\n"

As shown above, I only added the value converter; the errors below followed.
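
(Side note: instead of deleting and re-POSTing the connector, an existing connector's config can be updated in place with a PUT to the standard Connect REST endpoint; roughly like this, using the same config object as above without the {"name": ..., "config": ...} wrapper.)

curl -X PUT \
-H "Content-Type: application/json" \
--data '
     {
      "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
      "connection.uri":"mongodb://localhost:27017/?directConnection=true",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "database":"MongoCom0",
      "collection":"creator_salesbasket",
      "topics":"mongocreator_basket",
      "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
     }
     ' \
http://localhost:8083/connectors/mongo-creator-sink/config -w "\n"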

[2024-05-20 20:13:56,031] ERROR [mongo-sink|task-0] [Consumer clientId=connector-consumer-mongo-sink-0, groupId=connect-mongo-sink] Unable to find FetchSessionHandler for node 0. Ignoring fetch response. (org.apache.kafka.clients.consumer.internals.AbstractFetch:128)
[2024-05-20 20:13:56,035] ERROR [mongo-creator-sink|task-0] [Consumer clientId=connector-consumer-mongo-creator-sink-0, groupId=connect-mongo-creator-sink] Unable to find FetchSessionHandler for node 0. Ignoring fetch response. (org.apache.kafka.clients.consumer.internals.AbstractFetch:128)
[2024-05-20 20:22:58,092] ERROR [mongo-creator-sink|task-0] WorkerSinkTask{id=mongo-creator-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:237)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:166)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:531)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:509)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:243)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:212)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mongocreator_basket to Avro:
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:148)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:531)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
	... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:603)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:390)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:264)
	at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:199)
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:126)
	... 17 more
[2024-05-20 20:22:58,581] ERROR [mongo-creator-sink|task-0] [Consumer clientId=connector-consumer-mongo-creator-sink-0, groupId=connect-mongo-creator-sink] Unable to find FetchSessionHandler for node 0. Ignoring fetch response. (org.apache.kafka.clients.consumer.internals.AbstractFetch:128)

There is just something about battling with a problem and finally getting it working…
Below is the sink connector that worked… 🙂

As can be seen, I added a key converter and set value.converter.schemas.enable to false.


  curl -X POST \
  -H "Content-Type: application/json" \
  --data '
      {"name": "mongo-creator-sink",
        "config": {
          "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
          "connection.uri":"mongodb://localhost:27017/?directConnection=true",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "value.converter": "org.apache.kafka.connect.json.JsonConverter",
          "value.converter.schemas.enable": false,
          "database":"MongoCom0",
          "collection":"creator_salesbasket",
          "topics":"mongocreator_basket"
          }
      }
      ' \
  http://localhost:8083/connectors -w "\n"
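
(For anyone who lands here later, my understanding of what was going on: the locally installed Confluent stack typically starts the Connect worker with Avro converters as the worker-level defaults, so any connector that doesn't override them tries to deserialize records as Schema Registry Avro, which is where the "Unknown magic byte!" comes from on plain JSON records, and why it was "trying to convert the data to Avro" at all. Overriding only value.converter was apparently not enough on its own; the working config sets both a key.converter and value.converter.schemas.enable=false. The schemas.enable part matters because, when it is enabled, the JsonConverter expects every record wrapped in a schema/payload envelope roughly like the made-up example below, rather than a bare JSON document.)

{
  "schema": {
    "type": "struct",
    "name": "basket",
    "optional": false,
    "fields": [ { "field": "basketId", "type": "int32", "optional": false } ]
  },
  "payload": { "basketId": 1 }
}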