Este ejemplo de uso demuestra cómo puedes configurar tu conector de origen Kafka de MongoDB para aplicar un esquema personalizado a tus datos. Un esquema es una definición que especifica la estructura y la información sobre el tipo de datos en un tema de Apache Kafka. Utiliza un esquema cuando debas asegurarte de que los datos sobre el tema que propone tu conector de origen tengan una estructura coherente.
Para obtener más información sobre el uso de esquemas con el conector, consulte la Guía de aplicar esquemas.
Ejemplo
Supongamos que tu aplicación realiza un seguimiento de los datos de los clientes en una colección de MongoDB, y quieres publicar estos datos en un tema de Kafka. Quieres que los suscriptores de los datos de cliente reciban datos formateados de manera coherente. Eliges aplicar un esquema a tus datos.
Tus requerimientos y soluciones son los siguientes:
Requisito | Solución |
|---|---|
Recibir datos de clientes de una colección de MongoDB | Configure a MongoDB source connector to receive updates to data
from a specific database and collection. |
Proporcione el esquema de datos del cliente | Specify a schema that corresponds to the structure and data types of
the customer data. |
Omitir metadatos de Kafka de los datos del cliente | Include only the data from the fullDocument field. |
Para el archivo de configuración completo que cumple con los requisitos anteriores, consulte Especifique la configuración.
Recibir datos de una colección
Para configurar el conector de origen para que reciba datos de una colección de MongoDB, especifique la base de datos y el nombre de la colección. En este ejemplo, puede configurar el conector para que lea de la colección purchases en la base de datos customers de la siguiente manera:
database=customers collection=purchases
Crea un esquema personalizado
Un documento de datos de clientes de muestra de su colección contiene la siguiente información:
{ "name": "Zola", "visits": [ { "$date": "2021-07-25T17:30:00.000Z" }, { "$date": "2021-10-03T14:06:00.000Z" } ], "goods_purchased": { "apples": 1, "bananas": 10 } }
A partir del documento de muestra, decide que su esquema debe presentar los campos utilizando los siguientes tipos de datos:
Nombre del campo | Tipos de datos | Descripción |
|---|---|---|
Nombre | Name of the customer | |
visitas | Fechas en que el cliente visitó | |
bienes adquiridos | Nombres de los productos y la cantidad de cada artículo que el cliente compró |
Puedes describir tus datos utilizando el formato de esquema Apache Avro como se muestra en el esquema de ejemplo a continuación:
{ "type": "record", "name": "Customer", "fields": [{ "name": "name", "type": "string" },{ "name": "visits", "type": { "type": "array", "items": { "type": "long", "logicalType": "timestamp-millis" } } },{ "name": "goods_purchased", "type": { "type": "map", "values": "int" } } ] }
Importante
Conversores
Si desea enviar sus datos a través de Apache Kafka con codificación binaria Avro, debe usar un conversor Avro. Para más información, consulte la guía sobre conversores.
Omitir metadatos de los registros publicados
El conector publica los documentos de datos del cliente y los metadatos que describen el documento en un tema de Kafka. Puedes configurar el conector para incluir solo los datos del documento contenidos en el campo fullDocument del registro utilizando la siguiente configuración:
publish.full.document.only=true
Para más información sobre el campo fullDocument, consulta la guía Change Streams.
Especificar la configuración
La configuración de su conector de esquema personalizado debe parecerse a la siguiente:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your MongoDB connection URI> database=customers collection=purchases publish.full.document.only=true output.format.value=schema output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]} value.converter.schemas.enable=true value.converter=org.apache.kafka.connect.json.JsonConverter key.converter=org.apache.kafka.connect.storage.StringConverter
Nota
Esquema integrado
En la configuración anterior, el convertidor de JSON schema de Kafka Connect integra el esquema personalizado en tus mensajes. Para obtener más información sobre el convertidor de JSON Schema, consulta la guía Convertidores.
Para obtener más información sobre cómo especificar esquemas, consulta la guía Aplicar esquemas.