Overview
Esta guía le muestra cómo cambiar la forma en que su conector de receptor MongoDB Kafka escribe datos en MongoDB.
Puede cambiar la forma en que su conector escribe datos en MongoDB para casos de uso que incluyen los siguientes:
Insertar documentos en lugar de sobrescribirlos
Reemplazar o actualizar documentos que coincidan con un filtro distinto al
_idCampoEliminar documentos que coinciden con un filtro
Puede configurar cómo su conector escribe datos en MongoDB especificando una estrategia de modelo de escritura. Una estrategia de modelo de escritura es una clase que define cómo su conector receptor debe escribir datos mediante modelos de escritura. Un modelo de escritura es una interfaz del controlador Java de MongoDB que define la estructura de una operación de escritura.
Para aprender a modificar los registros de receptor que recibe su conector antes de que los escriba en MongoDB, lea la guía en Postprocesadores de conector de sumidero.
Para ver la implementación de una estrategia de modelo de escritura, consulte el código fuente del Clase InsertOneDefaultStrategy.
Operaciones de escritura masiva
El conector de sumidero escribe datos en MongoDB mediante operaciones de escritura masiva. Estas operaciones agrupan varias operaciones de escritura, como inserciones, actualizaciones o eliminaciones.
De forma predeterminada, el conector de sumidero realiza escrituras masivas ordenadas, lo que garantiza el orden de los cambios de datos. En una escritura masiva ordenada, si alguna operación de escritura genera un error, el conector omite las escrituras restantes de ese lote.
Si no necesita garantizar el orden de los cambios de datos, puede establecer el valor bulk.write.ordered en false para que el conector realice escrituras masivas desordenadas. El conector receptor realiza escrituras masivas desordenadas en paralelo, lo que puede mejorar el rendimiento.
Además, cuando habilita escrituras masivas desordenadas y establece la configuración errors.tolerance en all, incluso si alguna operación de escritura en su escritura masiva falla, el conector continúa realizando las operaciones de escritura restantes en el lote que no devuelven errores.
Tip
Para obtener más información sobre la configuración bulk.write.ordered, consulta la página de Propiedades de procesamiento de mensajes del Connector.
Para obtener más información sobre las operaciones de escritura masiva, consulte la siguiente documentación:
Cómo especificar estrategias de modelo de escritura
Para especificar una estrategia de modelo de escritura, utilice la siguiente configuración:
writemodel.strategy=<write model strategy classname>
Para obtener una lista de las estrategias de modelo de escritura prediseñadas incluidas en el conector, consulte la guía sobre configuraciones de estrategias de modelo de escritura.
Especificar una Clave de Negocio
Una clave de negocio es un valor compuesto por uno o más campos en su registro receptor que lo identifica como único. De forma predeterminada, el conector del receptor utiliza el campo _id del registro receptor para recuperar la clave de negocio. Para especificar una clave de negocio diferente, configure el postprocesador del sumador de ID de documento para que utilice un valor personalizado.
Puede configurar el Adder de Id. de documento para establecer el campo _id desde la clave de registro del receptor como se muestra en las siguientes propiedades de ejemplo:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy document.id.strategy.partial.key.projection.list=<comma-separated field names> document.id.strategy.partial.key.projection.type=AllowList
Como alternativa, puede configurarlo para establecer el campo _id a partir del valor del registro del receptor como se muestra en las siguientes propiedades de ejemplo:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=<comma-separated field names> document.id.strategy.partial.value.projection.type=AllowList
Importante
Mejorar el rendimiento de escritura
Cree un índice único en su colección de destino que corresponda a los campos de su clave de empresa. Esto mejora el rendimiento de las operaciones de escritura desde su conector receptor. Consulte la guía sobre índices únicos para obtener más información.
Las siguientes estrategias de modelo de guardado requieren una clave empresarial:
ReplaceOneBusinessKeyStrategyDeleteOneBusinessKeyStrategyUpdateOneBusinessKeyTimestampStrategy
Para obtener más información sobre el posprocesador del sumador de Id. de documento, consulte Configurar el posprocesador del sumador de Id. de documento.
Ejemplos
Esta sección muestra ejemplos de configuración y salida de las siguientes estrategias de modelo de escritura:
Estrategia de marcas de tiempo de Update One
Puede configurar la estrategia "Actualizar una marca de tiempo" para agregar y actualizar marcas de tiempo al escribir documentos en MongoDB. Esta estrategia realiza las siguientes acciones:
Cuando el conector inserta un nuevo documento MongoDB, establece los campos
_insertedTSy_modifiedTSen la hora actual en el servidor del conector.Cuando el conector actualiza un documento MongoDB existente, actualiza el campo
_modifiedTSa la hora actual en el servidor del conector.
Supongamos que desea rastrear la posición de un tren a lo largo de una ruta y su conector receptor recibe mensajes con la siguiente estructura:
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
Use ProvidedInValueStrategy para especificar que su conector debe usar el valor _id del mensaje para asignar el campo _id en su documento de MongoDB. Especifique su ID y escriba las propiedades de la estrategia del modelo como se indica a continuación:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
Después de que el conector de receptor procesa el registro de ejemplo anterior, inserta un documento que contiene los campos _insertedTS y _modifiedTS como se muestra en el siguiente documento:
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T15:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
Después de una hora, el tren informa su nueva ubicación a lo largo de su ruta con una nueva posición como se muestra en el siguiente registro:
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
Una vez que el conector del receptor procesa el registro anterior, inserta un documento que contiene los siguientes datos:
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T16:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
Para obtener más información ProvidedInValueStrategy sobre, consulte la sección sobre cómo configurar el posprocesador del sumador de Id. de documento.
Reemplazar una Estrategia de clave comercial
Puede configurar la estrategia "Reemplazar una clave de empresa" para reemplazar los documentos que coincidan con el valor de la clave de empresa. Para definir una clave de empresa en varios campos de un registro y configurar el conector para que reemplace los documentos que contengan claves de empresa coincidentes, realice las siguientes tareas:
Cree un índice único en su colección que corresponda a los campos clave de su negocio.
Especifique la estrategia de identificación
PartialValueStrategypara identificar los campos que pertenecen a la clave comercial en la configuración del conector.Especifique la estrategia del modelo de escritura
ReplaceOneBusinessKeyStrategyen la configuración del conector.
Suponga que desea rastrear la capacidad de aviones por número de vuelo y ubicación del aeropuerto representados por flight_no y airport_code, respectivamente. Un mensaje de ejemplo contiene la siguiente información:
{ "flight_no": "Z342", "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
Para implementar la estrategia, utilizando flight_no y airport_code como clave comercial, primero cree un índice único en estos campos en el shell de MongoDB:
db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })
A continuación, especifique los campos de estrategia PartialValueStrategy y clave de negocio en la lista de proyección. Especifique el ID y la configuración de la estrategia del modelo de escritura como se indica a continuación:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=flight_no,airport_code document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
Los datos de muestra insertados en la colección contienen lo siguiente:
{ "flight_no": "Z342" "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
Cuando el conector procesa datos de receptor que coinciden con la clave comercial del documento existente, reemplaza el documento con los nuevos valores sin cambiar los campos de la clave comercial:
{ "flight_no": "Z342" "airport_code": "LAX", "status": "canceled" }
Después de que el conector procesa los datos del receptor, reemplaza el documento de muestra original en MongoDB con el anterior.
Eliminar una estrategia clave de negocio
Puede configurar el conector para que elimine un documento cuando reciba mensajes que coincidan con una clave empresarial mediante la estrategia Eliminar una clave empresarial. Para establecer una clave empresarial desde varios campos de un registro y configurar el conector para que elimine un documento que contenga una clave empresarial coincidente, realice las siguientes tareas:
Cree un índice único en su colección MongoDB que corresponda a los campos clave de su negocio.
Especifique
PartialValueStrategycomo la estrategia de identificación para identificar los campos que pertenecen a la clave comercial en la configuración del conector.Especifique la estrategia del modelo de escritura
DeleteOneBusinessKeyStrategyen la configuración del conector.
Supongamos que desea eliminar un evento de calendario de un año específico de una colección que contiene un documento similar al siguiente:
{ "year": 2005, "month": 3, "day": 15, "event": "Dentist Appointment" }
Para implementar la estrategia, utilizando year como clave comercial, primero cree un índice único en estos campos en el shell de MongoDB:
db.collection.createIndex({ "year": 1 }, { unique: true })
A continuación, especifique su clave comercial y escriba la estrategia del modelo en su configuración de la siguiente manera:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=year document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy
Si su conector procesa un registro receptor que contiene la clave de negocio year, elimina el primer documento con un valor de campo coincidente devuelto por MongoDB. Supongamos que su conector procesa un registro receptor que contiene los siguientes datos de valor:
{ "year": 2005, ... }
Cuando el conector procesa el registro anterior, elimina el primer documento de la colección que contiene un year campo con un valor de "2005", como el documento de muestra original "Cita con el dentista".
Estrategias personalizadas de modelos de guardado
Si ninguna de las estrategias del modelo de escritura incluidas con el conector se adapta a su caso de uso, puede crear la suya propia.
Una estrategia de modelo de escritura es una clase de Java que implementa la interfaz WriteModelStrategy y debe sobreescribir el método createWriteModel().
Consulta el código fuente de la interfaz WriteModelStrategy para la firma del método requerido.
Estrategia de modelo de guardado de muestra
La siguiente estrategia de modelo de escritura personalizada devuelve una operación de escritura que reemplaza un documento MongoDB que coincide con el campo _id de su registro receptor con el valor del campo fullDocument de su registro receptor:
/** * Custom write model strategy * * This class reads the 'fullDocument' field from a change stream and * returns a ReplaceOne operation. */ public class CustomWriteModelStrategy implements WriteModelStrategy { private static String ID = "_id"; public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) { BsonDocument changeStreamDocument = document.getValueDoc() .orElseThrow(() -> new DataException("Missing value document")); BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument()); if (fullDocument.isEmpty()) { return null; // Return null to indicate no op. } return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument); } }
Para otro ejemplo de estrategia personalizada de modelo de guardar, consulta el UpsertAsPartOfDocumentStrategy ejemplo de la estrategia en GitHub.
Cómo instalar tu estrategia
Para configurar que tu conector de destino utilice una estrategia de guardado personalizada, debes completar las siguientes acciones:
Compila la clase de estrategia de escritura personalizada en un archivo JAR.
Agregue el JAR compilado a la ruta de clase/complemento de sus workers de Kafka. Para más información sobre las rutas de complementos, consulte la documentación de Confluent.
Nota
Kafka Connect carga los complementos de forma aislada. Al implementar una estrategia de escritura personalizada, tanto el JAR del conector como el JAR de la estrategia del modelo de escritura deben estar en la misma ruta. Las rutas deberían ser similares a las siguientes:
<plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar<plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jarPara obtener más información sobre los complementos de Kafka Connect, consulte esta guía de Confluent.
Especifique su clase personalizada en la configuración writemodel.strategy.
Para aprender a compilar una clase en un archivo JAR, consulte la guía de implementación de JAR en la documentación de Java SE.