Docs Menu
Docs Home
/ /

Personalizar una canalización para filtrar eventos de cambio

Este ejemplo de uso demuestra cómo configurar una canalización para personalizar los datos que consume el conector de origen de MongoDB Kafka. Una canalización es una canalización de agregación de MongoDB compuesta por instrucciones dirigidas a la base de datos para filtrar o transformar datos.

MongoDB notifica al conector los cambios de datos que coinciden con su canalización de agregación en un flujo de cambios. Un flujo de cambios es una secuencia de eventos que describe los cambios de datos que un cliente realizó en una implementación de MongoDB en tiempo real. Para obtener más información, consulte la entrada del manual del servidor MongoDB en Cambiar corrientes.

Supongamos que está coordinando un evento y desea recopilar los nombres y las horas de llegada de cada invitado. Cada vez que un invitado se registra, una aplicación inserta un nuevo documento con la siguiente información:

{
"_id": ObjectId(...),
"eventId": 321,
"name": "Dorothy Gale",
"arrivalTime": 2021-10-31T20:30:00.245Z
}

Puedes definir tu conector pipeline configuración para indicar al flujo de cambios que filtre la información del evento de cambio de la siguiente manera:

  • Cree eventos de cambio para operaciones de inserción y omita eventos para todos los demás tipos de operaciones.

  • Cree eventos de cambio solo para los documentos que coincidan con el valor fullDocument.eventId "321" y omita todos los demás documentos.

  • Omita los campos _id y eventId del objeto fullDocument usando una proyección.

Para aplicar estas transformaciones, asigne la siguiente canalización de agregación a su configuración pipeline:

pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]

Importante

Asegúrese de que los resultados de la canalización contengan _id los ns campos de nivel superior y del payload objeto. MongoDB usa id como valor del token de resumen y ns para generar el nombre del tema de salida de Kafka.

Cuando la aplicación inserta el documento de muestra, el conector configurado publica el siguiente registro en su tema de Kafka:

{
...
"payload": {
_id: { _data: ... },
"operationType": "insert",
"fullDocument": {
"name": "Dorothy Gale",
"arrivalTime": "2021-10-31T20:30:00.245Z",
},
"ns": { ... },
"documentKey": {
_id: {"$oid": ... }
}
}
}

Para obtener más información sobre cómo administrar flujos de cambios con el conector de origen, consulte la documentación del conector en Cambiar corrientes.

Volver

Ejemplos de uso