Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Escribir estrategias de modelos

Esta guía le muestra cómo cambiar la forma en que su conector sink de MongoDB Kafka guarda datos en MongoDB.

Puedes cambiar cómo tu conector escribe datos en MongoDB para casos de uso que incluyen los siguientes:

  • Inserte documentos en lugar de actualizarlos

  • Reemplazar o actualizar documentos que coincidan con un filtro que no sea el _id Campo

  • Eliminar documentos que coinciden con un filtro

Puedes configurar cómo se escribe los datos en MongoDB especificando una estrategia de modelo de escritura. Una estrategia de modelo de escritura es una clase que define cómo debe guardar datos el conector de sumidero con el uso de modelos de escritura. Un modelo de escritura es una interfaz de MongoDB Java driver que define la estructura de una operación de escritura.

Para aprender cómo modificar los registros sink que recibe tu conector antes de que tu conector los guarde en MongoDB, lee la guía sobre Procesadores posteriores de conector de salida.

Para ver una implementación de estrategia de modelo de escritura, consulte el código fuente de la Clase InsertOneDefaultStrategy.

El conector sumidero escribe datos en MongoDB utilizando operaciones de guardar masiva. Los guardados masivos agrupan varias operaciones de guardado, como inserciones, actualizaciones o borrados, juntas.

De forma predeterminada, el conector de sumidero realiza escrituras masivas ordenadas, lo que garantiza el orden de los cambios de datos. En una escritura masiva ordenada, si alguna operación de escritura genera un error, el conector omite las escrituras restantes de ese lote.

Si no necesitas garantizar el orden de los cambios de datos, puedes configurar el ajuste bulk.write.ordered en false para que el conector realice escrituras masivas sin ordenar. El conector de sink realiza guardados masivos desordenados en paralelo, lo que puede mejorar el rendimiento.

Además, cuando activas las escrituras masivas desordenadas y configuras el ajuste errors.tolerance a all, incluso si alguna operación de escritura en tu escritura masiva falla, el conector sigue realizando las operaciones de escritura restantes en el lote que no devuelven errores.

Tip

Para obtener más información sobre la configuración bulk.write.ordered, consulta la página de Propiedades de procesamiento de mensajes del Connector.

Para obtener más información sobre las operaciones de escritura masiva, consulte la siguiente documentación:

Para especificar una estrategia de modelo de escritura, utilice la siguiente configuración:

writemodel.strategy=<write model strategy classname>

Para obtener una lista de las estrategias de modelo de escritura prediseñadas incluidas en el conector, consulte la guía sobre configuraciones de estrategias de modelo de escritura.

Una clave empresarial es un valor compuesto por uno o más campos en su registro de referencia que lo identifican como único. Por defecto, el conector de sumidero usa el campo _id del registro del sumidero para recuperar la clave empresarial. Para especificar una clave comercial diferente, configure el posprocesador Document Id Adder para usar un valor personalizado.

Puedes configurar el Document Id Adder para establecer el campo _id de la clave del registro sink como se muestra en las siguientes propiedades de ejemplo:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
document.id.strategy.partial.key.projection.list=<comma-separated field names>
document.id.strategy.partial.key.projection.type=AllowList

De manera alternativa, puede configurarlo para establecer el campo _id del valor del registro sink, como se muestra en las siguientes propiedades de ejemplo:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList

Importante

Mejorar el rendimiento de escritura

Cree un índice único en su colección de destino que corresponda a los campos de su clave de empresa. Esto mejora el rendimiento de las operaciones de escritura desde su conector receptor. Consulte la guía sobre índices únicos para obtener más información.

Las siguientes estrategias de modelo de guardado requieren una clave empresarial:

  • ReplaceOneBusinessKeyStrategy

  • DeleteOneBusinessKeyStrategy

  • UpdateOneBusinessKeyTimestampStrategy

Para obtener más información sobre el procesador posterior documento Id Adder, consulta Configurar el procesador posterior documento Id Adder.

Esta sección muestra ejemplos de configuración y resultados de las siguientes estrategias del modelo de escritura:

Puede configurar la estrategia Update One Timestamps para agregar y actualizar marcas de tiempo al guardar documentos en MongoDB. Esta estrategia realiza las siguientes acciones:

  • Cuando el conector inserta un nuevo documento de MongoDB, establece los campos _insertedTS y _modifiedTS con la hora actual en el servidor del conector.

  • Cuando el conector actualiza un documento MongoDB existente, actualiza el campo _modifiedTS a la hora actual en el servidor del conector.

Supongamos que quieres rastrear la posición de un tren a lo largo de una ruta, y tu conector de destino recibe mensajes con la siguiente estructura:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

Utiliza el ProvidedInValueStrategy para especificar que tu conector debe usar el valor _id del mensaje para asignar el campo _id en tu documento de MongoDB. Especifica tu ID y guardar las propiedades de la estrategia del modelo de la siguiente manera:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

Después de que el conector de receptor procesa el registro de ejemplo anterior, inserta un documento que contiene los campos _insertedTS y _modifiedTS como se muestra en el siguiente documento:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

Después de una hora, el tren informa su nueva ubicación a lo largo de su ruta con una nueva posición como se muestra en el siguiente registro:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Una vez que el conector de destino procese el registro anterior, insertará un documento que contenga los siguientes datos:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Para obtener más información sobre el ProvidedInValueStrategy, consulte la sección sobre Configurar el Procesador Posterior Adder de Id de Documento.

Puedes configurar la estrategia de Clave Comercial de Reemplazo para reemplazar documentos que coincidan con el valor de la clave comercial. Para definir una clave comercial en varios campos de un registro y configurar el conector para que reemplace documentos que contengan claves comerciales coincidentes, realiza las siguientes tareas:

  1. Crea un índice único en tu colección que corresponda a los campos clave de tu empresa.

  2. Especifica la estrategia de ID PartialValueStrategy para identificar los campos que pertenecen a la clave empresarial en la configuración del conector.

  3. Especifica la estrategia del modelo de guardado ReplaceOneBusinessKeyStrategy en la configuración del conector.

Suponga que desea rastrear la capacidad de aviones por número de vuelo y ubicación del aeropuerto representados por flight_no y airport_code, respectivamente. Un mensaje de ejemplo contiene la siguiente información:

{
"flight_no": "Z342",
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

Para implementar la estrategia, utilizando flight_no y airport_code como clave comercial, primero cree un índice único en estos campos en el shell de MongoDB:

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

A continuación, especifique la PartialValueStrategy estrategia y los campos clave de negocio en una lista de proyección. Especifica el ID y guarda la configuración de la estrategia del modelo de escritura de la siguiente manera:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

Los datos de muestra insertados en la colección contienen lo siguiente:

{
"flight_no": "Z342"
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

Cuando el conector procesa datos de destino que coinciden con la clave de negocio del documento existente, reemplaza el documento con los nuevos valores sin cambiar los campos de la clave de negocio:

{
"flight_no": "Z342"
"airport_code": "LAX",
"status": "canceled"
}

Después de que el conector procese los datos de destino, reemplazará el documento de muestra original en MongoDB por el anterior.

Puedes configurar el conector para remover un documento cuando reciba mensajes que coincidan con una clave de negocio utilizando la estrategia Borrar una clave de negocio. Para establecer una clave comercial a partir de varios campos de un registro y configurar el conector para eliminar un documento que contenga una clave comercial coincidente, realice las siguientes tareas:

  1. Cree un índice único en su colección MongoDB que corresponda a los campos clave de su negocio.

  2. Specifica el PartialValueStrategy como la estrategia de id para identificar los campos que pertenecen a la clave de negocio en la configuración del conector.

  3. Especifica la estrategia del modelo de guardado DeleteOneBusinessKeyStrategy en la configuración del conector.

Supón que deseas borrar un evento de calendario de un año específico de una colección que contiene un documento que se asemeja a lo siguiente:

{
"year": 2005,
"month": 3,
"day": 15,
"event": "Dentist Appointment"
}

Para implementar la estrategia, utilizando year como clave comercial, primero cree un índice único en estos campos en el shell de MongoDB:

db.collection.createIndex({ "year": 1 }, { unique: true })

A continuación, especifique su clave comercial y escriba la estrategia del modelo en su configuración de la siguiente manera:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=year
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

Si tu conector procesa un registro de destino que contiene la clave empresarial year, se elimina el primer documento con un valor de campo coincidente devuelto por MongoDB. Supongamos que su conector procesa un registro de sumidero que contiene los siguientes datos de valor:

{
"year": 2005,
...
}

Cuando el conector procese el registro anterior, borrará el primer documento de la colección que contenga un campo year con el valor "2005", tal como el documento original de muestra "Dentist Appointment".

Si ninguna de las estrategias del modelo de escritura incluidas con el conector se adapta a su caso de uso, puede crear la suya propia.

Una estrategia de modelo de escritura es una clase de Java que implementa la interfaz WriteModelStrategy y debe sobreescribir el método createWriteModel().

Consulta el código fuente de la interfaz WriteModelStrategy para la firma del método requerido.

La siguiente estrategia de modelo de escritura personalizada devuelve una operación de escritura que reemplaza un documento MongoDB que coincide con el campo _id de su registro receptor con el valor del campo fullDocument de su registro receptor:

/**
* Custom write model strategy
*
* This class reads the 'fullDocument' field from a change stream and
* returns a ReplaceOne operation.
*/
public class CustomWriteModelStrategy implements WriteModelStrategy {
private static String ID = "_id";
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument changeStreamDocument = document.getValueDoc()
.orElseThrow(() -> new DataException("Missing value document"));
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
if (fullDocument.isEmpty()) {
return null; // Return null to indicate no op.
}
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
}
}

Para otro ejemplo de estrategia personalizada de modelo de guardar, consulta el UpsertAsPartOfDocumentStrategy ejemplo de la estrategia en GitHub.

Para configurar que tu conector de destino utilice una estrategia de guardado personalizada, debes completar las siguientes acciones:

  1. Compila la clase personalizada de estrategia de guardar en un archivo JAR.

  2. Agrega el JAR compilado al classpath/path de plugins de tus workers de Kafka. Para obtener más información sobre las rutas de plugins, consulta la documentación de Confluent.

    Nota

    Kafka Connect carga los plugins de forma aislada. Cuando implementes una estrategia de escritura personalizada, tanto el JAR del conector como el JAR de la estrategia del modelo de escritura deben estar en la misma ruta. Tus rutas deben parecerse a las siguientes:

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar

    Para aprender más sobre los complementos de Kafka Connect, consulta esta guía de Confluent.

  3. Especifique su clase personalizada la configuración writemodel.strategy.

Para aprender cómo compilar una clase en un archivo JAR, consulta la Guía de implementación de JAR de la documentación de Java SE.

Volver

Fundamentals

En esta página