Docs Menu
Docs Home
/ /

Especificar un esquema

Este ejemplo de uso demuestra cómo configurar el conector de origen de MongoDB Kafka para aplicar un esquema personalizado a los datos. Un esquema es una definición que especifica la estructura y el tipo de información de los datos en un tema de Apache Kafka. Utilice un esquema cuando necesite garantizar que los datos del tema que utiliza el conector de origen tengan una estructura coherente.

Para obtener más información sobre el uso de esquemas con el conector, consulte la Guía de aplicación de esquemas.

Supongamos que su aplicación registra los datos de clientes en una colección de MongoDB y desea publicarlos en un tema de Kafka. Quiere que los suscriptores de los datos de clientes reciban datos con un formato uniforme. Decide aplicar un esquema a sus datos.

Sus necesidades y sus soluciones son las siguientes:

Requisito
Solución

Recibir datos de clientes de una colección de MongoDB

Configure a MongoDB source connector to receive updates to data from a specific database and collection.

Proporcionar el esquema de datos del cliente

Specify a schema that corresponds to the structure and data types of the customer data.

Omitir metadatos de Kafka de los datos del cliente

Include only the data from the fullDocument field.

Para obtener el archivo de configuración completo que cumple con los requisitos anteriores, consulte Especifique la configuración.

Para configurar el conector de origen para que reciba datos de una colección de MongoDB, especifique la base de datos y el nombre de la colección. En este ejemplo, puede configurar el conector para que lea de la colección purchases en la base de datos customers de la siguiente manera:

database=customers
collection=purchases

Un documento de datos de clientes de muestra de su colección contiene la siguiente información:

{
"name": "Zola",
"visits": [
{
"$date": "2021-07-25T17:30:00.000Z"
},
{
"$date": "2021-10-03T14:06:00.000Z"
}
],
"goods_purchased": {
"apples": 1,
"bananas": 10
}
}

A partir del documento de muestra, decide que su esquema debe presentar los campos utilizando los siguientes tipos de datos:

Nombre del campo
Tipos de datos
Descripción

Nombre

Name of the customer

visitas

Fechas en que el cliente visitó

bienes_comprados

Mapa de cadena (el tipo asumido) a valores enteros

Nombres de los productos y cantidad de cada artículo que compró el cliente

Puede describir sus datos utilizando el formato de esquema Apache Avro como se muestra en el esquema de ejemplo a continuación:

{
"type": "record",
"name": "Customer",
"fields": [{
"name": "name",
"type": "string"
},{
"name": "visits",
"type": {
"type": "array",
"items": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
},{
"name": "goods_purchased",
"type": {
"type": "map",
"values": "int"
}
}
]
}

Importante

Convertidores

Si desea enviar sus datos a través de Apache Kafka con codificación binaria Avro, debe usar un conversor Avro. Para más información, consulte la guía sobre conversores.

El conector publica los documentos de datos del cliente y los metadatos que describen el documento en un tema de Kafka. Puede configurar el conector para que incluya solo los datos del documento contenidos en el campo fullDocument del registro mediante la siguiente configuración:

publish.full.document.only=true

Para más información sobre el campo fullDocument, consulta la guía Change Streams.

La configuración de su conector de esquema personalizado debe parecerse a la siguiente:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your MongoDB connection URI>
database=customers
collection=purchases
publish.full.document.only=true
output.format.value=schema
output.schema.value={\"type\": \"record\", \"name\": \"Customer\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"visits\", \"type\": {\"type\": \"array\", \"items\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}}, {\"name\": \"goods_purchased\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

Nota

Esquema incrustado

En la configuración anterior, el conversor de esquemas JSON de Kafka Connect integra el esquema personalizado en los mensajes. Para obtener más información sobre el conversor de esquemas JSON, consulte la guía de conversores.

Para obtener más información sobre cómo especificar esquemas, consulte la guía Aplicar esquemas.

Volver

Copiar datos existentes