Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Copiar datos existentes

Estos ejemplos de uso demuestran cómo copiar datos de MongoDB a un tema de Apache Kafka utilizando el conector de origen Kafka de MongoDB.

Los siguientes ejemplos muestran cómo configurar tu conector de origen para copiar los datos existentes de una sola colección o de varias colecciones.

Supongamos que deseas copiar una colección de MongoDB a Apache Kafka y filtrar algunos datos.

Tus requerimientos y soluciones son los siguientes:

Requisito
Solución

Copia el customers colección de la base de datos shopping en tu implementación de MongoDB en un tema de Apache Kafka.

See the Copy Data section of this guide.

Copie únicamente los documentos que tengan el valor "México" en el campo country.

See the Filter Data section of this guide.

La colección customers contiene los siguientes documentos:

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

Copia el contenido de la colección customers de la base de datos shopping especificando las siguientes opciones de configuración en tu conector de origen:

database=shopping
collection=customers
startup.mode=copy_existing

Tu conector de origen copia tu colección creando documentos de eventos de cambio que describen la inserción de cada documento en tu colección.

Nota

La copia de datos puede producir eventos duplicados

Si algún sistema cambia los datos en la base de datos mientras el conector de origen convierte los datos existentes de ella, MongoDB puede producir eventos de flujo de cambios duplicados para reflejar los últimos cambios. Dado que los eventos de flujo de cambios de los cuales depende la copia de datos son idempotentes, los datos copiados son eventualmente coherentes.

Para obtener más información sobre los documentos de eventos de cambio, consulta la guía Change Streams.

Para obtener más información sobre la opción startup.mode, consulte Propiedades de empresa emergente.

Puedes filtrar datos especificando un pipeline de agregación en la opción startup.mode.copy.existing.pipeline de tu configuración del conector de origen. La siguiente configuración especifica una pipeline de agregación que coincide con todos los documentos que tienen "México" en el campo country:

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

Para obtener más información sobre la opción startup.mode.copy.existing.pipeline, consulte Propiedades de empresa emergente.

Para aprender más sobre los pipelines de agregación, consulta los siguientes recursos:

  • Personaliza un pipeline para filtrar los eventos de cambio Ejemplo de uso

  • Agregación en el manual de MongoDB.

Su configuración final del conector fuente para copiar la colección customers debe verse así:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

Una vez que el conector copia los datos, verá el siguiente documento de evento de cambio correspondiente a la colección de muestras anterior en el shopping.customers tema de Apache Kafka:

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

Nota

Escribe los datos de tu tema en una colección

Usar un manejador de captura de datos de cambios para convertir documentos de eventos de cambios en un tema de Apache Kafka en operaciones de guardar en MongoDB. Para obtener más información, consulta la guía Manejadores de Captura de Cambios de Datos.

Supongamos que quieres copiar datos de otra colección de la base de datos shopping llamada products, que contiene el siguiente documento:

{
"_id": 1,
"item_name": "lipstick",
"department": "cosmetics",
"quantity": 45
}

Puedes copiar tanto de las colecciones customers como de las products usando la configuración startup.mode.copy.existing.namespace.regex, como se muestra en el siguiente código:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
startup.mode=copy_existing
startup.mode.copy.existing.namespace.regex=^shopping\.(customers|products)$

Además del documento del evento de cambio en el tema de shopping.customers Apache Kafka, descrito en la sección anterior, puede ver el siguiente documento en el tema shopping.products:

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"item_name": "lipstick",
"department": "cosmetics",
"quantity": 45
},
"ns": { "db": "shopping", "coll": "products" }
}

Tip

Para obtener más información sobre la configuración startup.mode.copy.existing.namespace.regex, consulta la tabla de configuraciones en la guía de Propiedades de empresa emergente.

Volver

Nombramiento de temas

En esta página