Overview
En esta página, puedes aprender a configurar post-procesadores en tu conector sink de MongoDB Kafka. Los procesadores posteriores modifican los registros de registro que el conector lee desde un tema de Kafka antes de que el conector los almacene en la colección de MongoDB. Algunos ejemplos de modificaciones de datos que los procesadores posteriores pueden realizar incluyen:
Establezca el campo
_iddel documento en un valor personalizadoIncluir o excluir campos clave o de valor del mensaje
Renombrar campos
Puedes usar los post procesadores preconstruidos incluidos en el conector o implementar los tuyos propios.
Consulte las siguientes secciones para obtener más información sobre los postprocesadores:
Cómo los posprocesadores modifican los datos
Los posprocesadores modifican los datos leídos desde un tema de Kafka. El conector almacena el mensaje en una clase SinkDocument que contiene una representación de las claves SinkRecord y los campos de valor de Kafka. El conector aplica secuencialmente cualquier procesador posterior especificado en la configuración y almacena el resultado en una colección de MongoDB.
Los procesadores posteriores realizan tareas de modificación de datos como generar el campo del documento _id, proyectar campos de clave o valor de mensaje, y renombrar campos. Puedes usar los procesadores posteriores predefinidos incluidos en el conector, o puedes implementar tu propio procesador extendiendo la clase PostProcessor.
Importante
Procesadores posteriores y controladores de captura de datos de cambios (CDC)
No puedes aplicar un posprocesador a los datos de eventos del gestor CDC. Si especificas ambos, el conector registra un registro de una advertencia.
Cómo especificar los Posprocesadores
Puedes especificar uno o más procesadores posteriores en la configuración post.processor.chain como una lista separada por comas. Si especificas más de uno, el conector los aplica secuencialmente, de manera que cada posprocesador modifica los datos generados por el anterior.
Para garantizar que los documentos que el conector escribe en MongoDB contengan campos _id únicos, añade automáticamente el posprocesador DocumentIdAdder en la primera posición de la cadena si no lo incluye de otra manera.
El siguiente ejemplo de configuración especifica que el conector debe ejecutar primero el postprocesador KafkaMetaAdder y luego el postprocesador AllowListValueProjector sobre la salida.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Procesadores posteriores preconstruidos
La siguiente tabla contiene una lista de todos los procesadores posteriores incluidos en el conector del sumidero.
Nombre del posprocesador | Descripción | |
|---|---|---|
DocumentIdAdder | Ruta completa: Inserta un Para obtener información sobre las opciones de estrategia y la configuración, consulte la sección "Configurar el postprocesador del agregador de ID de documento". | |
BlockListKeyProjector | Ruta completa: Elimina los campos clave coincidentes del registro de destino. | |
BlockListValueProjector | Ruta completa: Elimina los campos con valores coincidentes del registro de destino. | |
AllowListKeyProjector | Ruta completa: Incluye únicamente los campos clave coincidentes del registro de destino. | |
AllowListValueProjector | Ruta completa: Incluye únicamente los campos con valores coincidentes del registro de destino. | |
KafkaMetaAdder | Ruta completa: Agrega un campo llamado "topic-partition-offset" y establece el valor en la concatenación de tema, partición y offset (desplazamiento) de Kafka al documento. | |
RenameByMapping | Ruta completa: Cambia el nombre de los campos que coinciden exactamente con un nombre de campo específico en el documento de clave o valor. | |
RenombrarPorRegex | Ruta completa: Cambia el nombre de los campos que coinciden con una expresión regular en el documento clave o valor. | |
Eliminador de valores de campos nulos | Ruta completa: Elimina todos los campos del documento que contienen valores |
Configura el Post Processor para agregar id documentos
El post-procesador DocumentIdAdder utiliza una estrategia para determinar cómo debe formatear el campo _id en el documento MongoDB. Una estrategia define un comportamiento preestablecido que se puede personalizar para un caso de uso determinado.
Puede especificar una estrategia para este post-procesador en la configuración document.id.strategy, como se muestra en el siguiente ejemplo:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
La siguiente tabla muestra una lista de las estrategias que puedes usar para configurar el post-procesador DocumentIdAdder:
Nombre de la estrategia | Descripción | |
|---|---|---|
BsonOidStrategy | Ruta completa: Genera un ObjectId BSON de MongoDB. | |
KafkaMetaDataStrategy | Ruta completa: Compila una string compuesta por la concatenación del tema, partición y desplazamiento de Kafka. | |
FullKeyStrategy | Ruta completa: Utiliza la estructura de clave completa del documento de destino para generar el valor del | |
ProvidedInKeyStrategy | Ruta completa: Utiliza el | |
ProvidedInValueStrategy | Ruta completa: Utiliza el | |
PartialKeyStrategy | Ruta completa: Utiliza una proyección de lista de bloqueo o lista de permisos de la estructura de claves del documento de destino. | |
PartialValueStrategy | Ruta completa: Utiliza una proyección de lista de bloqueo o lista de permisos de la estructura de valores del documento de destino. | |
UuidProvidedInKeyStrategy | Ruta completa: Convierte el | |
UuidProvidedInValueStrategy | Ruta completa: Convierte el | |
UuidStrategy | Ruta completa: Utiliza un UUID generado aleatoriamente en formato de string. |
Crear una estrategia personalizada de identificación de documentos
Si las estrategias de funcionalidad incorporada de adición de ID de documento no cubren tu caso de uso, puedes definir una estrategia de ID de documento personalizada siguiendo los pasos que se indican a continuación:
Crea una clase de Java que implemente la interfaz IdStrategy y contenga tu lógica personalizada de configuración.
Compila la clase en un archivo JAR.
Agrega el JAR compilado a la ruta de clase / ruta del plugin para todos los trabajadores de Kafka. Para obtener más información sobre las rutas de plugins, consulta la documentación de Confluent.
Actualice la configuración de
document.id.strategyal nombre completo de su clase personalizada en todos sus Kafka workers.
Nota
La estrategia seleccionada puede tener implicaciones en la semántica de entrega
Las estrategias BSON ObjectId o UUID solo pueden garantizar la entrega al menos una vez, ya que el conector genera nuevos ID en reintentos o al procesar de nuevo los registros. Otras estrategias permiten la entrega única si puedes garantizar que los campos que forman el ID del documento sean únicos.
Para ejemplos de implementaciones de la interfaz IdStrategy, consulta el directorio de código fuente que contiene implementaciones de estrategia id empaquetadas con el conector.
Ejemplos de postprocesador
Esta sección muestra ejemplos de configuración y salidas de muestra de los siguientes tipos de procesadores posteriores:
Ejemplos de lista de permitidos y lista de bloqueados
Los posprocesadores de proyector de la lista de permitidos y la lista de bloqueos determinan qué campos incluir y excluir en la salida.
Cuando utilizas el proyector de lista de permitidos, el postprocesador solo genera datos de los campos especificados.
Cuando utilices el proyector de lista de bloqueos, el post-proceso omitirá datos sólo de los campos que especifiques.
Nota
Puedes usar el "." notación de punto para referenciar campos anidados en el registro. También puedes usar la notación para hacer referencia a campos de documentos en un arreglo.
Cuando se agrega un proyector a la cadena del procesador posterior, se debe especificar el tipo del proyector y si se aplicará a la clave o al valor del documento de sumidero.
Consulta las siguientes secciones para ejemplos de configuraciones y salidas de proyectores.
Ejemplo de proyector de lista de permisos
Suponga que los documentos de valores de registros de Kafka se parecieran al siguiente dato de perfil de usuario:
{ "name": "Sally Kimball", "age": 10, "address": { "city": "Idaville", "country": "USA" }, "hobbies": [ "reading", "solving crime" ] }
Puedes configurar el proyector de valores AllowList para almacenar datos seleccionados como los campos "nombre", "dirección.ciudad" y "pasatiempos" de tus documentos de valores utilizando los siguientes parámetros:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=name,address.city,hobbies
Después de que el postprocesador aplica la proyección, se obtiene el siguiente registro:
{ "name": "Sally Kimball", "address": { "city": "Idaville" }, "hobbies": [ "reading", "solving crime" ] }
Ejemplo de proyector de listas de bloques
Supón que tus documentos de registro de Kafka se asemejan a los siguientes datos de identificación de usuario:
{ "username": "user5983", "registration": { "date": "2021-09-13", "source": "mobile" }, "authToken": { "alg": "HS256", "type": "JWT", "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk" } }
Puedes configurar el proyector de claves BlockList para omitir los campos "authToken" y "registration.source" campos antes de almacenar los datos con la siguiente configuración:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector key.projection.type=BlockList key.projection.list=authToken,registration.source
Después de que el postprocesador aplica la proyección, se obtiene el siguiente registro:
{ "username": "user5983", "registration": { "date": "2021-09-13", } }
Ejemplos de patrones coincidentes con comodines de proyección
Esta sección muestra cómo se pueden configurar los posprocesadores del proyector para hacer coincidir patrones comodín con nombres de campo.
Patrón | Descripción |
| Coincide con cualquier número de caracteres en el nivel actual. |
| Coincide con cualquier carácter en el nivel actual y en todos los niveles anidados. |
Para los ejemplos de coincidencia de patrones de comodín en la lista permitida y la lista bloqueada en esta sección, consulte el siguiente documento de valores que contiene mediciones meteorológicas:
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" }, "moisture": { "average": 340, "units": "mm" } } }
Permitir ejemplos de comodines de lista
Puedes utilizar el comodín * para hacer coincidir varios nombres de campo. La siguiente configuración de ejemplo coincide con los siguientes campos:
El campo de nivel superior denominado "ciudad"
Los campos llamadas "promedio" que son subdocumentos de cualquier campo de nivel superior que comience con el nombre "velocidad_viento".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=city,wind_speed*.average
Después de que el postprocesador aplique la proyección de la lista de permitidos, produce el siguiente registro:
{ "city": "Springfield", "wind_speed_10m": { "average": 3, }, "wind_speed_80m": { "average": 8, } }
Puedes usar el comodín **, que coincide con objetos en cualquier nivel a partir del nivel en el que especifiques el comodín. El siguiente ejemplo de coincidencia comodín proyecta cualquier documento que contenga el campo llamado "low".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=**.low
El procesador posterior que aplica la proyección genera el siguiente registro:
{ "temperature": { "high": 28, "low": 24, "units": "C" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" } } }
Ejemplo de comodín en la lista de bloqueados
Puede utilizar los patrones de comodines para hacer coincidir campos a un nivel de documento específico, como se muestra en el siguiente ejemplo de configuración de lista de bloqueos:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector value.projection.type=BlockList value.projection.list=*.*.temperature
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "moisture": { "average": 340, "units": "mm" } } }
Ejemplos de renombramiento de campo
Esta sección muestra cómo puedes configurar los posprocesadores de renombrado de campos RenameByMapping y RenameByRegex para actualizar los nombres de campos en un registro de sumidero. La configuración de cambio de nombre de campo especifica lo siguiente:
Si actualizar la clave o el documento de valor en el registro
Los nombres de los campos a actualizar
Los nuevos nombres de los campos
Debe especificar la configuración de RenameByMapping y RenameByRegex en un arreglo JSON. Puedes especificar campos anidados utilizando la notación de puntos o la coincidencia de patrones.
Los ejemplos del posprocesador que renombra campos usan el siguiente registro de sumidero de ejemplo:
Documento clave
{ "location": "Provence", "date_month": "October", "date_day": 17 }
Documento de valor
{ "flapjacks": { "purchased": 598, "size": "large" } }
Renombrar por ejemplo de mapeo
La configuración del posprocesador RenameByMapping especifica uno o más objetos JSON que asignan los campos que coinciden con una string a un nuevo nombre. Cada objeto contiene el texto para hacer coincidir en el elemento oldName y el texto de reemplazo en el elemento newName como se describe en la tabla a continuación.
Nombre de la clave | Descripción |
|---|---|
oldName | Especifica si se deben coincidir los campos en el documento clave o en el documento valor y el nombre del campo que se reemplazará. La configuración utiliza un "." carácter para separar los dos valores. |
newName | Especifica el nombre del campo de reemplazo para todas las coincidencias del campo. |
La siguiente propiedad de ejemplo coincide con el campo "ubicación" de un documento clave y lo renombra como "país":
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]
Esta configuración da instrucciones al post-procesador RenameByMapping para convertir el documento de clave original en el siguiente documento:
{ "country": "Provence", "date_month": "October", "date_day": 17 }
Puedes realizar una tarea similar de asignación de nombres de campos a documentos de valor, especificando el documento de valor junto con el nombre de campo agregado en el campo oldName, de la siguiente manera:
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]
Esta configuración indica al procesador posterior de RenameByMapping que transforme el documento de valor original al siguiente documento:
{ "crepes": { "purchased": 598, "size": "large" } }
También puedes especificar una o más asignaciones en la propiedad field.renamer.mapping usando un arreglo JSON en formato de string como se muestra en la siguiente configuración:
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
Renombrar por Expresión Regular
La configuración del posprocesador RenameByRegex especifica los nombres de los campos y los patrones de texto que debe coincidir, así como los valores de reemplazo para el texto coincidente. Puedes especificar una o más expresiones de cambio de nombre en objetos JSON que contengan los campos descritos en la siguiente tabla:
Nombre de la clave | Descripción |
|---|---|
regexp | Contiene una expresión regular que coincide con campos para realizar el reemplazo. |
Patrón | Contiene una expresión regular que coincide con el texto a reemplazar. |
Reemplazar | Contiene el texto de reemplazo para todas las coincidencias de la expresión regular que definiste en el campo |
La siguiente configuración de ejemplo instruye al pos-procesador a realizar lo siguiente:
Coincida con cualquier nombre de campo en el documento clave que comience con "date". En el conjunto de campos coincidentes, reemplaza todo el texto que coincida con el patrón
_por el carácter-.Coincide con cualquier nombre de campo en el documento de valores que sean subdocumentos de
crepes. En el conjunto de campos coincidentes, sustituya todo el texto que coincida con el patrónpurchasedporquantity.
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
Cuando el conector aplica el posprocesador al documento clave de ejemplo y al documento de valor de ejemplo, produce lo siguiente:
Documento clave
{ "location": "Provence", "date-month": "October", "date-day": 17 }
Documento de valor
{ "crepes": { "quantity": 598, "size": "large" } }
Advertencia
Los procesadores posteriores de cambio de nombre no sobrescriben nombres de campos existentes
Los nombres de campo de destino que estableces en tus posprocesadores de renombrado pueden dar lugar a nombres de campo duplicados en el mismo documento. Para evitar esto, el post-procesador omite el cambio de nombre si se duplica el nombre de un campo existente en el mismo nivel del documento.
Cómo crear un procesador de publicaciones personalizado
Si los procesadores posteriores integrados no cubren tu caso de uso, puedes crear una clase de procesador posterior personalizada utilizando los siguientes pasos:
Crear una clase Java que extienda la clase abstracta PostProcessor.
Sobreescribe el método
process()en tu clase. Puedes actualizar elSinkDocument, una representación BSON de los campos clave y valor del registro de destino, y acceder al Kafka originalSinkRecorden tu método.Compila la clase en un archivo JAR.
Añade el JAR compilado a la ruta de clases/ruta de plugin para todos tus workers de Kafka. Para obtener más información sobre las rutas de los plugins, consulta la documentación de Confluent sobre Instalación manual de conectores comunitarios.
Agrega el nombre de clase completo de tu procesador posterior a la configuración de la cadena de procesadores posteriores.
Para procesadores posteriores, puedes explorar el código fuente de las clases de procesador posterior integradas.