Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Personaliza una pipeline para filtrar eventos de cambio

Este ejemplo de uso demuestra cómo configurar una canalización para personalizar los datos que consume el conector de origen de MongoDB Kafka. Una canalización es una canalización de agregación de MongoDB compuesta por instrucciones dirigidas a la base de datos para filtrar o transformar datos.

MongoDB notifica al conector de los cambios de datos que coinciden con tu pipeline de agregación en un flujo de cambios. Un flujo de cambios es una secuencia de eventos que describe los cambios de datos que un cliente realizó en una implementación de MongoDB en tiempo real. Para obtener más información, consulte la entrada del manual del servidor de MongoDB sobre Cambiar corrientes.

Supongamos que está coordinando un evento y desea recopilar los nombres y las horas de llegada de cada invitado. Cada vez que un invitado se registra, una aplicación inserta un nuevo documento con la siguiente información:

{
"_id": ObjectId(...),
"eventId": 321,
"name": "Dorothy Gale",
"arrivalTime": 2021-10-31T20:30:00.245Z
}

Puedes definir tu conector pipeline configuración para instruir al flujo de cambios que filtre la información del evento de cambio de la siguiente manera:

  • Crea eventos de cambio para las operaciones de inserción y omite eventos para todos los demás tipos de operaciones.

  • Cree eventos de cambio solo para los documentos que coincidan con el valor fullDocument.eventId "321" y omita todos los demás documentos.

  • Omita los campos _id y eventId del objeto fullDocument usando una proyección.

Para aplicar estas transformaciones, asigne la siguiente canalización de agregación a su configuración pipeline:

pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]

Importante

Asegúrese de que los resultados de la canalización contengan _id los ns campos de nivel superior y del payload objeto. MongoDB usa id como valor del token de resumen y ns para generar el nombre del tema de salida de Kafka.

Cuando la aplicación inserta el documento de muestra, el conector configurado publica el siguiente registro en su tema de Kafka:

{
...
"payload": {
_id: { _data: ... },
"operationType": "insert",
"fullDocument": {
"name": "Dorothy Gale",
"arrivalTime": "2021-10-31T20:30:00.245Z",
},
"ns": { ... },
"documentKey": {
_id: {"$oid": ... }
}
}
}

Para obtener más información sobre cómo administrar flujos de cambios con el conector de origen, consulte la documentación del conector en Cambiar corrientes.

Volver

Ejemplos de uso