Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Sink Connector Procesadores posteriores

En esta página, puedes aprender a configurar post-procesadores en tu conector sink de MongoDB Kafka. Los procesadores posteriores modifican los registros de registro que el conector lee desde un tema de Kafka antes de que el conector los almacene en la colección de MongoDB. Algunos ejemplos de modificaciones de datos que los procesadores posteriores pueden realizar incluyen:

  • Establecer el documento _id campo a un valor personalizado

  • Incluir o excluir campos de clave o valor de mensaje

  • Cambiar el nombre de los campos

Puedes usar los post procesadores preconstruidos incluidos en el conector o implementar los tuyos propios.

Consulte las siguientes secciones para obtener más información sobre los postprocesadores:

  • Cómo los posprocesadores modifican los datos

  • Cómo especificar los Posprocesadores

  • Procesadores posteriores preconstruidos

  • Configura el Post Processor para agregar id documentos

  • Ejemplos de configuración del postprocesador

  • Crear un procesador de publicaciones personalizado

Los posprocesadores modifican los datos leídos desde un tema de Kafka. El conector almacena el mensaje en una clase SinkDocument que contiene una representación de las claves SinkRecord y los campos de valor de Kafka. El conector aplica secuencialmente cualquier procesador posterior especificado en la configuración y almacena el resultado en una colección de MongoDB.

Los postprocesadores realizan tareas de modificación de datos, como generar el _id campo del documento, proyectar campos de clave o valor de mensaje y renombrar campos. Puede usar los postprocesadores predefinidos incluidos en el conector o implementar los suyos propios extendiendo el postprocesador. clase.

Importante

Procesadores posteriores y controladores de captura de datos de cambios (CDC)

No puedes aplicar un post-procesador a Datos de eventos del controlador CDC. Si se especifican ambos, el conector registra una advertencia.

Puede especificar uno o más posprocesadores en la configuración post.processor.chain como una lista separada por comas. Si especifica más de uno, el conector los aplica secuencialmente, de modo que cada posprocesador modifica la salida de datos según el anterior.

Para garantizar que los documentos que el conector escribe en MongoDB contengan campos _id únicos, añade automáticamente el posprocesador DocumentIdAdder en la primera posición de la cadena si no lo incluye de otra manera.

El siguiente ejemplo de configuración especifica que el conector debe ejecutar primero el postprocesador KafkaMetaAdder y luego el postprocesador AllowListValueProjector sobre la salida.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector

La siguiente tabla contiene una lista de todos los procesadores posteriores incluidos en el conector del sumidero.

Nombre del posprocesador
Descripción
DocumentIdAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Inserts an _id field determined by the configured strategy.
The default strategy is BsonOidStrategy.
For information on strategy options and configuration, see the Configure the Document Id Adder Post Processor section.
BlockListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
Removes matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
BlockListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
Removes matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector
Includes only matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListValueProjector``
Includes only matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
KafkaMetaAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adds a field named "topic-partition-offset" and sets the value to the concatenation of Kafka topic, partition, and offset to the document.
RenameByMapping
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
Renames fields that are an exact match to a specified field name in the key or value document.
For information on configuration, see the Renaming by Mapping Example.
RenameByRegex
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
Renames fields that match a regular expression in the key or value document.
For information on configuration, see the Renaming by Regular Expression Example.
NullFieldValueRemover
Full Path:
com.mongodb.kafka.connect.sink.processor.NullFieldValueRemover``
Removes all document fields that contain null values from the sink record.

El post-procesador DocumentIdAdder utiliza una estrategia para determinar cómo debe formatear el campo _id en el documento MongoDB. Una estrategia define un comportamiento preestablecido que se puede personalizar para un caso de uso determinado.

Puede especificar una estrategia para este postprocesador en la configuración document.id.strategy como se muestra en el siguiente ejemplo:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

La siguiente tabla muestra una lista de las estrategias que puedes usar para configurar el post-procesador DocumentIdAdder:

Nombre de la estrategia
Descripción
BsonOidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Generates a MongoDB BSON ObjectId.
Default strategy for the DocumentIdAdder post processor.
KafkaMetaDataStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Builds a string composed of the concatenation of Kafka topic, partition, and offset.
FullKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Uses the complete key structure of the sink document to generate the value for the _id field.
Defaults to a blank document if no key exists.
ProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Uses the _id field specified in the key structure of the sink document.
Throws an exception if the field is missing from the sink document.
ProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Uses the _id field specified in the value structure of the sink document.
Throws an exception if the field is missing from the sink document.
PartialKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Uses a block list or allow list projection of the sink document key structure.
Defaults to a blank document if no key exists.
PartialValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Uses a block list or allow list projection of the sink document value structure.
Defaults to a blank document if no value exists.
UuidProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy
Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy
Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``
Uses a randomly generated UUID in string format.

Si las estrategias de funcionalidad incorporada de adición de ID de documento no cubren tu caso de uso, puedes definir una estrategia de ID de documento personalizada siguiendo los pasos que se indican a continuación:

  1. Crea una clase de Java que implemente la interfaz IdStrategy y contenga tu lógica personalizada de configuración.

  2. Compila la clase en un archivo JAR.

  3. Agrega el JAR compilado a la ruta de clase / ruta del plugin para todos los trabajadores de Kafka. Para obtener más información sobre las rutas de plugins, consulta la documentación de Confluent.

  4. Actualice la configuración document.id.strategy con el nombre de clase completo de su clase personalizada en todos sus trabajadores de Kafka.

Nota

La estrategia seleccionada puede tener implicaciones en la semántica de entrega

Las estrategias BSON ObjectId o UUID solo pueden garantizar la entrega al menos una vez, ya que el conector genera nuevos ID en reintentos o al procesar de nuevo los registros. Otras estrategias permiten la entrega única si puedes garantizar que los campos que forman el ID del documento sean únicos.

Para ejemplos de implementaciones de la interfaz IdStrategy, consulta el directorio de código fuente que contiene implementaciones de estrategia id empaquetadas con el conector.

Esta sección muestra ejemplos de configuración y salidas de muestra de los siguientes tipos de procesadores posteriores:

Los posprocesadores de proyector de la lista de permitidos y la lista de bloqueos determinan qué campos incluir y excluir en la salida.

Cuando utiliza el proyector de lista de permitidos, el postprocesador solo genera datos de los campos que usted especifica.

Cuando utilices el proyector de lista de bloqueos, el post-proceso omitirá datos sólo de los campos que especifiques.

Nota

Puedes usar el "." notación de punto para referenciar campos anidados en el registro. También puedes usar la notación para hacer referencia a campos de documentos en un arreglo.

Cuando agrega un proyector a su cadena de postprocesador, debe especificar el tipo de proyector y si desea aplicarlo a la parte de clave o valor del documento receptor.

Consulta las siguientes secciones para ejemplos de configuraciones y salidas de proyectores.

Suponga que los documentos de valores de registros de Kafka se parecieran al siguiente dato de perfil de usuario:

{
"name": "Sally Kimball",
"age": 10,
"address": {
"city": "Idaville",
"country": "USA"
},
"hobbies": [
"reading",
"solving crime"
]
}

Puedes configurar el proyector de valores AllowList para almacenar datos seleccionados como los campos "nombre", "dirección.ciudad" y "pasatiempos" de tus documentos de valores utilizando los siguientes parámetros:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

Después de que el postprocesador aplica la proyección, se obtiene el siguiente registro:

{
"name": "Sally Kimball",
"address": {
"city": "Idaville"
},
"hobbies": [
"reading",
"solving crime"
]
}

Supongamos que sus documentos clave de registro de Kafka se parecen a los siguientes datos de identificación de usuario:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
"source": "mobile"
},
"authToken": {
"alg": "HS256",
"type": "JWT",
"payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
}
}

Puedes configurar el proyector de claves BlockList para omitir los campos "authToken" y "registration.source" campos antes de almacenar los datos con la siguiente configuración:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

Después de que el postprocesador aplica la proyección, se obtiene el siguiente registro:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
}
}

Esta sección muestra cómo se pueden configurar los posprocesadores del proyector para hacer coincidir patrones comodín con nombres de campo.

Patrón

Descripción

*

Coincide con cualquier número de caracteres en el nivel actual.

**

Coincide con cualquier carácter en el nivel actual y en todos los niveles anidados.

Para los ejemplos de coincidencia de patrones de comodín en la lista permitida y la lista bloqueada en esta sección, consulte el siguiente documento de valores que contiene mediciones meteorológicas:

{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
},
"moisture": {
"average": 340,
"units": "mm"
}
}
}

Puedes utilizar el comodín * para hacer coincidir varios nombres de campo. La siguiente configuración de ejemplo coincide con los siguientes campos:

  • El campo de nivel superior denominado "ciudad"

  • Los campos llamadas "promedio" que son subdocumentos de cualquier campo de nivel superior que comience con el nombre "velocidad_viento".

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

Después de que el postprocesador aplique la proyección de la lista de permitidos, produce el siguiente registro:

{
"city": "Springfield",
"wind_speed_10m": {
"average": 3,
},
"wind_speed_80m": {
"average": 8,
}
}

Puedes usar el comodín **, que coincide con objetos en cualquier nivel a partir del nivel en el que especifiques el comodín. El siguiente ejemplo de coincidencia comodín proyecta cualquier documento que contenga el campo llamado "low".

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

El procesador posterior que aplica la proyección genera el siguiente registro:

{
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
}
}
}

Puede utilizar los patrones de comodines para hacer coincidir campos a un nivel de documento específico, como se muestra en el siguiente ejemplo de configuración de lista de bloqueos:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"moisture": {
"average": 340,
"units": "mm"
}
}
}

Esta sección muestra cómo configurar los posprocesadores de renombramiento de campos RenameByMapping y RenameByRegex para actualizar los nombres de campo en un registro receptor. La configuración de renombramiento de campos especifica lo siguiente:

  • Si se debe actualizar el documento clave o de valor en el registro

  • Los nombres de los campos a actualizar

  • Los nuevos nombres de los campos

Debe especificar la configuración de RenameByMapping y RenameByRegex en un arreglo JSON. Puedes especificar campos anidados utilizando la notación de puntos o la coincidencia de patrones.

Los ejemplos de postprocesador de renombramiento de campo utilizan el siguiente registro de receptor de ejemplo:

Documento clave

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

Documento de valor

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

La configuración del posprocesador RenameByMapping especifica uno o más objetos JSON que asignan los campos que coinciden con una string a un nuevo nombre. Cada objeto contiene el texto para hacer coincidir en el elemento oldName y el texto de reemplazo en el elemento newName como se describe en la tabla a continuación.

Nombre de la clave
Descripción

oldName

Especifica si se deben coincidir los campos en el documento clave o en el documento valor y el nombre del campo que se reemplazará. La configuración utiliza un "." carácter para separar los dos valores.

newName

Especifica el nombre del campo de reemplazo para todas las coincidencias del campo.

La siguiente propiedad de ejemplo coincide con el campo "ubicación" de un documento clave y lo renombra como "país":

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

Esta configuración da instrucciones al post-procesador RenameByMapping para convertir el documento de clave original en el siguiente documento:

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

Puede realizar una asignación de nombre de campo similar para documentos de valor especificando el documento de valor con el nombre de campo adjunto en el campo oldName de la siguiente manera:

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

Esta configuración indica al procesador posterior de RenameByMapping que transforme el documento de valor original al siguiente documento:

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

También puedes especificar una o más asignaciones en la propiedad field.renamer.mapping usando un arreglo JSON en formato de string como se muestra en la siguiente configuración:

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]

La configuración del posprocesador RenameByRegex especifica los nombres de campo y los patrones de texto que debe coincidir, así como los valores de reemplazo para el texto coincidente. Puede especificar una o más expresiones de cambio de nombre en objetos JSON que contengan los campos descritos en la siguiente tabla:

Nombre de la clave
Descripción

regexp

Contiene una expresión regular que coincide con campos para realizar el reemplazo.

Patrón

Contiene una expresión regular que coincide con el texto a reemplazar.

Reemplazar

Contiene el texto de reemplazo para todas las coincidencias de la expresión regular que definiste en el campo pattern.

La siguiente configuración de ejemplo instruye al pos-procesador a realizar lo siguiente:

  • Coincida con cualquier nombre de campo en el documento clave que comience con "date". En el conjunto de campos coincidentes, reemplaza todo el texto que coincida con el patrón _ por el carácter -.

  • Coincide con cualquier nombre de campo en el documento de valores que sean subdocumentos de crepes. En el conjunto de campos coincidentes, sustituya todo el texto que coincida con el patrón purchased por quantity.

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

Cuando el conector aplica el posprocesador al documento clave de ejemplo y al documento de valor de ejemplo, produce lo siguiente:

Documento clave

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

Documento de valor

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

Advertencia

Los procesadores posteriores de cambio de nombre no sobrescriben nombres de campos existentes

Los nombres de campo de destino que estableces en tus posprocesadores de renombrado pueden dar lugar a nombres de campo duplicados en el mismo documento. Para evitar esto, el post-procesador omite el cambio de nombre si se duplica el nombre de un campo existente en el mismo nivel del documento.

Si los procesadores posteriores integrados no cubren tu caso de uso, puedes crear una clase de procesador posterior personalizada utilizando los siguientes pasos:

  1. Crear una clase Java que extienda la clase abstracta PostProcessor.

  2. Sobreescribe el método process() en tu clase. Puedes actualizar el SinkDocument, una representación BSON de los campos clave y valor del registro de destino, y acceder al Kafka original SinkRecord en tu método.

  3. Compila la clase en un archivo JAR.

  4. Añade el JAR compilado a la ruta de clases/ruta de plugin para todos tus workers de Kafka. Para obtener más información sobre las rutas de los plugins, consulta la documentación de Confluent sobre Instalación manual de conectores comunitarios.

  5. Agregue el nombre completo de la clase de su postprocesador a la configuración de la cadena de postprocesador.

Para procesadores posteriores, puedes explorar el código fuente de las clases de procesador posterior integradas.

Volver

Escribir estrategias de modelos