Este ejemplo de uso demuestra cómo configurar un pipeline para personalizar los datos que consume tu conector de origen MongoDB Kafka. Un pipeline es una pipeline de agregación de MongoDB compuesta por instrucciones para la base de datos con el fin de filtrar o transformar datos.
MongoDB notifica al conector de los cambios de datos que coinciden con tu pipeline de agregación en un flujo de cambios. Un flujo de cambios es una secuencia de eventos que describe los cambios de datos que un cliente realizó en una implementación de MongoDB en tiempo real. Para obtener más información, consulte la entrada del manual del servidor de MongoDB sobre Flujos de cambio.
Ejemplo
Supongamos que estás coordinando un evento y deseas recopilar los nombres y horas de llegada de cada invitado a un evento específico. Cada vez que un invitado se registra en el evento, una aplicación inserta un nuevo documento que contiene los siguientes detalles:
{ "_id": ObjectId(...), "eventId": 321, "name": "Dorothy Gale", "arrivalTime": 2021-10-31T20:30:00.245Z }
Puedes definir tu conector pipeline configuración para instruir al flujo de cambios que filtre la información del evento de cambio de la siguiente manera:
Crea eventos de cambio para las operaciones de inserción y omite eventos para todos los demás tipos de operaciones.
Crear eventos de cambio sólo para los documentos que coincidan con el valor
fullDocument.eventId"321" y omitir todos los demás documentos.Omitir los campos
_idyeventIddel objetofullDocumentusando una proyección.
Para aplicar estas transformaciones, asigne la siguiente pipeline de agregación a su configuración pipeline :
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ]
Importante
Asegúrese de que los resultados de la pipeline contengan el campo _id de nivel superior del objeto payload, que MongoDB utiliza como el valor del token de reanudación.
Cuando la aplicación inserta el documento de muestra, el conector configurado publica el siguiente registro en su tema de Kafka:
{ ... "payload": { _id: { _data: ... }, "operationType": "insert", "fullDocument": { "name": "Dorothy Gale", "arrivalTime": "2021-10-31T20:30:00.245Z", }, "ns": { ... }, "documentKey": { _id: {"$oid": ... } } } }
Para obtener más información sobre la gestión de flujos de cambio con el conector de origen, consulta la documentación del conector en Flujos de cambio.