Overview
En esta página, puede aprender a configurar postprocesadores en su conector receptor de MongoDB Kafka. Los postprocesadores modifican los registros receptores que el conector lee de un tema de Kafka antes de que este los almacene en su colección de MongoDB. Algunos ejemplos de modificaciones de datos que pueden realizar los postprocesadores incluyen:
Establecer el documento
_idcampo a un valor personalizadoIncluir o excluir campos de clave o valor de mensaje
Cambiar el nombre de los campos
Puede utilizar los posprocesadores prediseñados incluidos en el conector o implementar los suyos propios.
Consulte las siguientes secciones para obtener más información sobre los postprocesadores:
Cómo modifican los datos los postprocesadores
Los postprocesadores modifican los datos leídos de un tema de Kafka. El conector almacena el mensaje en una clase SinkDocument que contiene una representación de los campos de clave y valor SinkRecord de Kafka. El conector aplica secuencialmente cualquier postprocesador especificado en la configuración y almacena el resultado en una colección de MongoDB.
Los postprocesadores realizan tareas de modificación de datos, como generar el _id campo del documento, proyectar campos de clave o valor de mensaje y renombrar campos. Puede usar los postprocesadores predefinidos incluidos en el conector o implementar los suyos propios extendiendo el postprocesador.
clase.
Cómo especificar postprocesadores
Puede especificar uno o más posprocesadores en la configuración post.processor.chain como una lista separada por comas. Si especifica más de uno, el conector los aplica secuencialmente, de modo que cada posprocesador modifica la salida de datos según el anterior.
Para garantizar que los documentos que el conector escribe en MongoDB contengan campos _id únicos, agrega automáticamente el postprocesador DocumentIdAdder en la primera posición de la cadena si no lo incluye de otra manera.
La siguiente configuración de ejemplo especifica que el conector debe ejecutar primero el postprocesador KafkaMetaAdder y luego el postprocesador AllowListValueProjector en la salida.
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
Postprocesadores prediseñados
La siguiente tabla contiene una lista de todos los postprocesadores incluidos en el conector del sumidero.
Nombre del postprocesador | Descripción | |
|---|---|---|
DocumentIdAdder | Full Path: Inserts an _id field determined by the configured strategy.The default strategy is BsonOidStrategy.For information on strategy options and configuration, see the
Configure the Document Id Adder Post Processor
section. | |
BlockListKeyProjector | Full Path: Removes matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
BlockListValueProjector | Full Path: Removes matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListKeyProjector | Full Path: Includes only matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
AllowListValueProjector | Full Path: Includes only matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
KafkaMetaAdder | Full Path: Adds a field named "topic-partition-offset" and sets the value
to the concatenation of Kafka topic, partition, and offset to the
document. | |
RenameByMapping | Full Path: Renames fields that are an exact match to a specified field name in
the key or value document. For information on configuration, see the
Renaming by Mapping Example. | |
RenameByRegex | Full Path: Renames fields that match a regular expression in the key or
value document. For information on configuration, see the
Renaming by Regular Expression Example. | |
NullFieldValueRemover | Full Path: Removes all document fields that contain null values from the sink record. |
Configurar el postprocesador del sumador de ID de documento
El post-procesador DocumentIdAdder utiliza una estrategia para determinar cómo debe formatear el campo _id en el documento MongoDB. Una estrategia define un comportamiento preestablecido que se puede personalizar para un caso de uso determinado.
Puede especificar una estrategia para este postprocesador en la configuración document.id.strategy como se muestra en el siguiente ejemplo:
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy
La siguiente tabla muestra una lista de las estrategias que puede utilizar para configurar el postprocesador DocumentIdAdder:
Nombre de la estrategia | Descripción | |
|---|---|---|
BsonOidStrategy | Full Path: Generates a MongoDB BSON ObjectId. Default strategy for the DocumentIdAdder post processor. | |
KafkaMetaDataStrategy | Full Path: Builds a string composed of the concatenation of Kafka topic,
partition, and offset. | |
FullKeyStrategy | Full Path: Uses the complete key structure of the sink document to generate the
value for the _id field.Defaults to a blank document if no key exists. | |
ProvidedInKeyStrategy | Full Path: Uses the _id field specified in the key structure of the sink
document.Throws an exception if the field is missing from the sink document. | |
ProvidedInValueStrategy | Full Path: Uses the _id field specified in the value structure of the
sink document.Throws an exception if the field is missing from the sink document. | |
PartialKeyStrategy | Full Path: Uses a block list or allow list projection of the sink document key
structure. Defaults to a blank document if no key exists. | |
PartialValueStrategy | Full Path: Uses a block list or allow list projection of the sink document
value structure. Defaults to a blank document if no value exists. | |
UuidProvidedInKeyStrategy | Full Path: Converts the _id key field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidProvidedInValueStrategy | Full Path: Converts the _id value field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
UuidStrategy | Full Path: Uses a randomly generated UUID in string format. |
Crear una estrategia de identificación de documento personalizada
Si las estrategias de adición de identificación de documento integradas no cubren su caso de uso, puede definir una estrategia de identificación de documento personalizada siguiendo los pasos a continuación:
Cree una clase Java que implemente la interfaz IdStrategy y contenga su lógica de configuración personalizada.
Compila la clase en un archivo JAR.
Añade el JAR compilado a la ruta de clases o del plugin de todos tus trabajadores de Kafka. Para más información sobre las rutas de plugins, consulta la documentación de Confluent.
Actualice la configuración
document.id.strategycon el nombre de clase completo de su clase personalizada en todos sus trabajadores de Kafka.
Nota
La estrategia seleccionada puede tener implicaciones en la semántica de entrega
Las estrategias BSON ObjectId o UUID solo garantizan una entrega única, ya que el conector genera nuevos identificadores en los reintentos o al volver a procesar los registros. Otras estrategias permiten una entrega única si se garantiza la exclusividad de los campos que forman el identificador del documento.
Para ver ejemplos de implementaciones de la IdStrategy interfaz, consulte el directorio del código fuente que contiene las implementaciones de la estrategia de identificación empaquetadas con el conector.
Ejemplos de postprocesadores
Esta sección muestra ejemplos de configuración y salidas de muestra de los siguientes tipos de procesadores posteriores:
Ejemplos de listas de permitidos y listas de bloqueo
Los posprocesadores del proyector de listas de permitidos y de bloqueos determinan qué campos incluir y excluir de la salida.
Cuando utiliza el proyector de lista de permitidos, el postprocesador solo genera datos de los campos que usted especifica.
Cuando utiliza el proyector de lista de bloqueos, el proceso posterior solo omite los datos de los campos que usted especifica.
Nota
Puede usar la notación "." (punto) para hacer referencia a campos anidados en el registro. También puede usar esta notación para hacer referencia a campos de documentos en una matriz.
Cuando agrega un proyector a su cadena de postprocesador, debe especificar el tipo de proyector y si desea aplicarlo a la parte de clave o valor del documento receptor.
Consulte las siguientes secciones para ver ejemplos de configuraciones y salidas del proyector.
Ejemplo de proyector de lista de permitidos
Supongamos que sus documentos de valor de registro de Kafka se parecen a los siguientes datos de perfil de usuario:
{ "name": "Sally Kimball", "age": 10, "address": { "city": "Idaville", "country": "USA" }, "hobbies": [ "reading", "solving crime" ] }
Puede configurar el proyector de valores AllowList para almacenar datos seleccionados como los campos "nombre", "dirección.ciudad" y "pasatiempos" de sus documentos de valores mediante las siguientes configuraciones:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=name,address.city,hobbies
Después de que el postprocesador aplica la proyección, genera el siguiente registro:
{ "name": "Sally Kimball", "address": { "city": "Idaville" }, "hobbies": [ "reading", "solving crime" ] }
Ejemplo de proyector de lista de bloques
Supongamos que sus documentos clave de registro de Kafka se parecen a los siguientes datos de identificación de usuario:
{ "username": "user5983", "registration": { "date": "2021-09-13", "source": "mobile" }, "authToken": { "alg": "HS256", "type": "JWT", "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk" } }
Puede configurar el proyector de teclas BlockList para omitir los campos "authToken" y "registration.source" antes de almacenar los datos con las siguientes configuraciones:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector key.projection.type=BlockList key.projection.list=authToken,registration.source
Después de que el postprocesador aplica la proyección, genera el siguiente registro:
{ "username": "user5983", "registration": { "date": "2021-09-13", } }
Ejemplos de coincidencia de patrones de proyección comodín
En esta sección se muestra cómo configurar los postprocesadores del proyector para que coincidan con los patrones comodín para que coincidan con los nombres de campo.
Patrón | Descripción |
| Coincide con cualquier número de caracteres en el nivel actual. |
| Coincide con cualquier personaje del nivel actual y todos los niveles anidados. |
Para conocer los ejemplos de coincidencia de patrones comodín de listas de permitidos y listas de bloqueados en esta sección, consulte el siguiente documento de valores que contiene mediciones meteorológicas:
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" }, "moisture": { "average": 340, "units": "mm" } } }
Ejemplos de comodines de listas de permitidos
Puedes utilizar el comodín * para hacer coincidir varios nombres de campo. La siguiente configuración de ejemplo coincide con los siguientes campos:
El campo de nivel superior llamado "ciudad"
Los campos denominados "promedio" que son subdocumentos de cualquier campo de nivel superior que comience con el nombre "velocidad_del_viento".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=city,wind_speed*.average
Después de que el postprocesador aplica la proyección de la lista de permitidos, genera el siguiente registro:
{ "city": "Springfield", "wind_speed_10m": { "average": 3, }, "wind_speed_80m": { "average": 8, } }
Puedes usar el comodín **, que coincide con objetos en cualquier nivel a partir del nivel en el que especifiques el comodín. El siguiente ejemplo de coincidencia comodín proyecta cualquier documento que contenga el campo llamado "low".
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=**.low
El postprocesador que aplica la proyección genera el siguiente registro:
{ "temperature": { "high": 28, "low": 24, "units": "C" }, "soil_conditions": { "temperature": { "high": 22, "low": 17, "units": "C" } } }
Ejemplo de comodín de lista de bloqueo
Puede utilizar patrones comodín para hacer coincidir campos en un nivel de documento específico como se muestra en el siguiente ejemplo de configuración de lista de bloqueo:
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector value.projection.type=BlockList value.projection.list=*.*.temperature
{ "city": "Springfield", "temperature": { "high": 28, "low": 24, "units": "C" }, "wind_speed_10m": { "average": 3, "units": "km/h" }, "wind_speed_80m": { "average": 8, "units": "km/h" }, "soil_conditions": { "moisture": { "average": 340, "units": "mm" } } }
Ejemplos de cambio de nombre de campos
Esta sección muestra cómo configurar los posprocesadores de renombramiento de campos RenameByMapping y RenameByRegex para actualizar los nombres de campo en un registro receptor. La configuración de renombramiento de campos especifica lo siguiente:
Si se debe actualizar el documento clave o de valor en el registro
Los nombres de los campos a actualizar
Los nuevos nombres de campo
Debe especificar las configuraciones RenameByMapping y RenameByRegex en una matriz JSON. Puede especificar campos anidados mediante notación de puntos o coincidencia de patrones.
Los ejemplos de postprocesador de renombramiento de campo utilizan el siguiente registro de receptor de ejemplo:
Documento clave
{ "location": "Provence", "date_month": "October", "date_day": 17 }
Documento de valor
{ "flapjacks": { "purchased": 598, "size": "large" } }
Ejemplo de cambio de nombre por mapeo
La configuración del posprocesador RenameByMapping especifica uno o más objetos JSON que asignan un nuevo nombre a los campos que coinciden con una cadena. Cada objeto contiene el texto que debe coincidir en el elemento oldName y el texto de reemplazo en el elemento newName, como se describe en la tabla a continuación.
Nombre de la clave | Descripción |
|---|---|
nombre antiguo | Especifica si se deben hacer coincidir los campos del documento de clave o valor y el nombre del campo que se va a reemplazar. La configuración utiliza un carácter "." para separar los dos valores. |
nuevoNombre | Especifica el nombre del campo de reemplazo para todas las coincidencias del campo. |
La siguiente propiedad de ejemplo coincide con el campo "ubicación" de un documento clave y lo renombra a "país":
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]
Esta configuración le indica al RenameByMapping postprocesador que transforme el documento clave original en el siguiente documento:
{ "country": "Provence", "date_month": "October", "date_day": 17 }
Puede realizar una asignación de nombre de campo similar para documentos de valor especificando el documento de valor con el nombre de campo adjunto en el campo oldName de la siguiente manera:
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]
Esta configuración le indica al RenameByMapping postprocesador que transforme el documento de valor original en el siguiente documento:
{ "crepes": { "purchased": 598, "size": "large" } }
También puede especificar una o más asignaciones en la propiedad field.renamer.mapping utilizando una matriz JSON en formato de cadena como se muestra en la siguiente configuración:
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]
Cambiar nombre por expresión regular
La configuración del posprocesador RenameByRegex especifica los nombres de campo y los patrones de texto que debe coincidir, así como los valores de reemplazo para el texto coincidente. Puede especificar una o más expresiones de cambio de nombre en objetos JSON que contengan los campos descritos en la siguiente tabla:
Nombre de la clave | Descripción |
|---|---|
regexp | Contiene una expresión regular que coincide con los campos para realizar el reemplazo. |
patrón | Contiene una expresión regular que coincide con el texto a reemplazar. |
Reemplazar | Contiene el texto de reemplazo para todas las coincidencias de la expresión regular definida en el campo |
La siguiente configuración de ejemplo le indica al postprocesador que realice lo siguiente:
Coincida con cualquier nombre de campo del documento clave que comience por "fecha". En el conjunto de campos coincidentes, reemplace todo el texto que coincida con el patrón
_por el carácter-.Coincide con cualquier nombre de campo en el documento de valor que sea subdocumento de
crepes. En el conjunto de campos coincidentes, reemplaza todo el texto que coincida con el patrónpurchasedporquantity.
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]
Cuando el conector aplica el postprocesador al documento clave de ejemplo y al documento de valor de ejemplo, genera lo siguiente:
Documento clave
{ "location": "Provence", "date-month": "October", "date-day": 17 }
Documento de valor
{ "crepes": { "quantity": 598, "size": "large" } }
Advertencia
Los posprocesadores de renombramiento no sobrescriben los nombres de campos existentes
Los nombres de campo de destino que configure en los posprocesadores de renombrado pueden generar nombres de campo duplicados en el mismo documento. Para evitarlo, el posprocesador omite el cambio de nombre cuando duplica un nombre de campo existente en el mismo nivel del documento.
Cómo crear un procesador de publicaciones personalizado
Si los postprocesadores integrados no cubren su caso de uso, puede crear una clase de postprocesador personalizada siguiendo los siguientes pasos:
Cree una clase Java que extienda la clase abstracta PostProcessor.
Anule el método
process()en su clase. Puede actualizarSinkDocument, una representación BSON de los campos clave y valor del registro receptor, y acceder al método KafkaSinkRecordoriginal en su método.Compila la clase en un archivo JAR.
Agregue el JAR compilado a la ruta de clase o ruta del plugin para todos sus trabajadores de Kafka. Para obtener más información sobre las rutas de los plugins, consulte la documentación de Confluent sobre la instalación manual de conectores comunitarios.
Agregue el nombre completo de la clase de su postprocesador a la configuración de la cadena de postprocesador.
Para procesadores posteriores, puedes explorar el código fuente de las clases de procesador posterior integradas.