Docs Menu
Docs Home
/ /

Copiar datos existentes

Estos ejemplos de uso demuestran cómo copiar datos de MongoDB a un tema de Apache Kafka utilizando el conector de origen MongoDB Kafka.

Los siguientes ejemplos muestran cómo configurar su conector de origen para copiar datos existentes de una sola colección o de varias colecciones.

Supongamos que desea copiar una colección de MongoDB a Apache Kafka y filtrar algunos datos.

Sus necesidades y sus soluciones son las siguientes:

Requisito
Solución

Copiar el customers colección de la base de datos shopping en su implementación de MongoDB en un tema de Apache Kafka.

See the Copy Data section of this guide.

Copie únicamente los documentos que tengan el valor "México" en el campo country.

See the Filter Data section of this guide.

La colección customers contiene los siguientes documentos:

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

Copie el contenido de la colección customers de la base de datos shopping especificando las siguientes opciones de configuración en su conector de origen:

database=shopping
collection=customers
startup.mode=copy_existing

Su conector de origen copia su colección creando documentos de eventos de cambio que describen la inserción de cada documento en su colección.

Nota

La copia de datos puede producir eventos duplicados

Si algún sistema modifica los datos de la base de datos mientras el conector de origen convierte los datos existentes, MongoDB puede generar eventos de flujo de cambios duplicados para reflejar los cambios más recientes. Dado que los eventos de flujo de cambios en los que se basa la copia de datos son idempotentes, los datos copiados son consistentes, cumpliendo con la garantía de entrega "al menos una vez".

Para obtener más información sobre los documentos de eventos de cambio, consulte la guía Flujos de cambio.

Para obtener más información sobre la startup.mode opción, consulte Propiedades de inicio.

Puedes filtrar datos especificando un pipeline de agregación en la opción startup.mode.copy.existing.pipeline de tu configuración del conector de origen. La siguiente configuración especifica una pipeline de agregación que coincide con todos los documentos que tienen "México" en el campo country:

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

Para obtener más información sobre la startup.mode.copy.existing.pipeline opción, consulte Propiedades de inicio.

Para obtener más información sobre las canalizaciones de agregación, consulte los siguientes recursos:

  • Ejemplo de uso parapersonalizar una canalización para filtrar eventos de cambio

  • Agregación en el manual de MongoDB.

La configuración final del conector de origen para copiar la colección customers debería verse así:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

Una vez que su conector copie sus datos, verá el siguiente documento de evento de cambio correspondiente a la colección de muestra anterior en el shopping.customers tema de Apache Kafka:

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

Nota

Escribe los datos de tu tema en una colección

Utilice un controlador de captura de datos de cambio para convertir documentos de eventos de cambio en un tema de Apache Kafka en operaciones de escritura de MongoDB. Para obtener más información, consulte la guía Controladores de captura de datos de cambio.

Supongamos que desea copiar datos de otra colección en la base de datos shopping llamada products, que contiene el siguiente documento:

{
"_id": 1,
"item_name": "lipstick",
"department": "cosmetics",
"quantity": 45
}

Puedes copiar tanto de las colecciones customers como de las products usando la configuración startup.mode.copy.existing.namespace.regex, como se muestra en el siguiente código:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
startup.mode=copy_existing
startup.mode.copy.existing.namespace.regex=^shopping\.(customers|products)$

Además del documento de evento de cambio shopping.customers en el shopping.products tema de Apache Kafka, descrito en la sección anterior, puede ver el siguiente documento en el tema:

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"item_name": "lipstick",
"department": "cosmetics",
"quantity": 45
},
"ns": { "db": "shopping", "coll": "products" }
}

Tip

Para obtener más información sobre la startup.mode.copy.existing.namespace.regex configuración, consulte la tabla Configuraciones en la guía Propiedades de inicio.

Volver

Nombre del tema

En esta página