Overview
Esta guía le muestra cómo cambiar la forma en que su conector sink de MongoDB Kafka guarda datos en MongoDB.
Puedes cambiar cómo tu conector escribe datos en MongoDB para casos de uso que incluyen los siguientes:
Inserte documentos en lugar de actualizarlos
Reemplazar o actualizar documentos que coincidan con un filtro que no sea el
_idCampoEliminar documentos que coinciden con un filtro
Puedes configurar cómo se escribe los datos en MongoDB especificando una estrategia de modelo de escritura. Una estrategia de modelo de escritura es una clase que define cómo debe guardar datos el conector de sumidero con el uso de modelos de escritura. Un modelo de escritura es una interfaz de MongoDB Java driver que define la estructura de una operación de escritura.
Para aprender cómo modificar los registros sink que recibe tu conector antes de que tu conector los guarde en MongoDB, lee la guía sobre Procesadores posteriores de conector de salida.
Para ver una implementación de estrategia de modelo de escritura, consulte el código fuente de la Clase InsertOneDefaultStrategy.
Operaciones de escritura masiva
El conector sumidero escribe datos en MongoDB utilizando operaciones de guardar masiva. Los guardados masivos agrupan varias operaciones de guardado, como inserciones, actualizaciones o borrados, juntas.
De forma predeterminada, el conector de sumidero realiza escrituras masivas ordenadas, lo que garantiza el orden de los cambios de datos. En una escritura masiva ordenada, si alguna operación de escritura genera un error, el conector omite las escrituras restantes de ese lote.
Si no necesitas garantizar el orden de los cambios de datos, puedes configurar el ajuste bulk.write.ordered en false para que el conector realice escrituras masivas sin ordenar. El conector de sink realiza guardados masivos desordenados en paralelo, lo que puede mejorar el rendimiento.
Además, cuando activas las escrituras masivas desordenadas y configuras el ajuste errors.tolerance a all, incluso si alguna operación de escritura en tu escritura masiva falla, el conector sigue realizando las operaciones de escritura restantes en el lote que no devuelven errores.
Tip
Para obtener más información sobre la configuración bulk.write.ordered, consulta la página de Propiedades de procesamiento de mensajes del Connector.
Para obtener más información sobre las operaciones de escritura masiva, consulte la siguiente documentación:
Cómo especificar estrategias de modelo de escritura
Para especificar una estrategia de modelo de escritura, utilice la siguiente configuración:
writemodel.strategy=<write model strategy classname>
Para obtener una lista de las estrategias de modelo de escritura prediseñadas incluidas en el conector, consulte la guía sobre configuraciones de estrategias de modelo de escritura.
Especificar una Clave de Negocio
Una clave empresarial es un valor compuesto por uno o más campos en su registro de referencia que lo identifican como único. Por defecto, el conector de sumidero usa el campo _id del registro del sumidero para recuperar la clave empresarial. Para especificar una clave comercial diferente, configure el posprocesador Document Id Adder para usar un valor personalizado.
Puedes configurar el Document Id Adder para establecer el campo _id de la clave del registro sink como se muestra en las siguientes propiedades de ejemplo:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy document.id.strategy.partial.key.projection.list=<comma-separated field names> document.id.strategy.partial.key.projection.type=AllowList
De manera alternativa, puede configurarlo para establecer el campo _id del valor del registro sink, como se muestra en las siguientes propiedades de ejemplo:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=<comma-separated field names> document.id.strategy.partial.value.projection.type=AllowList
Importante
Mejorar el rendimiento de escritura
Cree un índice único en su colección de destino que corresponda a los campos de su clave de empresa. Esto mejora el rendimiento de las operaciones de escritura desde su conector receptor. Consulte la guía sobre índices únicos para obtener más información.
Las siguientes estrategias de modelo de guardado requieren una clave empresarial:
ReplaceOneBusinessKeyStrategyDeleteOneBusinessKeyStrategyUpdateOneBusinessKeyTimestampStrategy
Para obtener más información sobre el procesador posterior documento Id Adder, consulta Configurar el procesador posterior documento Id Adder.
Ejemplos
Esta sección muestra ejemplos de configuración y resultados de las siguientes estrategias del modelo de escritura:
Estrategia de marcas de tiempo de Update One
Puede configurar la estrategia Update One Timestamps para agregar y actualizar marcas de tiempo al guardar documentos en MongoDB. Esta estrategia realiza las siguientes acciones:
Cuando el conector inserta un nuevo documento de MongoDB, establece los campos
_insertedTSy_modifiedTScon la hora actual en el servidor del conector.Cuando el conector actualiza un documento MongoDB existente, actualiza el campo
_modifiedTSa la hora actual en el servidor del conector.
Supongamos que quieres rastrear la posición de un tren a lo largo de una ruta, y tu conector de destino recibe mensajes con la siguiente estructura:
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
Utiliza el ProvidedInValueStrategy para especificar que tu conector debe usar el valor _id del mensaje para asignar el campo _id en tu documento de MongoDB. Especifica tu ID y guardar las propiedades de la estrategia del modelo de la siguiente manera:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy
Después de que el conector de receptor procesa el registro de ejemplo anterior, inserta un documento que contiene los campos _insertedTS y _modifiedTS como se muestra en el siguiente documento:
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T15:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 40, -73 ] }
Después de una hora, el tren informa su nueva ubicación a lo largo de su ruta con una nueva posición como se muestra en el siguiente registro:
{ "_id": "MN-1234", "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
Una vez que el conector de destino procese el registro anterior, insertará un documento que contenga los siguientes datos:
{ "_id": "MN-1234", "_insertedTS": ISODate("2021-09-20T15:08:000Z"), "_modifiedTS": ISODate("2021-09-20T16:08:000Z"), "start": "Beacon", "destination": "Grand Central" "position": [ 42, -75 ] }
Para obtener más información sobre el ProvidedInValueStrategy, consulte la sección sobre Configurar el Procesador Posterior Adder de Id de Documento.
Reemplazar una Estrategia de clave comercial
Puedes configurar la estrategia de Clave Comercial de Reemplazo para reemplazar documentos que coincidan con el valor de la clave comercial. Para definir una clave comercial en varios campos de un registro y configurar el conector para que reemplace documentos que contengan claves comerciales coincidentes, realiza las siguientes tareas:
Crea un índice único en tu colección que corresponda a los campos clave de tu empresa.
Especifica la estrategia de ID
PartialValueStrategypara identificar los campos que pertenecen a la clave empresarial en la configuración del conector.Especifica la estrategia del modelo de guardado
ReplaceOneBusinessKeyStrategyen la configuración del conector.
Suponga que desea rastrear la capacidad de aviones por número de vuelo y ubicación del aeropuerto representados por flight_no y airport_code, respectivamente. Un mensaje de ejemplo contiene la siguiente información:
{ "flight_no": "Z342", "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
Para implementar la estrategia, utilizando flight_no y airport_code como clave comercial, primero cree un índice único en estos campos en el shell de MongoDB:
db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })
A continuación, especifique la PartialValueStrategy estrategia y los campos clave de negocio en una lista de proyección. Especifica el ID y guarda la configuración de la estrategia del modelo de escritura de la siguiente manera:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=flight_no,airport_code document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy
Los datos de muestra insertados en la colección contienen lo siguiente:
{ "flight_no": "Z342" "airport_code": "LAX", "seats": { "capacity": 180, "occupied": 152 } }
Cuando el conector procesa datos de destino que coinciden con la clave de negocio del documento existente, reemplaza el documento con los nuevos valores sin cambiar los campos de la clave de negocio:
{ "flight_no": "Z342" "airport_code": "LAX", "status": "canceled" }
Después de que el conector procese los datos de destino, reemplazará el documento de muestra original en MongoDB por el anterior.
Borrar Una Estrategia Clave Empresarial
Puedes configurar el conector para remover un documento cuando reciba mensajes que coincidan con una clave de negocio utilizando la estrategia Borrar una clave de negocio. Para establecer una clave comercial a partir de varios campos de un registro y configurar el conector para eliminar un documento que contenga una clave comercial coincidente, realice las siguientes tareas:
Cree un índice único en su colección MongoDB que corresponda a los campos clave de su negocio.
Specifica el
PartialValueStrategycomo la estrategia de id para identificar los campos que pertenecen a la clave de negocio en la configuración del conector.Especifica la estrategia del modelo de guardado
DeleteOneBusinessKeyStrategyen la configuración del conector.
Supón que deseas borrar un evento de calendario de un año específico de una colección que contiene un documento que se asemeja a lo siguiente:
{ "year": 2005, "month": 3, "day": 15, "event": "Dentist Appointment" }
Para implementar la estrategia, utilizando year como clave comercial, primero cree un índice único en estos campos en el shell de MongoDB:
db.collection.createIndex({ "year": 1 }, { unique: true })
A continuación, especifique su clave comercial y escriba la estrategia del modelo en su configuración de la siguiente manera:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy document.id.strategy.partial.value.projection.list=year document.id.strategy.partial.value.projection.type=AllowList writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy
Si tu conector procesa un registro de destino que contiene la clave empresarial year, se elimina el primer documento con un valor de campo coincidente devuelto por MongoDB. Supongamos que su conector procesa un registro de sumidero que contiene los siguientes datos de valor:
{ "year": 2005, ... }
Cuando el conector procese el registro anterior, borrará el primer documento de la colección que contenga un campo year con el valor "2005", tal como el documento original de muestra "Dentist Appointment".
Estrategias personalizadas de modelos de guardado
Si ninguna de las estrategias del modelo de escritura incluidas con el conector se adapta a su caso de uso, puede crear la suya propia.
Una estrategia de modelo de escritura es una clase de Java que implementa la interfaz WriteModelStrategy y debe sobreescribir el método createWriteModel().
Consulta el código fuente de la interfaz WriteModelStrategy para la firma del método requerido.
Estrategia de modelo de guardado de muestra
La siguiente estrategia de modelo de escritura personalizada devuelve una operación de escritura que reemplaza un documento MongoDB que coincide con el campo _id de su registro receptor con el valor del campo fullDocument de su registro receptor:
/** * Custom write model strategy * * This class reads the 'fullDocument' field from a change stream and * returns a ReplaceOne operation. */ public class CustomWriteModelStrategy implements WriteModelStrategy { private static String ID = "_id"; public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) { BsonDocument changeStreamDocument = document.getValueDoc() .orElseThrow(() -> new DataException("Missing value document")); BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument()); if (fullDocument.isEmpty()) { return null; // Return null to indicate no op. } return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument); } }
Para otro ejemplo de estrategia personalizada de modelo de guardar, consulta el UpsertAsPartOfDocumentStrategy ejemplo de la estrategia en GitHub.
Cómo instalar su estrategia
Para configurar que tu conector de destino utilice una estrategia de guardado personalizada, debes completar las siguientes acciones:
Compila la clase personalizada de estrategia de guardar en un archivo JAR.
Agrega el JAR compilado al classpath/path de plugins de tus workers de Kafka. Para obtener más información sobre las rutas de plugins, consulta la documentación de Confluent.
Nota
Kafka Connect carga los plugins de forma aislada. Cuando implementes una estrategia de escritura personalizada, tanto el JAR del conector como el JAR de la estrategia del modelo de escritura deben estar en la misma ruta. Tus rutas deben parecerse a las siguientes:
<plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar<plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jarPara aprender más sobre los complementos de Kafka Connect, consulta esta guía de Confluent.
Especifique su clase personalizada la configuración writemodel.strategy.
Para aprender cómo compilar una clase en un archivo JAR, consulta la Guía de implementación de JAR de la documentación de Java SE.