Para agentes de IA: hay un índice de documentación disponible en https://www.mongodb.com/es/docs/llms.txt — versiones en markdown de todas las páginas están disponibles agregando .md a cualquier ruta URL.
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

Sink Connector Procesadores posteriores

En esta página, puedes aprender a configurar post-procesadores en tu conector sink de MongoDB Kafka. Los procesadores posteriores modifican los registros de registro que el conector lee desde un tema de Kafka antes de que el conector los almacene en la colección de MongoDB. Algunos ejemplos de modificaciones de datos que los procesadores posteriores pueden realizar incluyen:

  • Establezca el campo _id del documento en un valor personalizado

  • Incluir o excluir campos clave o de valor del mensaje

  • Renombrar campos

Puedes usar los post procesadores preconstruidos incluidos en el conector o implementar los tuyos propios.

Consulte las siguientes secciones para obtener más información sobre los postprocesadores:

Los posprocesadores modifican los datos leídos desde un tema de Kafka. El conector almacena el mensaje en una clase SinkDocument que contiene una representación de las claves SinkRecord y los campos de valor de Kafka. El conector aplica secuencialmente cualquier procesador posterior especificado en la configuración y almacena el resultado en una colección de MongoDB.

Los procesadores posteriores realizan tareas de modificación de datos como generar el campo del documento _id, proyectar campos de clave o valor de mensaje, y renombrar campos. Puedes usar los procesadores posteriores predefinidos incluidos en el conector, o puedes implementar tu propio procesador extendiendo la clase PostProcessor.

Importante

Procesadores posteriores y controladores de captura de datos de cambios (CDC)

No puedes aplicar un posprocesador a los datos de eventos del gestor CDC. Si especificas ambos, el conector registra un registro de una advertencia.

Puedes especificar uno o más procesadores posteriores en la configuración post.processor.chain como una lista separada por comas. Si especificas más de uno, el conector los aplica secuencialmente, de manera que cada posprocesador modifica los datos generados por el anterior.

Para garantizar que los documentos que el conector escribe en MongoDB contengan campos _id únicos, añade automáticamente el posprocesador DocumentIdAdder en la primera posición de la cadena si no lo incluye de otra manera.

El siguiente ejemplo de configuración especifica que el conector debe ejecutar primero el postprocesador KafkaMetaAdder y luego el postprocesador AllowListValueProjector sobre la salida.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector

La siguiente tabla contiene una lista de todos los procesadores posteriores incluidos en el conector del sumidero.

Nombre del posprocesador
Descripción

DocumentIdAdder

Ruta completa:

com.mongodb.kafka.connect.sink.processor.DocumentIdAdder

Inserta un _id campo determinado por la estrategia configurada.
La estrategia predeterminada BsonOidStrategy es.

Para obtener información sobre las opciones de estrategia y la configuración, consulte la sección "Configurar el postprocesador del agregador de ID de documento".

BlockListKeyProjector

Ruta completa:

com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector

Elimina los campos clave coincidentes del registro de destino.
Para obtener más información sobre la configuración, consulte los ejemplos de listas de permitidos y bloqueados.

BlockListValueProjector

Ruta completa:

com.mongodb.kafka.connect.sink.processor.BlockListValueProjector

Elimina los campos con valores coincidentes del registro de destino.
Para obtener más información sobre la configuración, consulte los ejemplos de listas de permitidos y de bloqueados.

AllowListKeyProjector

Ruta completa:

com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector

Incluye únicamente los campos clave coincidentes del registro de destino.
Para obtener más información sobre la configuración, consulte los ejemplos de listas de permitidos y de bloqueados.

AllowListValueProjector

Ruta completa:

com.mongodb.kafka.connect.sink.processor.AllowListValueProjector``

Incluye únicamente los campos con valores coincidentes del registro de destino.
Para obtener más información sobre la configuración, consulte los ejemplos de listas de permitidos y de bloqueados.

KafkaMetaAdder

Ruta completa:

com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder

Agrega un campo llamado "topic-partition-offset" y establece el valor en la concatenación de tema, partición y offset (desplazamiento) de Kafka al documento.

RenameByMapping

Ruta completa:

com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping

Cambia el nombre de los campos que coinciden exactamente con un nombre de campo específico en el documento de clave o valor.
Para obtener información sobre la configuración, consulte el ejemplo de cambio de nombre mediante asignación.

RenombrarPorRegex

Ruta completa:

com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex

Cambia el nombre de los campos que coinciden con una expresión regular en el documento clave o valor.
Para obtener información sobre la configuración, consulte el ejemplo de cambio de nombre mediante expresiones regulares.

Eliminador de valores de campos nulos

Ruta completa:

com.mongodb.kafka.connect.sink.processor.NullFieldValueRemover``

Elimina todos los campos del documento que contienen valores null del registro de destino.

El post-procesador DocumentIdAdder utiliza una estrategia para determinar cómo debe formatear el campo _id en el documento MongoDB. Una estrategia define un comportamiento preestablecido que se puede personalizar para un caso de uso determinado.

Puede especificar una estrategia para este post-procesador en la configuración document.id.strategy, como se muestra en el siguiente ejemplo:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

La siguiente tabla muestra una lista de las estrategias que puedes usar para configurar el post-procesador DocumentIdAdder:

Nombre de la estrategia
Descripción

BsonOidStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy

Genera un ObjectId BSON de MongoDB.
Estrategia predeterminada para el DocumentIdAdder postprocesador.

KafkaMetaDataStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy

Compila una string compuesta por la concatenación del tema, partición y desplazamiento de Kafka.

FullKeyStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy

Utiliza la estructura de clave completa del documento de destino para generar el valor del _id campo.
Si no existe ninguna clave, se utiliza un documento en blanco por defecto.

ProvidedInKeyStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy

Utiliza el _id campo especificado en la estructura de clave del documento de destino.
Genera una excepción si el campo no está presente en el documento de destino.

ProvidedInValueStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy

Utiliza el _id campo especificado en la estructura de valor del documento de destino.
Genera una excepción si el campo no está presente en el documento de destino.

PartialKeyStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy

Utiliza una proyección de lista de bloqueo o lista de permisos de la estructura de claves del documento de destino.
Si no existe ninguna clave, se utiliza un documento en blanco por defecto.

PartialValueStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy

Utiliza una proyección de lista de bloqueo o lista de permisos de la estructura de valores del documento de destino.
Si no existe ningún valor, se utiliza un documento en blanco por defecto.

UuidProvidedInKeyStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy

Convierte el _id campo clave en un UUID. El valor debe ser de tipo cadena o binario y debe ajustarse al formato UUID.

UuidProvidedInValueStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy

Convierte el _id campo de valor en un UUID. El valor debe ser de tipo cadena o binario y debe ajustarse al formato UUID.

UuidStrategy

Ruta completa:

com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``

Utiliza un UUID generado aleatoriamente en formato de string.

Si las estrategias de funcionalidad incorporada de adición de ID de documento no cubren tu caso de uso, puedes definir una estrategia de ID de documento personalizada siguiendo los pasos que se indican a continuación:

  1. Crea una clase de Java que implemente la interfaz IdStrategy y contenga tu lógica personalizada de configuración.

  2. Compila la clase en un archivo JAR.

  3. Agrega el JAR compilado a la ruta de clase / ruta del plugin para todos los trabajadores de Kafka. Para obtener más información sobre las rutas de plugins, consulta la documentación de Confluent.

  4. Actualice la configuración de document.id.strategy al nombre completo de su clase personalizada en todos sus Kafka workers.

Nota

La estrategia seleccionada puede tener implicaciones en la semántica de entrega

Las estrategias BSON ObjectId o UUID solo pueden garantizar la entrega al menos una vez, ya que el conector genera nuevos ID en reintentos o al procesar de nuevo los registros. Otras estrategias permiten la entrega única si puedes garantizar que los campos que forman el ID del documento sean únicos.

Para ejemplos de implementaciones de la interfaz IdStrategy, consulta el directorio de código fuente que contiene implementaciones de estrategia id empaquetadas con el conector.

Esta sección muestra ejemplos de configuración y salidas de muestra de los siguientes tipos de procesadores posteriores:

Los posprocesadores de proyector de la lista de permitidos y la lista de bloqueos determinan qué campos incluir y excluir en la salida.

Cuando utilizas el proyector de lista de permitidos, el postprocesador solo genera datos de los campos especificados.

Cuando utilices el proyector de lista de bloqueos, el post-proceso omitirá datos sólo de los campos que especifiques.

Nota

Puedes usar el "." notación de punto para referenciar campos anidados en el registro. También puedes usar la notación para hacer referencia a campos de documentos en un arreglo.

Cuando se agrega un proyector a la cadena del procesador posterior, se debe especificar el tipo del proyector y si se aplicará a la clave o al valor del documento de sumidero.

Consulta las siguientes secciones para ejemplos de configuraciones y salidas de proyectores.

Suponga que los documentos de valores de registros de Kafka se parecieran al siguiente dato de perfil de usuario:

{
"name": "Sally Kimball",
"age": 10,
"address": {
"city": "Idaville",
"country": "USA"
},
"hobbies": [
"reading",
"solving crime"
]
}

Puedes configurar el proyector de valores AllowList para almacenar datos seleccionados como los campos "nombre", "dirección.ciudad" y "pasatiempos" de tus documentos de valores utilizando los siguientes parámetros:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

Después de que el postprocesador aplica la proyección, se obtiene el siguiente registro:

{
"name": "Sally Kimball",
"address": {
"city": "Idaville"
},
"hobbies": [
"reading",
"solving crime"
]
}

Supón que tus documentos de registro de Kafka se asemejan a los siguientes datos de identificación de usuario:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
"source": "mobile"
},
"authToken": {
"alg": "HS256",
"type": "JWT",
"payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
}
}

Puedes configurar el proyector de claves BlockList para omitir los campos "authToken" y "registration.source" campos antes de almacenar los datos con la siguiente configuración:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

Después de que el postprocesador aplica la proyección, se obtiene el siguiente registro:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
}
}

Esta sección muestra cómo se pueden configurar los posprocesadores del proyector para hacer coincidir patrones comodín con nombres de campo.

Patrón

Descripción

*

Coincide con cualquier número de caracteres en el nivel actual.

**

Coincide con cualquier carácter en el nivel actual y en todos los niveles anidados.

Para los ejemplos de coincidencia de patrones de comodín en la lista permitida y la lista bloqueada en esta sección, consulte el siguiente documento de valores que contiene mediciones meteorológicas:

{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
},
"moisture": {
"average": 340,
"units": "mm"
}
}
}

Puedes utilizar el comodín * para hacer coincidir varios nombres de campo. La siguiente configuración de ejemplo coincide con los siguientes campos:

  • El campo de nivel superior denominado "ciudad"

  • Los campos llamadas "promedio" que son subdocumentos de cualquier campo de nivel superior que comience con el nombre "velocidad_viento".

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

Después de que el postprocesador aplique la proyección de la lista de permitidos, produce el siguiente registro:

{
"city": "Springfield",
"wind_speed_10m": {
"average": 3,
},
"wind_speed_80m": {
"average": 8,
}
}

Puedes usar el comodín **, que coincide con objetos en cualquier nivel a partir del nivel en el que especifiques el comodín. El siguiente ejemplo de coincidencia comodín proyecta cualquier documento que contenga el campo llamado "low".

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

El procesador posterior que aplica la proyección genera el siguiente registro:

{
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
}
}
}

Puede utilizar los patrones de comodines para hacer coincidir campos a un nivel de documento específico, como se muestra en el siguiente ejemplo de configuración de lista de bloqueos:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"moisture": {
"average": 340,
"units": "mm"
}
}
}

Esta sección muestra cómo puedes configurar los posprocesadores de renombrado de campos RenameByMapping y RenameByRegex para actualizar los nombres de campos en un registro de sumidero. La configuración de cambio de nombre de campo especifica lo siguiente:

  • Si actualizar la clave o el documento de valor en el registro

  • Los nombres de los campos a actualizar

  • Los nuevos nombres de los campos

Debe especificar la configuración de RenameByMapping y RenameByRegex en un arreglo JSON. Puedes especificar campos anidados utilizando la notación de puntos o la coincidencia de patrones.

Los ejemplos del posprocesador que renombra campos usan el siguiente registro de sumidero de ejemplo:

Documento clave

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

Documento de valor

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

La configuración del posprocesador RenameByMapping especifica uno o más objetos JSON que asignan los campos que coinciden con una string a un nuevo nombre. Cada objeto contiene el texto para hacer coincidir en el elemento oldName y el texto de reemplazo en el elemento newName como se describe en la tabla a continuación.

Nombre de la clave
Descripción

oldName

Especifica si se deben coincidir los campos en el documento clave o en el documento valor y el nombre del campo que se reemplazará. La configuración utiliza un "." carácter para separar los dos valores.

newName

Especifica el nombre del campo de reemplazo para todas las coincidencias del campo.

La siguiente propiedad de ejemplo coincide con el campo "ubicación" de un documento clave y lo renombra como "país":

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

Esta configuración da instrucciones al post-procesador RenameByMapping para convertir el documento de clave original en el siguiente documento:

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

Puedes realizar una tarea similar de asignación de nombres de campos a documentos de valor, especificando el documento de valor junto con el nombre de campo agregado en el campo oldName, de la siguiente manera:

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

Esta configuración indica al procesador posterior de RenameByMapping que transforme el documento de valor original al siguiente documento:

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

También puedes especificar una o más asignaciones en la propiedad field.renamer.mapping usando un arreglo JSON en formato de string como se muestra en la siguiente configuración:

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]

La configuración del posprocesador RenameByRegex especifica los nombres de los campos y los patrones de texto que debe coincidir, así como los valores de reemplazo para el texto coincidente. Puedes especificar una o más expresiones de cambio de nombre en objetos JSON que contengan los campos descritos en la siguiente tabla:

Nombre de la clave
Descripción

regexp

Contiene una expresión regular que coincide con campos para realizar el reemplazo.

Patrón

Contiene una expresión regular que coincide con el texto a reemplazar.

Reemplazar

Contiene el texto de reemplazo para todas las coincidencias de la expresión regular que definiste en el campo pattern.

La siguiente configuración de ejemplo instruye al pos-procesador a realizar lo siguiente:

  • Coincida con cualquier nombre de campo en el documento clave que comience con "date". En el conjunto de campos coincidentes, reemplaza todo el texto que coincida con el patrón _ por el carácter -.

  • Coincide con cualquier nombre de campo en el documento de valores que sean subdocumentos de crepes. En el conjunto de campos coincidentes, sustituya todo el texto que coincida con el patrón purchased por quantity.

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

Cuando el conector aplica el posprocesador al documento clave de ejemplo y al documento de valor de ejemplo, produce lo siguiente:

Documento clave

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

Documento de valor

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

Advertencia

Los procesadores posteriores de cambio de nombre no sobrescriben nombres de campos existentes

Los nombres de campo de destino que estableces en tus posprocesadores de renombrado pueden dar lugar a nombres de campo duplicados en el mismo documento. Para evitar esto, el post-procesador omite el cambio de nombre si se duplica el nombre de un campo existente en el mismo nivel del documento.

Si los procesadores posteriores integrados no cubren tu caso de uso, puedes crear una clase de procesador posterior personalizada utilizando los siguientes pasos:

  1. Crear una clase Java que extienda la clase abstracta PostProcessor.

  2. Sobreescribe el método process() en tu clase. Puedes actualizar el SinkDocument, una representación BSON de los campos clave y valor del registro de destino, y acceder al Kafka original SinkRecord en tu método.

  3. Compila la clase en un archivo JAR.

  4. Añade el JAR compilado a la ruta de clases/ruta de plugin para todos tus workers de Kafka. Para obtener más información sobre las rutas de los plugins, consulta la documentación de Confluent sobre Instalación manual de conectores comunitarios.

  5. Agrega el nombre de clase completo de tu procesador posterior a la configuración de la cadena de procesadores posteriores.

Para procesadores posteriores, puedes explorar el código fuente de las clases de procesador posterior integradas.