Hi,
I am trying to configure Kafka Sink connector with avro schema value.
This is my configuration :
{
"name": "mongodb_user_management",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schema.enable":"false",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://127.0.0.1:8081",
"connection.uri":"mongodb+srv://arkcase:password@cluster0-elrzn.mongodb.net/test?retryWrites=true&w=majority",
"database":"UserDB",
"collection":"users",
"topics":"newuser",
"database.history.kafka.bootstrap.servers":"http://127.0.0.1:9092"
}
}
As you can see I am using StringConverter for key and AvroConverter for Value.
I am sending avro serialized message whit this avro schema :
{
"name": "User",
"namespace": "org.arkcase.avro",
"type": "record"
"fields": [
{
"name": "userId",
"type": "string"
}
{
"name": "name",
"type": "string"
},
{
"name": "lastName",
"type": "string"
},
{
"name": "audit",
"type": {
"fields": [
{
"name": "userId",
"type": "string"
},
{
"default": "127.0.0.1",
"name": "ipAddress",
"type": "string"
},
{
"name": "requestId",
"type": "string"
}
],
"name": "Audit",
"type": "record"
}
}
]
}
I am loading the connector and everything is okey until the first message is sent.
After that connector is down with the following exception :
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'user': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"user-001"; line: 1, column: 6]
Looks like avro converter is not working.
Any suggestions ?