Overview
En esta página, puedes aprender a configurar post-procesadores en tu conector sink de MongoDB Kafka. Los procesadores posteriores modifican los registros de registro que el conector lee desde un tema de Kafka antes de que el conector los almacene en la colección de MongoDB. Algunos ejemplos de modificaciones de datos que los procesadores posteriores pueden realizar incluyen:
Establecer el documento
_idcampo a un valor personalizadoIncluir o excluir campos de clave o valor de mensaje
Cambiar el nombre de los campos
Puedes usar los post procesadores preconstruidos incluidos en el conector o implementar los tuyos propios.
Consulte las siguientes secciones para obtener más información sobre los postprocesadores:
Cómo los posprocesadores modifican los datos
Los posprocesadores modifican los datos leídos desde un tema de Kafka. El conector almacena el mensaje en una clase SinkDocument que contiene una representación de las claves SinkRecord y los campos de valor de Kafka. El conector aplica secuencialmente cualquier procesador posterior especificado en la configuración y almacena el resultado en una colección de MongoDB.
Los postprocesadores realizan tareas de modificación de datos, como generar el _id campo del documento, proyectar campos de clave o valor de mensaje y renombrar campos. Puede usar los postprocesadores predefinidos incluidos en el conector o implementar los suyos propios extendiendo el postprocesador.
clase.
Cómo especificar los Posprocesadores
Puede especificar uno o más posprocesadores en la configuración post.processor.chain como una lista separada por comas. Si especifica más de uno, el conector los aplica secuencialmente, de modo que cada posprocesador modifica la salida de datos según el anterior.
Para garantizar que los documentos que el conector escribe en MongoDB contengan campos _id únicos, añade automáticamente el posprocesador DocumentIdAdder en la primera posición de la cadena si no lo incluye de otra manera.
El siguiente ejemplo de configuración especifica que el conector debe ejecutar primero el postprocesador KafkaMetaAdder y luego el postprocesador AllowListValueProjector sobre la salida.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Procesadores posteriores preconstruidos
La siguiente tabla contiene una lista de todos los procesadores posteriores incluidos en el conector del sumidero.
Nombre del posprocesador | Descripción | |
|---|---|---|
DocumentIdAdder | Full Path: Inserts an _id field determined by the configured strategy.The default strategy is BsonOidStrategy.For information on strategy options and configuration, see the
Configure the Document Id Adder Post Processor
section. | |
BlockListKeyProjector | Full Path: Removes matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
BlockListValueProjector | Full Path: Removes matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListKeyProjector | Full Path: Includes only matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListValueProjector | Full Path: Includes only matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
KafkaMetaAdder | Full Path: Adds a field named "topic-partition-offset" and sets the value
to the concatenation of Kafka topic, partition, and offset to the
document. | |
RenameByMapping | Full Path: Renames fields that are an exact match to a specified field name in
the key or value document. For information on configuration, see the
Renaming by Mapping Example. | |
RenameByRegex | Full Path: Renames fields that match a regular expression in the key or
value document. For information on configuration, see the
Renaming by Regular Expression Example. | |
NullFieldValueRemover | Full Path: Removes all document fields that contain null values from the sink record. |
Configura el Post Processor para agregar id documentos
El post-procesador DocumentIdAdder utiliza una estrategia para determinar cómo debe formatear el campo _id en el documento MongoDB. Una estrategia define un comportamiento preestablecido que se puede personalizar para un caso de uso determinado.
Puede especificar una estrategia para este postprocesador en la configuración document.id.strategy como se muestra en el siguiente ejemplo:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
La siguiente tabla muestra una lista de las estrategias que puedes usar para configurar el post-procesador DocumentIdAdder:
Nombre de la estrategia | Descripción | |
|---|---|---|
BsonOidStrategy | Full Path: Generates a MongoDB BSON ObjectId. Default strategy for the DocumentIdAdder post processor. | |
KafkaMetaDataStrategy | Full Path: Builds a string composed of the concatenation of Kafka topic,
partition, and offset. | |
FullKeyStrategy | Full Path: Uses the complete key structure of the sink document to generate the
value for the _id field.Defaults to a blank document if no key exists. | |
ProvidedInKeyStrategy | Full Path: Uses the _id field specified in the key structure of the sink
document.Throws an exception if the field is missing from the sink document. | |
ProvidedInValueStrategy | Full Path: Uses the _id field specified in the value structure of the
sink document.Throws an exception if the field is missing from the sink document. | |
PartialKeyStrategy | Full Path: Uses a block list or allow list projection of the sink document key
structure. Defaults to a blank document if no key exists. | |
PartialValueStrategy | Full Path: Uses a block list or allow list projection of the sink document
value structure. Defaults to a blank document if no value exists. | |
UuidProvidedInKeyStrategy | Full Path: Converts the _id key field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidProvidedInValueStrategy | Full Path: Converts the _id value field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidStrategy | Full Path: Uses a randomly generated UUID in string format. |
Crear una estrategia personalizada de identificación de documentos
Si las estrategias de funcionalidad incorporada de adición de ID de documento no cubren tu caso de uso, puedes definir una estrategia de ID de documento personalizada siguiendo los pasos que se indican a continuación:
Crea una clase de Java que implemente la interfaz IdStrategy y contenga tu lógica personalizada de configuración.
Compila la clase en un archivo JAR.
Agrega el JAR compilado a la ruta de clase / ruta del plugin para todos los trabajadores de Kafka. Para obtener más información sobre las rutas de plugins, consulta la documentación de Confluent.
Actualice la configuración
document.id.strategycon el nombre de clase completo de su clase personalizada en todos sus trabajadores de Kafka.
Nota
La estrategia seleccionada puede tener implicaciones en la semántica de entrega
Las estrategias BSON ObjectId o UUID solo pueden garantizar la entrega al menos una vez, ya que el conector genera nuevos ID en reintentos o al procesar de nuevo los registros. Otras estrategias permiten la entrega única si puedes garantizar que los campos que forman el ID del documento sean únicos.
Para ejemplos de implementaciones de la interfaz IdStrategy, consulta el directorio de código fuente que contiene implementaciones de estrategia id empaquetadas con el conector.
Ejemplos de postprocesador
Esta sección muestra ejemplos de configuración y salidas de muestra de los siguientes tipos de procesadores posteriores:
Ejemplos de listas de permitidos y listas de bloqueo
Los posprocesadores de proyector de la lista de permitidos y la lista de bloqueos determinan qué campos incluir y excluir en la salida.
Cuando utiliza el proyector de lista de permitidos, el postprocesador solo genera datos de los campos que usted especifica.
Cuando utilices el proyector de lista de bloqueos, el post-proceso omitirá datos sólo de los campos que especifiques.
Nota
Puedes usar el "." notación de punto para referenciar campos anidados en el registro. También puedes usar la notación para hacer referencia a campos de documentos en un arreglo.
Cuando agrega un proyector a su cadena de postprocesador, debe especificar el tipo de proyector y si desea aplicarlo a la parte de clave o valor del documento receptor.
Consulta las siguientes secciones para ejemplos de configuraciones y salidas de proyectores.
Ejemplo de proyector de lista de permisos
Suponga que los documentos de valores de registros de Kafka se parecieran al siguiente dato de perfil de usuario:
{ "name": "Sally Kimball", "age": 10, "address": { "city": "Idaville", "country": "USA" }, "hobbies": [ "reading", "solving crime" ] }
Puedes configurar el proyector de valores AllowList para almacenar datos seleccionados como los campos "nombre", "dirección.ciudad" y "pasatiempos" de tus documentos de valores utilizando los siguientes parámetros:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=name,address.city,hobbies
Después de que el postprocesador aplica la proyección, se obtiene el siguiente registro:
{ "name": "Sally Kimball", "address": { "city": "Idaville" }, "hobbies": [ "reading", "solving crime" ] }
Ejemplo de proyector de listas de bloques
Supongamos que sus documentos clave de registro de Kafka se parecen a los siguientes datos de identificación de usuario:
{ "username": "user5983", "registration": { "date": "2021-09-13", "source": "mobile" }, "authToken": { "alg": "HS256", "type": "JWT", "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk" } }
Puedes configurar el proyector de claves BlockList para omitir los campos "authToken" y "registration.source" campos antes de almacenar los datos con la siguiente configuración:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector key.projection.type=BlockList key.projection.list=authToken,registration.source
Después de que el postprocesador aplica la proyección, se obtiene el siguiente registro:
{ "username": "user5983", "registration": { "date": "2021-09-13", } }
Ejemplos de patrones coincidentes con comodines de proyección
Esta sección muestra cómo se pueden configurar los posprocesadores del proyector para hacer coincidir patrones comodín con nombres de campo.
Patrón | Descripción |
| Coincide con cualquier número de caracteres en el nivel actual. |
| Coincide con cualquier carácter en el nivel actual y en todos los niveles anidados. |
Para los ejemplos de coincidencia de patrones de comodín en la lista permitida y la lista bloqueada en esta sección, consulte el siguiente documento de valores que contiene mediciones meteorológicas:
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" }, "moisture": { "average": 340, "units": "mm" } } }
Permitir ejemplos de comodines de lista
Puedes utilizar el comodín * para hacer coincidir varios nombres de campo. La siguiente configuración de ejemplo coincide con los siguientes campos:
El campo de nivel superior denominado "ciudad"
Los campos llamadas "promedio" que son subdocumentos de cualquier campo de nivel superior que comience con el nombre "velocidad_viento".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=city,wind_speed*.average
Después de que el postprocesador aplique la proyección de la lista de permitidos, produce el siguiente registro:
{ "city": "Springfield", "wind_speed_10m": { "average": 3, }, "wind_speed_80m": { "average": 8, } }
Puedes usar el comodín **, que coincide con objetos en cualquier nivel a partir del nivel en el que especifiques el comodín. El siguiente ejemplo de coincidencia comodín proyecta cualquier documento que contenga el campo llamado "low".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=**.low
El procesador posterior que aplica la proyección genera el siguiente registro:
{ "temperature": { "high": 28, "low": 24, "units": "C" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" } } }
Ejemplo de comodín en la lista de bloqueados
Puede utilizar los patrones de comodines para hacer coincidir campos a un nivel de documento específico, como se muestra en el siguiente ejemplo de configuración de lista de bloqueos:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector value.projection.type=BlockList value.projection.list=*.*.temperature
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "moisture": { "average": 340, "units": "mm" } } }
Ejemplos de renombramiento de campo
Esta sección muestra cómo configurar los posprocesadores de renombramiento de campos RenameByMapping y RenameByRegex para actualizar los nombres de campo en un registro receptor. La configuración de renombramiento de campos especifica lo siguiente:
Si se debe actualizar el documento clave o de valor en el registro
Los nombres de los campos a actualizar
Los nuevos nombres de los campos
Debe especificar la configuración de RenameByMapping y RenameByRegex en un arreglo JSON. Puedes especificar campos anidados utilizando la notación de puntos o la coincidencia de patrones.
Los ejemplos de postprocesador de renombramiento de campo utilizan el siguiente registro de receptor de ejemplo:
Documento clave
{ "location": "Provence", "date_month": "October", "date_day": 17 }
Documento de valor
{ "flapjacks": { "purchased": 598, "size": "large" } }
Ejemplo de cambio de nombre por mapeo
La configuración del posprocesador RenameByMapping especifica uno o más objetos JSON que asignan los campos que coinciden con una string a un nuevo nombre. Cada objeto contiene el texto para hacer coincidir en el elemento oldName y el texto de reemplazo en el elemento newName como se describe en la tabla a continuación.
Nombre de la clave | Descripción |
|---|---|
oldName | Especifica si se deben coincidir los campos en el documento clave o en el documento valor y el nombre del campo que se reemplazará. La configuración utiliza un "." carácter para separar los dos valores. |
newName | Especifica el nombre del campo de reemplazo para todas las coincidencias del campo. |
La siguiente propiedad de ejemplo coincide con el campo "ubicación" de un documento clave y lo renombra como "país":
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]
Esta configuración da instrucciones al post-procesador RenameByMapping para convertir el documento de clave original en el siguiente documento:
{ "country": "Provence", "date_month": "October", "date_day": 17 }
Puede realizar una asignación de nombre de campo similar para documentos de valor especificando el documento de valor con el nombre de campo adjunto en el campo oldName de la siguiente manera:
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]
Esta configuración indica al procesador posterior de RenameByMapping que transforme el documento de valor original al siguiente documento:
{ "crepes": { "purchased": 598, "size": "large" } }
También puedes especificar una o más asignaciones en la propiedad field.renamer.mapping usando un arreglo JSON en formato de string como se muestra en la siguiente configuración:
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
Cambiar nombre por expresión regular
La configuración del posprocesador RenameByRegex especifica los nombres de campo y los patrones de texto que debe coincidir, así como los valores de reemplazo para el texto coincidente. Puede especificar una o más expresiones de cambio de nombre en objetos JSON que contengan los campos descritos en la siguiente tabla:
Nombre de la clave | Descripción |
|---|---|
regexp | Contiene una expresión regular que coincide con campos para realizar el reemplazo. |
Patrón | Contiene una expresión regular que coincide con el texto a reemplazar. |
Reemplazar | Contiene el texto de reemplazo para todas las coincidencias de la expresión regular que definiste en el campo |
La siguiente configuración de ejemplo instruye al pos-procesador a realizar lo siguiente:
Coincida con cualquier nombre de campo en el documento clave que comience con "date". En el conjunto de campos coincidentes, reemplaza todo el texto que coincida con el patrón
_por el carácter-.Coincide con cualquier nombre de campo en el documento de valores que sean subdocumentos de
crepes. En el conjunto de campos coincidentes, sustituya todo el texto que coincida con el patrónpurchasedporquantity.
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
Cuando el conector aplica el posprocesador al documento clave de ejemplo y al documento de valor de ejemplo, produce lo siguiente:
Documento clave
{ "location": "Provence", "date-month": "October", "date-day": 17 }
Documento de valor
{ "crepes": { "quantity": 598, "size": "large" } }
Advertencia
Los procesadores posteriores de cambio de nombre no sobrescriben nombres de campos existentes
Los nombres de campo de destino que estableces en tus posprocesadores de renombrado pueden dar lugar a nombres de campo duplicados en el mismo documento. Para evitar esto, el post-procesador omite el cambio de nombre si se duplica el nombre de un campo existente en el mismo nivel del documento.
Cómo crear un procesador de publicaciones personalizado
Si los procesadores posteriores integrados no cubren tu caso de uso, puedes crear una clase de procesador posterior personalizada utilizando los siguientes pasos:
Crear una clase Java que extienda la clase abstracta PostProcessor.
Sobreescribe el método
process()en tu clase. Puedes actualizar elSinkDocument, una representación BSON de los campos clave y valor del registro de destino, y acceder al Kafka originalSinkRecorden tu método.Compila la clase en un archivo JAR.
Añade el JAR compilado a la ruta de clases/ruta de plugin para todos tus workers de Kafka. Para obtener más información sobre las rutas de los plugins, consulta la documentación de Confluent sobre Instalación manual de conectores comunitarios.
Agregue el nombre completo de la clase de su postprocesador a la configuración de la cadena de postprocesador.
Para procesadores posteriores, puedes explorar el código fuente de las clases de procesador posterior integradas.