Docs Menu
Docs Home
/ /

Postprocesadores de conector de sumidero

En esta página, puede aprender a configurar postprocesadores en su conector receptor de MongoDB Kafka. Los postprocesadores modifican los registros receptores que el conector lee de un tema de Kafka antes de que este los almacene en su colección de MongoDB. Algunos ejemplos de modificaciones de datos que pueden realizar los postprocesadores incluyen:

  • Establecer el documento _id campo a un valor personalizado

  • Incluir o excluir campos de clave o valor de mensaje

  • Cambiar el nombre de los campos

Puede utilizar los posprocesadores prediseñados incluidos en el conector o implementar los suyos propios.

Consulte las siguientes secciones para obtener más información sobre los postprocesadores:

  • Cómo modifican los datos los postprocesadores

  • Cómo especificar postprocesadores

  • Postprocesadores prediseñados

  • Configurar el postprocesador del sumador de ID de documento

  • Ejemplos de configuración de postprocesador

  • Crear un postprocesador personalizado

Los postprocesadores modifican los datos leídos de un tema de Kafka. El conector almacena el mensaje en una clase SinkDocument que contiene una representación de los campos de clave y valor SinkRecord de Kafka. El conector aplica secuencialmente cualquier postprocesador especificado en la configuración y almacena el resultado en una colección de MongoDB.

Los postprocesadores realizan tareas de modificación de datos, como generar el _id campo del documento, proyectar campos de clave o valor de mensaje y renombrar campos. Puede usar los postprocesadores predefinidos incluidos en el conector o implementar los suyos propios extendiendo el postprocesador. clase.

Importante

Postprocesadores y controladores de captura de datos modificados (CDC)

No se puede aplicar un postprocesador a Datos de eventos del controlador CDC. Si se especifican ambos, el conector registra una advertencia.

Puede especificar uno o más posprocesadores en la configuración post.processor.chain como una lista separada por comas. Si especifica más de uno, el conector los aplica secuencialmente, de modo que cada posprocesador modifica la salida de datos según el anterior.

Para garantizar que los documentos que el conector escribe en MongoDB contengan campos _id únicos, agrega automáticamente el postprocesador DocumentIdAdder en la primera posición de la cadena si no lo incluye de otra manera.

La siguiente configuración de ejemplo especifica que el conector debe ejecutar primero el postprocesador KafkaMetaAdder y luego el postprocesador AllowListValueProjector en la salida.

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector

La siguiente tabla contiene una lista de todos los postprocesadores incluidos en el conector del sumidero.

Nombre del postprocesador
Descripción
DocumentIdAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
Inserts an _id field determined by the configured strategy.
The default strategy is BsonOidStrategy.
For information on strategy options and configuration, see the Configure the Document Id Adder Post Processor section.
BlockListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
Removes matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
BlockListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
Removes matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListKeyProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector
Includes only matching key fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
AllowListValueProjector
Full Path:
com.mongodb.kafka.connect.sink.processor.AllowListValueProjector``
Includes only matching value fields from the sink record.
For more information on configuration, see the Allow List and Block List Examples.
KafkaMetaAdder
Full Path:
com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder
Adds a field named "topic-partition-offset" and sets the value to the concatenation of Kafka topic, partition, and offset to the document.
RenameByMapping
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping
Renames fields that are an exact match to a specified field name in the key or value document.
For information on configuration, see the Renaming by Mapping Example.
RenameByRegex
Full Path:
com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex
Renames fields that match a regular expression in the key or value document.
For information on configuration, see the Renaming by Regular Expression Example.
NullFieldValueRemover
Full Path:
com.mongodb.kafka.connect.sink.processor.NullFieldValueRemover``
Removes all document fields that contain null values from the sink record.

El post-procesador DocumentIdAdder utiliza una estrategia para determinar cómo debe formatear el campo _id en el documento MongoDB. Una estrategia define un comportamiento preestablecido que se puede personalizar para un caso de uso determinado.

Puede especificar una estrategia para este postprocesador en la configuración document.id.strategy como se muestra en el siguiente ejemplo:

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

La siguiente tabla muestra una lista de las estrategias que puede utilizar para configurar el postprocesador DocumentIdAdder:

Nombre de la estrategia
Descripción
BsonOidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
Generates a MongoDB BSON ObjectId.
Default strategy for the DocumentIdAdder post processor.
KafkaMetaDataStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy
Builds a string composed of the concatenation of Kafka topic, partition, and offset.
FullKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy
Uses the complete key structure of the sink document to generate the value for the _id field.
Defaults to a blank document if no key exists.
ProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy
Uses the _id field specified in the key structure of the sink document.
Throws an exception if the field is missing from the sink document.
ProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy
Uses the _id field specified in the value structure of the sink document.
Throws an exception if the field is missing from the sink document.
PartialKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy
Uses a block list or allow list projection of the sink document key structure.
Defaults to a blank document if no key exists.
PartialValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy
Uses a block list or allow list projection of the sink document value structure.
Defaults to a blank document if no value exists.
UuidProvidedInKeyStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy
Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidProvidedInValueStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy
Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format.
UuidStrategy
Full Path:
com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``
Uses a randomly generated UUID in string format.

Si las estrategias de adición de identificación de documento integradas no cubren su caso de uso, puede definir una estrategia de identificación de documento personalizada siguiendo los pasos a continuación:

  1. Cree una clase Java que implemente la interfaz IdStrategy y contenga su lógica de configuración personalizada.

  2. Compila la clase en un archivo JAR.

  3. Añade el JAR compilado a la ruta de clases o del plugin de todos tus trabajadores de Kafka. Para más información sobre las rutas de plugins, consulta la documentación de Confluent.

  4. Actualice la configuración document.id.strategy con el nombre de clase completo de su clase personalizada en todos sus trabajadores de Kafka.

Nota

La estrategia seleccionada puede tener implicaciones en la semántica de entrega

Las estrategias BSON ObjectId o UUID solo garantizan una entrega única, ya que el conector genera nuevos identificadores en los reintentos o al volver a procesar los registros. Otras estrategias permiten una entrega única si se garantiza la exclusividad de los campos que forman el identificador del documento.

Para ver ejemplos de implementaciones de la IdStrategy interfaz, consulte el directorio del código fuente que contiene las implementaciones de la estrategia de identificación empaquetadas con el conector.

Esta sección muestra ejemplos de configuración y salidas de muestra de los siguientes tipos de procesadores posteriores:

Los posprocesadores del proyector de listas de permitidos y de bloqueos determinan qué campos incluir y excluir de la salida.

Cuando utiliza el proyector de lista de permitidos, el postprocesador solo genera datos de los campos que usted especifica.

Cuando utiliza el proyector de lista de bloqueos, el proceso posterior solo omite los datos de los campos que usted especifica.

Nota

Puede usar la notación "." (punto) para hacer referencia a campos anidados en el registro. También puede usar esta notación para hacer referencia a campos de documentos en una matriz.

Cuando agrega un proyector a su cadena de postprocesador, debe especificar el tipo de proyector y si desea aplicarlo a la parte de clave o valor del documento receptor.

Consulte las siguientes secciones para ver ejemplos de configuraciones y salidas del proyector.

Supongamos que sus documentos de valor de registro de Kafka se parecen a los siguientes datos de perfil de usuario:

{
"name": "Sally Kimball",
"age": 10,
"address": {
"city": "Idaville",
"country": "USA"
},
"hobbies": [
"reading",
"solving crime"
]
}

Puede configurar el proyector de valores AllowList para almacenar datos seleccionados como los campos "nombre", "dirección.ciudad" y "pasatiempos" de sus documentos de valores mediante las siguientes configuraciones:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

Después de que el postprocesador aplica la proyección, genera el siguiente registro:

{
"name": "Sally Kimball",
"address": {
"city": "Idaville"
},
"hobbies": [
"reading",
"solving crime"
]
}

Supongamos que sus documentos clave de registro de Kafka se parecen a los siguientes datos de identificación de usuario:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
"source": "mobile"
},
"authToken": {
"alg": "HS256",
"type": "JWT",
"payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
}
}

Puede configurar el proyector de teclas BlockList para omitir los campos "authToken" y "registration.source" antes de almacenar los datos con las siguientes configuraciones:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

Después de que el postprocesador aplica la proyección, genera el siguiente registro:

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
}
}

En esta sección se muestra cómo configurar los postprocesadores del proyector para que coincidan con los patrones comodín para que coincidan con los nombres de campo.

Patrón

Descripción

*

Coincide con cualquier número de caracteres en el nivel actual.

**

Coincide con cualquier personaje del nivel actual y todos los niveles anidados.

Para conocer los ejemplos de coincidencia de patrones comodín de listas de permitidos y listas de bloqueados en esta sección, consulte el siguiente documento de valores que contiene mediciones meteorológicas:

{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
},
"moisture": {
"average": 340,
"units": "mm"
}
}
}

Puedes utilizar el comodín * para hacer coincidir varios nombres de campo. La siguiente configuración de ejemplo coincide con los siguientes campos:

  • El campo de nivel superior llamado "ciudad"

  • Los campos denominados "promedio" que son subdocumentos de cualquier campo de nivel superior que comience con el nombre "velocidad_del_viento".

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

Después de que el postprocesador aplica la proyección de la lista de permitidos, genera el siguiente registro:

{
"city": "Springfield",
"wind_speed_10m": {
"average": 3,
},
"wind_speed_80m": {
"average": 8,
}
}

Puedes usar el comodín **, que coincide con objetos en cualquier nivel a partir del nivel en el que especifiques el comodín. El siguiente ejemplo de coincidencia comodín proyecta cualquier documento que contenga el campo llamado "low".

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

El postprocesador que aplica la proyección genera el siguiente registro:

{
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
}
}
}

Puede utilizar patrones comodín para hacer coincidir campos en un nivel de documento específico como se muestra en el siguiente ejemplo de configuración de lista de bloqueo:

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"moisture": {
"average": 340,
"units": "mm"
}
}
}

Esta sección muestra cómo configurar los posprocesadores de renombramiento de campos RenameByMapping y RenameByRegex para actualizar los nombres de campo en un registro receptor. La configuración de renombramiento de campos especifica lo siguiente:

  • Si se debe actualizar el documento clave o de valor en el registro

  • Los nombres de los campos a actualizar

  • Los nuevos nombres de campo

Debe especificar las configuraciones RenameByMapping y RenameByRegex en una matriz JSON. Puede especificar campos anidados mediante notación de puntos o coincidencia de patrones.

Los ejemplos de postprocesador de renombramiento de campo utilizan el siguiente registro de receptor de ejemplo:

Documento clave

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

Documento de valor

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

La configuración del posprocesador RenameByMapping especifica uno o más objetos JSON que asignan un nuevo nombre a los campos que coinciden con una cadena. Cada objeto contiene el texto que debe coincidir en el elemento oldName y el texto de reemplazo en el elemento newName, como se describe en la tabla a continuación.

Nombre de la clave
Descripción

nombre antiguo

Especifica si se deben hacer coincidir los campos del documento de clave o valor y el nombre del campo que se va a reemplazar. La configuración utiliza un carácter "." para separar los dos valores.

nuevoNombre

Especifica el nombre del campo de reemplazo para todas las coincidencias del campo.

La siguiente propiedad de ejemplo coincide con el campo "ubicación" de un documento clave y lo renombra a "país":

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

Esta configuración le indica al RenameByMapping postprocesador que transforme el documento clave original en el siguiente documento:

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

Puede realizar una asignación de nombre de campo similar para documentos de valor especificando el documento de valor con el nombre de campo adjunto en el campo oldName de la siguiente manera:

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

Esta configuración le indica al RenameByMapping postprocesador que transforme el documento de valor original en el siguiente documento:

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

También puede especificar una o más asignaciones en la propiedad field.renamer.mapping utilizando una matriz JSON en formato de cadena como se muestra en la siguiente configuración:

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]

La configuración del posprocesador RenameByRegex especifica los nombres de campo y los patrones de texto que debe coincidir, así como los valores de reemplazo para el texto coincidente. Puede especificar una o más expresiones de cambio de nombre en objetos JSON que contengan los campos descritos en la siguiente tabla:

Nombre de la clave
Descripción

regexp

Contiene una expresión regular que coincide con los campos para realizar el reemplazo.

patrón

Contiene una expresión regular que coincide con el texto a reemplazar.

Reemplazar

Contiene el texto de reemplazo para todas las coincidencias de la expresión regular definida en el campo pattern.

La siguiente configuración de ejemplo le indica al postprocesador que realice lo siguiente:

  • Coincida con cualquier nombre de campo del documento clave que comience por "fecha". En el conjunto de campos coincidentes, reemplace todo el texto que coincida con el patrón _ por el carácter -.

  • Coincide con cualquier nombre de campo en el documento de valor que sea subdocumento de crepes. En el conjunto de campos coincidentes, reemplaza todo el texto que coincida con el patrón purchased por quantity.

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

Cuando el conector aplica el postprocesador al documento clave de ejemplo y al documento de valor de ejemplo, genera lo siguiente:

Documento clave

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

Documento de valor

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

Advertencia

Los posprocesadores de renombramiento no sobrescriben los nombres de campos existentes

Los nombres de campo de destino que configure en los posprocesadores de renombrado pueden generar nombres de campo duplicados en el mismo documento. Para evitarlo, el posprocesador omite el cambio de nombre cuando duplica un nombre de campo existente en el mismo nivel del documento.

Si los postprocesadores integrados no cubren su caso de uso, puede crear una clase de postprocesador personalizada siguiendo los siguientes pasos:

  1. Cree una clase Java que extienda la clase abstracta PostProcessor.

  2. Anule el método process() en su clase. Puede actualizar SinkDocument, una representación BSON de los campos clave y valor del registro receptor, y acceder al método Kafka SinkRecord original en su método.

  3. Compila la clase en un archivo JAR.

  4. Agregue el JAR compilado a la ruta de clase o ruta del plugin para todos sus trabajadores de Kafka. Para obtener más información sobre las rutas de los plugins, consulte la documentación de Confluent sobre la instalación manual de conectores comunitarios.

  5. Agregue el nombre completo de la clase de su postprocesador a la configuración de la cadena de postprocesador.

Para procesadores posteriores, puedes explorar el código fuente de las clases de procesador posterior integradas.

Volver

Escribir estrategias de modelos