Docs Menu
Docs Home
/ /

Escribir estrategias de modelos

Esta guía le muestra cómo cambiar la forma en que su conector de receptor MongoDB Kafka escribe datos en MongoDB.

Puede cambiar la forma en que su conector escribe datos en MongoDB para casos de uso que incluyen los siguientes:

  • Insertar documentos en lugar de sobrescribirlos

  • Reemplazar o actualizar documentos que coincidan con un filtro distinto al _id Campo

  • Eliminar documentos que coinciden con un filtro

Puede configurar cómo su conector escribe datos en MongoDB especificando una estrategia de modelo de escritura. Una estrategia de modelo de escritura es una clase que define cómo su conector receptor debe escribir datos mediante modelos de escritura. Un modelo de escritura es una interfaz del controlador Java de MongoDB que define la estructura de una operación de escritura.

Para aprender a modificar los registros de receptor que recibe su conector antes de que los escriba en MongoDB, lea la guía en Postprocesadores de conector de sumidero.

Para ver la implementación de una estrategia de modelo de escritura, consulte el código fuente del Clase InsertOneDefaultStrategy.

El conector de sumidero escribe datos en MongoDB mediante operaciones de escritura masiva. Estas operaciones agrupan varias operaciones de escritura, como inserciones, actualizaciones o eliminaciones.

De forma predeterminada, el conector de sumidero realiza escrituras masivas ordenadas, lo que garantiza el orden de los cambios de datos. En una escritura masiva ordenada, si alguna operación de escritura genera un error, el conector omite las escrituras restantes de ese lote.

Si no necesita garantizar el orden de los cambios de datos, puede establecer el valor bulk.write.ordered en false para que el conector realice escrituras masivas desordenadas. El conector receptor realiza escrituras masivas desordenadas en paralelo, lo que puede mejorar el rendimiento.

Además, cuando habilita escrituras masivas desordenadas y establece la configuración errors.tolerance en all, incluso si alguna operación de escritura en su escritura masiva falla, el conector continúa realizando las operaciones de escritura restantes en el lote que no devuelven errores.

Tip

Para obtener más información sobre la configuración bulk.write.ordered, consulta la página de Propiedades de procesamiento de mensajes del Connector.

Para obtener más información sobre las operaciones de escritura masiva, consulte la siguiente documentación:

Para especificar una estrategia de modelo de escritura, utilice la siguiente configuración:

writemodel.strategy=<write model strategy classname>

Para obtener una lista de las estrategias de modelo de escritura prediseñadas incluidas en el conector, consulte la guía sobre configuraciones de estrategias de modelo de escritura.

Una clave de negocio es un valor compuesto por uno o más campos en su registro receptor que lo identifica como único. De forma predeterminada, el conector del receptor utiliza el campo _id del registro receptor para recuperar la clave de negocio. Para especificar una clave de negocio diferente, configure el postprocesador del sumador de ID de documento para que utilice un valor personalizado.

Puede configurar el Adder de Id. de documento para establecer el campo _id desde la clave de registro del receptor como se muestra en las siguientes propiedades de ejemplo:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
document.id.strategy.partial.key.projection.list=<comma-separated field names>
document.id.strategy.partial.key.projection.type=AllowList

Como alternativa, puede configurarlo para establecer el campo _id a partir del valor del registro del receptor como se muestra en las siguientes propiedades de ejemplo:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=<comma-separated field names>
document.id.strategy.partial.value.projection.type=AllowList

Importante

Mejorar el rendimiento de escritura

Cree un índice único en su colección de destino que corresponda a los campos de su clave de empresa. Esto mejora el rendimiento de las operaciones de escritura desde su conector receptor. Consulte la guía sobre índices únicos para obtener más información.

Las siguientes estrategias de modelo de guardado requieren una clave empresarial:

  • ReplaceOneBusinessKeyStrategy

  • DeleteOneBusinessKeyStrategy

  • UpdateOneBusinessKeyTimestampStrategy

Para obtener más información sobre el posprocesador del sumador de Id. de documento, consulte Configurar el posprocesador del sumador de Id. de documento.

Esta sección muestra ejemplos de configuración y salida de las siguientes estrategias de modelo de escritura:

Puede configurar la estrategia "Actualizar una marca de tiempo" para agregar y actualizar marcas de tiempo al escribir documentos en MongoDB. Esta estrategia realiza las siguientes acciones:

  • Cuando el conector inserta un nuevo documento MongoDB, establece los campos _insertedTS y _modifiedTS en la hora actual en el servidor del conector.

  • Cuando el conector actualiza un documento MongoDB existente, actualiza el campo _modifiedTS a la hora actual en el servidor del conector.

Supongamos que desea rastrear la posición de un tren a lo largo de una ruta y su conector receptor recibe mensajes con la siguiente estructura:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

Use ProvidedInValueStrategy para especificar que su conector debe usar el valor _id del mensaje para asignar el campo _id en su documento de MongoDB. Especifique su ID y escriba las propiedades de la estrategia del modelo como se indica a continuación:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

Después de que el conector de receptor procesa el registro de ejemplo anterior, inserta un documento que contiene los campos _insertedTS y _modifiedTS como se muestra en el siguiente documento:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T15:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 40, -73 ]
}

Después de una hora, el tren informa su nueva ubicación a lo largo de su ruta con una nueva posición como se muestra en el siguiente registro:

{
"_id": "MN-1234",
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Una vez que el conector del receptor procesa el registro anterior, inserta un documento que contiene los siguientes datos:

{
"_id": "MN-1234",
"_insertedTS": ISODate("2021-09-20T15:08:000Z"),
"_modifiedTS": ISODate("2021-09-20T16:08:000Z"),
"start": "Beacon",
"destination": "Grand Central"
"position": [ 42, -75 ]
}

Para obtener más información ProvidedInValueStrategy sobre, consulte la sección sobre cómo configurar el posprocesador del sumador de Id. de documento.

Puede configurar la estrategia "Reemplazar una clave de empresa" para reemplazar los documentos que coincidan con el valor de la clave de empresa. Para definir una clave de empresa en varios campos de un registro y configurar el conector para que reemplace los documentos que contengan claves de empresa coincidentes, realice las siguientes tareas:

  1. Cree un índice único en su colección que corresponda a los campos clave de su negocio.

  2. Especifique la estrategia de identificación PartialValueStrategy para identificar los campos que pertenecen a la clave comercial en la configuración del conector.

  3. Especifique la estrategia del modelo de escritura ReplaceOneBusinessKeyStrategy en la configuración del conector.

Suponga que desea rastrear la capacidad de aviones por número de vuelo y ubicación del aeropuerto representados por flight_no y airport_code, respectivamente. Un mensaje de ejemplo contiene la siguiente información:

{
"flight_no": "Z342",
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

Para implementar la estrategia, utilizando flight_no y airport_code como clave comercial, primero cree un índice único en estos campos en el shell de MongoDB:

db.collection.createIndex({ "flight_no": 1, "airport_code": 1}, { unique: true })

A continuación, especifique los campos de estrategia PartialValueStrategy y clave de negocio en la lista de proyección. Especifique el ID y la configuración de la estrategia del modelo de escritura como se indica a continuación:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=flight_no,airport_code
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy

Los datos de muestra insertados en la colección contienen lo siguiente:

{
"flight_no": "Z342"
"airport_code": "LAX",
"seats": {
"capacity": 180,
"occupied": 152
}
}

Cuando el conector procesa datos de receptor que coinciden con la clave comercial del documento existente, reemplaza el documento con los nuevos valores sin cambiar los campos de la clave comercial:

{
"flight_no": "Z342"
"airport_code": "LAX",
"status": "canceled"
}

Después de que el conector procesa los datos del receptor, reemplaza el documento de muestra original en MongoDB con el anterior.

Puede configurar el conector para que elimine un documento cuando reciba mensajes que coincidan con una clave empresarial mediante la estrategia Eliminar una clave empresarial. Para establecer una clave empresarial desde varios campos de un registro y configurar el conector para que elimine un documento que contenga una clave empresarial coincidente, realice las siguientes tareas:

  1. Cree un índice único en su colección MongoDB que corresponda a los campos clave de su negocio.

  2. Especifique PartialValueStrategy como la estrategia de identificación para identificar los campos que pertenecen a la clave comercial en la configuración del conector.

  3. Especifique la estrategia del modelo de escritura DeleteOneBusinessKeyStrategy en la configuración del conector.

Supongamos que desea eliminar un evento de calendario de un año específico de una colección que contiene un documento similar al siguiente:

{
"year": 2005,
"month": 3,
"day": 15,
"event": "Dentist Appointment"
}

Para implementar la estrategia, utilizando year como clave comercial, primero cree un índice único en estos campos en el shell de MongoDB:

db.collection.createIndex({ "year": 1 }, { unique: true })

A continuación, especifique su clave comercial y escriba la estrategia del modelo en su configuración de la siguiente manera:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
document.id.strategy.partial.value.projection.list=year
document.id.strategy.partial.value.projection.type=AllowList
writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneBusinessKeyStrategy

Si su conector procesa un registro receptor que contiene la clave de negocio year, elimina el primer documento con un valor de campo coincidente devuelto por MongoDB. Supongamos que su conector procesa un registro receptor que contiene los siguientes datos de valor:

{
"year": 2005,
...
}

Cuando el conector procesa el registro anterior, elimina el primer documento de la colección que contiene un year campo con un valor de "2005", como el documento de muestra original "Cita con el dentista".

Si ninguna de las estrategias del modelo de escritura incluidas con el conector se adapta a su caso de uso, puede crear la suya propia.

Una estrategia de modelo de escritura es una clase de Java que implementa la interfaz WriteModelStrategy y debe sobreescribir el método createWriteModel().

Consulta el código fuente de la interfaz WriteModelStrategy para la firma del método requerido.

La siguiente estrategia de modelo de escritura personalizada devuelve una operación de escritura que reemplaza un documento MongoDB que coincide con el campo _id de su registro receptor con el valor del campo fullDocument de su registro receptor:

/**
* Custom write model strategy
*
* This class reads the 'fullDocument' field from a change stream and
* returns a ReplaceOne operation.
*/
public class CustomWriteModelStrategy implements WriteModelStrategy {
private static String ID = "_id";
@Override
public WriteModel<BsonDocument> createWriteModel(final SinkDocument document) {
BsonDocument changeStreamDocument = document.getValueDoc()
.orElseThrow(() -> new DataException("Missing value document"));
BsonDocument fullDocument = changeStreamDocument.getDocument("fullDocument", new BsonDocument());
if (fullDocument.isEmpty()) {
return null; // Return null to indicate no op.
}
return new ReplaceOneModel<>(Filters.eq(ID, fullDocument.get(ID)), fullDocument);
}
}

Para otro ejemplo de estrategia personalizada de modelo de guardar, consulta el UpsertAsPartOfDocumentStrategy ejemplo de la estrategia en GitHub.

Para configurar que tu conector de destino utilice una estrategia de guardado personalizada, debes completar las siguientes acciones:

  1. Compila la clase de estrategia de escritura personalizada en un archivo JAR.

  2. Agregue el JAR compilado a la ruta de clase/complemento de sus workers de Kafka. Para más información sobre las rutas de complementos, consulte la documentación de Confluent.

    Nota

    Kafka Connect carga los complementos de forma aislada. Al implementar una estrategia de escritura personalizada, tanto el JAR del conector como el JAR de la estrategia del modelo de escritura deben estar en la misma ruta. Las rutas deberían ser similares a las siguientes:

    <plugin.path>/mongo-kafka-connect/mongo-kafka-connect-all.jar
    <plugin.path>/mongo-kafka-connect/custom-write-model-strategy.jar

    Para obtener más información sobre los complementos de Kafka Connect, consulte esta guía de Confluent.

  3. Especifique su clase personalizada en la configuración writemodel.strategy.

Para aprender a compilar una clase en un archivo JAR, consulte la guía de implementación de JAR en la documentación de Java SE.

Volver

Fundamentals

En esta página