Definición
La etapa verifica que los documentos en streaming se ajusten a un esquema de rangos, valores o tipos de datos $validate esperados.
Sintaxis
La etapa $validate procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| Documento | Requerido | Documento de expresiones utilizadas para validar los mensajes entrantes en función de un esquema definido por el usuario. Puedes utilizar todos menos los siguientes operadores del query para definir expresiones de validación:
|
| string | Opcional | Especifica la acción a tomar cuando un mensaje viola el esquema definido por el usuario. You can specify one of the following values:
|
Comportamiento
Puede usar $validate en cualquier punto de un pipeline después de la etapa $source y antes de la etapa $emit o $merge.
Si especificas las opciones discard o dlq para el campo validationAction, Atlas Stream Processing registra mensajes que no pasan la validación en el siguiente formato:
{ "t": <datetime>, "s": "<severity-level>", "c": "streams-<job-name>", "ctx": "<processed-pipeline>", "msg": "<message-body>", "attrs": { <result-of-logAttributes-evaluation> }, "tags": <array-of-strings>, "truncated": { <truncation-description> }, "size": <size-of-entry> }
La siguiente tabla describe los campos de la entrada del registro:
Campo | Tipo | Descripción |
|---|---|---|
| Documento | Documento que contiene los resultados de la evaluación del campo |
| string | Nombre del trabajo específico de Stream Processing en el que ocurrió la falla. |
| string | Nombre de la pipeline de datos en transmisión que se está procesando. |
| string | Cuerpo del mensaje que no superó la validación. |
Atlas Stream Processing solo admite JSON Schema Draft 4 o anteriores.
Ejemplo de validador
El siguiente documento muestra un ejemplo de expresión de validador utilizando $and para realizar una operación lógica Y:
{ $validate: { validator: { $and: [{ $expr: { $ne: [ "$Racer_Name", "Pace Car" ] } }, { $jsonSchema: { required: [ "Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp" ], properties: { Racer_Num: { bsonType: "int", description: "'Racer_Num' is the integer number of the race car and is required" }, Racer_Name: { bsonType: "string", description: "'Racer_Name' must be a string and is required" }, lap: { bsonType: "int", minimum: 1, description: "'lap' must be a int and is required" }, Corner_Num: { bsonType: "int", minimum: 1, maximum: 4, description: "'Corner_Num' must be a int between 1 and 4 and is required" }, timestamp: { bsonType: "string", pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}$", description: "'timestamp' must be a string matching iso date pattern and is required" } } } }] }, validationAction : "dlq" } }
Ejemplos
Consideremos una fuente de datos en tiempo real que genera informes meteorológicos detallados desde diversas ubicaciones. En el siguiente ejemplo de canalización de agregación, se incluye una $validate etapa para garantizar que los documentos se ajusten al esquema del conjunto de datos meteorológicos de muestra. La agregación consta de cuatro etapas:
La etapa
$sourceestablece una conexión con el broker Apache Kafka que recopila estos informes en un tema llamadomy_weatherdata, pasando cada registro a medida que se ingiere a las siguientes etapas de agregación.La etapa
$matchexcluye los documentos que tienen un valor dewind.speed.ratemayor o igual que30y se pasa a los documentos con un valor dewind.speed.ratemenor que30.La fase
$mergeescribe la salida en una colección Atlas denominadastreamen la base de datossample_weatherstream. Si no existe tal base de datos o colección, Atlas los creará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$validate': { validator: { '$jsonSchema': { properties: { position: [Object], sections: [Object] } } }, validationAction: 'dlq' } }, { '$match': { 'wind.speed.rate': { '$lt': 30 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para ver los documentos en la colección sample_weatherstream.sample resultante, puedes conectarte a tu clúster de Atlas usando mongosh para ejecutar el siguiente comando:
Nota
El siguiente es un ejemplo representativo. Los datos de transmisión no son estáticos y cada usuario ve documentos distintos.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66b25302fe8bbac5f39dbdba'), airTemperature: { quality: '9', value: 3.5 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1', value: 10.9 }, tendency: { code: '3', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '9', value: 1022.5 } }, callLetters: 'JIVX', dataSource: '4', dewPoint: { quality: '9', value: 20.5 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-06T16:44:50.322Z'), liquidPrecipitation: { condition: '9', depth: 7000, period: 12, quality: '9' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '7' }, period: { quality: '1', value: 3 } }, position: { coordinates: [ 120.7, -98.2 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 999 }, presentWeatherObservationManual: { condition: '90', quality: '1' }, pressure: { quality: '1', value: 1028.2 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 11.1 }, sections: [ 'UG1', 'MA1', 'GA3', 'KA1', 'UA1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 390 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '06' }, lowCloudGenus: { quality: '9', value: '07' }, lowestCloudBaseHeight: { quality: '1', value: 800 }, lowestCloudCoverage: { quality: '9', value: '06' }, midCloudGenus: { quality: '9', value: '07' }, totalCoverage: { opaque: '99', quality: '1', value: '99' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 1200 }, cloudType: { quality: '9', value: '04' }, coverage: { quality: '1', value: '09' } }, st: 'x+36700+144300', type: 'FM-13', visibility: { distance: { quality: '9', value: 9000 }, variability: { quality: '9', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '00', quality: '9' }, waves: { height: 9.5, period: 4, quality: '9' } }, wind: { direction: { angle: 140, quality: '1' }, speed: { quality: '2', rate: 15.9 }, type: 'N' } }
Observe que todos los documentos de esta colección contienen los valores esperados de tipo arraypara position.coordinates y sections. Para ver los documentos que fallaron la validación, dada una fila de letra muerta llamada dlq, ejecuta el siguiente comando:
db.getSiblingDB("sample_weatherstream").dlq.find()
{ _id: ObjectId('66b254d3a045fb1406047394'), errInfo: { reason: 'Input document found to be invalid in $validate stage' }, doc: { airTemperature: { quality: '9', value: 7.6 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1', value: 0.3 }, tendency: { code: '8', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '9', value: 1015.9 }, stationPressure: { quality: '1', value: 1017 } }, callLetters: 'WRGL', dataSource: '4', dewPoint: { quality: '9', value: 25.3 }, elevation: 9999, extremeAirTemperature: { code: 'M', period: 99.9, quantity: '1', value: -30.9 }, liquidPrecipitation: { condition: '9', period: 99, quality: '9' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '2' }, period: { quality: '1', value: 6 } }, position: { coordinates: -100.2, type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 17 }, presentWeatherObservationManual: { condition: '08', quality: '1' }, pressure: { quality: '9', value: 1001 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 10.4 }, sections: [ 'GA2', 'GA1', 'KA1', 'AA1', 'OA1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 240 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '02' }, lowCloudGenus: { quality: '9', value: '02' }, lowestCloudBaseHeight: { quality: '1', value: 150 }, lowestCloudCoverage: { quality: '1', value: '03' }, midCloudGenus: { quality: '1', value: '06' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 450 }, cloudType: { quality: '9', value: '03' }, coverage: { quality: '1', value: '07' } }, st: 'x+20500-074300', type: 'SAO', visibility: { distance: { quality: '9', value: 3800 }, variability: { quality: '9', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '00', quality: '9' }, waves: { height: 37.5, period: 7, quality: '9' } }, wind: { direction: { angle: 230, quality: '1' }, speed: { quality: '1', rate: 46.3 }, type: 'N' }, ingestionTime: ISODate('2024-08-06T16:52:35.287Z') }, processorName: 'sampleWeather' }
Observa que todos los documentos en la fila de letra muerta tienen valores no válidos para position.coordinates, sections o ambos.