Definición
El $validate La etapa verifica que los documentos de transmisión se ajusten a un esquema de rangos, valores o tipos de datos esperados.
Sintaxis
La etapa $validate procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| Documento | Requerido | Documento de expresiones utilizadas para validar los mensajes entrantes con respecto a un esquema definido por el usuario. Puede usar todos los operadores de consulta, excepto los siguientes, para definir expresiones de validación:
|
| string | Opcional | Especifica la acción que se debe tomar cuando un mensaje infringe el esquema definido por el usuario. Puede especificar uno de los siguientes valores:
|
Comportamiento
Puede usar $validate en cualquier punto de un pipeline después de la etapa $source y antes de la etapa $emit o $merge.
Si especificas las opciones discard o dlq para el campo validationAction, Atlas Stream Processing registra mensajes que no pasan la validación en el siguiente formato:
{ "t": <datetime>, "s": "<severity-level>", "c": "streams-<job-name>", "ctx": "<processed-pipeline>", "msg": "<message-body>", "attrs": { <result-of-logAttributes-evaluation> }, "tags": <array-of-strings>, "truncated": { <truncation-description> }, "size": <size-of-entry> }
La siguiente tabla describe los campos de entrada del registro:
Campo | Tipo | Descripción |
|---|---|---|
| Documento | Documento que contiene los resultados de la evaluación del campo |
| string | Nombre del trabajo de procesamiento de flujo específico en el que ocurrió la falla. |
| string | Nombre de la canalización de datos de transmisión que se está procesando. |
| string | Cuerpo del mensaje que falló la validación. |
Atlas Stream Processing solo admite el borrador de esquema JSON 4 o antes.
Ejemplo de validador
El siguiente documento muestra un ejemplo de expresión de validación que utiliza $and para realizar una operación AND lógica:
{ $validate: { validator: { $and: [{ $expr: { $ne: [ "$Racer_Name", "Pace Car" ] } }, { $jsonSchema: { required: [ "Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp" ], properties: { Racer_Num: { bsonType: "int", description: "'Racer_Num' is the integer number of the race car and is required" }, Racer_Name: { bsonType: "string", description: "'Racer_Name' must be a string and is required" }, lap: { bsonType: "int", minimum: 1, description: "'lap' must be a int and is required" }, Corner_Num: { bsonType: "int", minimum: 1, maximum: 4, description: "'Corner_Num' must be a int between 1 and 4 and is required" }, timestamp: { bsonType: "string", pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}$", description: "'timestamp' must be a string matching iso date pattern and is required" } } } }] }, validationAction : "dlq" } }
Ejemplos
Considere una fuente de datos de streaming que genera informes meteorológicos detallados de diversas ubicaciones. En el siguiente ejemplo de canalización de agregación, se incluye una etapa para garantizar que los documentos se ajusten al esquema de $validate la Conjunto de datos meteorológicos de muestra. La agregación consta de cuatro etapas:
La etapa establece
$sourceuna conexión con el agente Apache Kafka que recopila estos informes en un tema llamadomy_weatherdatay pasa cada registro a medida que se ingiere a las etapas de agregación posteriores.La etapa verifica si un documento tiene valores de matriz para
$validatelosposition.coordinatessectionscampos y, pasando los documentos que los tienen al resto del pipeline y pasando los documentos que no los tienen a un DLQ.La etapa excluye los documentos que tienen
$matchunwind.speed.ratevalor mayor o igual a30y pasa los documentos con unwind.speed.ratevalor menor30que.La etapa escribe la salida en una colección de Atlas
$mergellamadastreamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$validate': { validator: { '$jsonSchema': { properties: { position: [Object], sections: [Object] } } }, validationAction: 'dlq' } }, { '$match': { 'wind.speed.rate': { '$lt': 30 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para ver los documentos en la colección sample_weatherstream.sample resultante, puedes conectarte a tu clúster de Atlas usando mongosh para ejecutar el siguiente comando:
Nota
El siguiente es un ejemplo representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66b25302fe8bbac5f39dbdba'), airTemperature: { quality: '9', value: 3.5 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1', value: 10.9 }, tendency: { code: '3', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '9', value: 1022.5 } }, callLetters: 'JIVX', dataSource: '4', dewPoint: { quality: '9', value: 20.5 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-06T16:44:50.322Z'), liquidPrecipitation: { condition: '9', depth: 7000, period: 12, quality: '9' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '7' }, period: { quality: '1', value: 3 } }, position: { coordinates: [ 120.7, -98.2 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 999 }, presentWeatherObservationManual: { condition: '90', quality: '1' }, pressure: { quality: '1', value: 1028.2 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 11.1 }, sections: [ 'UG1', 'MA1', 'GA3', 'KA1', 'UA1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 390 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '06' }, lowCloudGenus: { quality: '9', value: '07' }, lowestCloudBaseHeight: { quality: '1', value: 800 }, lowestCloudCoverage: { quality: '9', value: '06' }, midCloudGenus: { quality: '9', value: '07' }, totalCoverage: { opaque: '99', quality: '1', value: '99' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 1200 }, cloudType: { quality: '9', value: '04' }, coverage: { quality: '1', value: '09' } }, st: 'x+36700+144300', type: 'FM-13', visibility: { distance: { quality: '9', value: 9000 }, variability: { quality: '9', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '00', quality: '9' }, waves: { height: 9.5, period: 4, quality: '9' } }, wind: { direction: { angle: 140, quality: '1' }, speed: { quality: '2', rate: 15.9 }, type: 'N' } }
Observe que todos los documentos de esta colección tienen los valores de tipo arrayesperados para position.coordinates y sections. Para ver los documentos que no pasaron la validación, dada una cola de mensajes fallidos llamada dlq, ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").dlq.find()
{ _id: ObjectId('66b254d3a045fb1406047394'), errInfo: { reason: 'Input document found to be invalid in $validate stage' }, doc: { airTemperature: { quality: '9', value: 7.6 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1', value: 0.3 }, tendency: { code: '8', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '9', value: 1015.9 }, stationPressure: { quality: '1', value: 1017 } }, callLetters: 'WRGL', dataSource: '4', dewPoint: { quality: '9', value: 25.3 }, elevation: 9999, extremeAirTemperature: { code: 'M', period: 99.9, quantity: '1', value: -30.9 }, liquidPrecipitation: { condition: '9', period: 99, quality: '9' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '2' }, period: { quality: '1', value: 6 } }, position: { coordinates: -100.2, type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 17 }, presentWeatherObservationManual: { condition: '08', quality: '1' }, pressure: { quality: '9', value: 1001 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 10.4 }, sections: [ 'GA2', 'GA1', 'KA1', 'AA1', 'OA1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 240 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '02' }, lowCloudGenus: { quality: '9', value: '02' }, lowestCloudBaseHeight: { quality: '1', value: 150 }, lowestCloudCoverage: { quality: '1', value: '03' }, midCloudGenus: { quality: '1', value: '06' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 450 }, cloudType: { quality: '9', value: '03' }, coverage: { quality: '1', value: '07' } }, st: 'x+20500-074300', type: 'SAO', visibility: { distance: { quality: '9', value: 3800 }, variability: { quality: '9', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '00', quality: '9' }, waves: { height: 37.5, period: 7, quality: '9' } }, wind: { direction: { angle: 230, quality: '1' }, speed: { quality: '1', rate: 46.3 }, type: 'N' }, ingestionTime: ISODate('2024-08-06T16:52:35.287Z') }, processorName: 'sampleWeather' }
Observe que todos los documentos en la cola de mensajes no entregados tienen valores no válidos para position.coordinates, sections o ambos.