I’m currently integrating MongoDB’s Kafka Source Connector with a Confluent Kafka cluster. The source connector streams change event data from my database into Kafka; now I would like to know how to integrate this connector with Schema Registry.
My setup uses Kafka from a Confluent server, plus a Docker container running KSQL with Kafka Connect embedded. This Kafka Connect currently runs only the MongoDB Source Connector.
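For context, the container setup looks roughly like this (the image tag and file paths are my placeholders, not the exact values from my environment); the embedded Connect worker is pointed at the connector.properties file shown below:

```yaml
# Rough sketch of the KSQL container; image tag and paths are assumptions
ksql-server:
  image: confluentinc/ksqldb-server:0.14.0
  environment:
    KSQL_BOOTSTRAP_SERVERS: "https://confluent:broker:uri"
    # Embedded Kafka Connect: point KSQL at the worker config below
    KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connector.properties"
  volumes:
    - "./connector.properties:/etc/ksqldb/connector.properties"
```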
This is the connector.properties file that configures my Kafka Connect worker:
```properties
# Generic Connector Configs
group.id=ksql-connect-cluster
bootstrap.servers="https://confluent:broker:uri"
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="USERNAME" password="Password";

producer.ssl.endpoint.identification.algorithm=https
producer.sasl.mechanism=PLAIN
producer.request.timeout.ms=20000
producer.retry.backoff.ms=500
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="USERNAME" password="Password";
producer.security.protocol=SASL_SSL

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
value.converter.enhanced.avro.schema.support=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

# Schema Registry credentials
value.converter.schema.registry.url=https://schemaregistryurl
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.schema.registry.basic.auth.user.info=USERNAME:PASSWORD
```
This is how I set up the MongoDB Source Connector properties:
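In outline, it looks like the following; the connection URI, database, collection, and topic prefix below are placeholders, not my real values:

```properties
# Sketch of the MongoDB Source Connector config (connection details are placeholders)
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=mongodb://USERNAME:PASSWORD@mongo-host:27017
database=my_database
collection=my_collection
topic.prefix=mongo
```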
I configured the converters to use the AvroConverter and supplied the credentials for the Schema Registry; however, when I check the Kafka topic the events are sent to, instead of the schema of the change event stream data, Confluent Schema Registry shows me the following schema:
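That is, the registered subject is just a primitive string type rather than a record describing the change event (the subject name below is a placeholder):

```json
{
  "subject": "mongo.my_database.my_collection-value",
  "version": 1,
  "schema": "\"string\""
}
```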
We want to use KSQL to apply transformations to the messages flowing through the topic that receives the change event streams; however, when I try to create a stream over one of these topics, I receive the following error message:
The schema of the full document sent in these change event streams is extremely complex, with many levels of nested objects and arrays, so defining these schemas in Avro by hand would be very hard and error-prone. We therefore wanted to use KSQL schema inference to create the streams (the statement I’m attempting is sketched below). This is currently not possible due to the error displayed above, which leads me to believe the problem may be in how we’re setting up our connector and, consequently, how we’re creating our topics and their respective Avro schemas.
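The stream creation relies on schema inference, so no columns are declared and KSQL should pull the value schema from Schema Registry; the statement looks roughly like this (stream and topic names are placeholders):

```sql
-- Schema inference: no column list, KSQL fetches the topic's
-- value schema from Schema Registry (names are placeholders)
CREATE STREAM mongo_events WITH (
    KAFKA_TOPIC  = 'mongo.my_database.my_collection',
    VALUE_FORMAT = 'AVRO'
);
```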
Our goal is to have an Avro schema compatible with our change stream events. Is this possible to achieve automatically through the MongoDB Source Connector, or will I have to register the schemas manually so that KSQL schema inference can use them?