Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

$validate Etapa (Stream Processing)

La etapa verifica que los documentos en streaming se ajusten a un esquema de rangos, valores o tipos de datos $validate esperados.

$validate

Una etapa de pipeline $validate tiene la siguiente forma de prototipo:

{
"$validate": {
"validator": { <filter> },
"validationAction" : "discard" | "dlq"
}
}

La etapa $validate procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

validator

Documento

Requerido

Documento de expresiones utilizadas para validar los mensajes entrantes en función de un esquema definido por el usuario. Puedes utilizar todos menos los siguientes operadores del query para definir expresiones de validación:

  • $near

  • $nearSphere

  • $text

  • $where

validationAction

string

Opcional

Especifica la acción a tomar cuando un mensaje viola el esquema definido por el usuario. You can specify one of the following values:

  • discard: Descarta el mensaje. Si no especifica un valor para validationAction, este es el comportamiento por defecto.

  • dlq: Registra la infracción en la colección definida en la configuración de tu Procesador de Stream y realiza un descarte del mejor esfuerzo sin garantías transaccionales.

Puede usar $validate en cualquier punto de un pipeline después de la etapa $source y antes de la etapa $emit o $merge.

Si especificas las opciones discard o dlq para el campo validationAction, Atlas Stream Processing registra mensajes que no pasan la validación en el siguiente formato:

{
"t": <datetime>,
"s": "<severity-level>",
"c": "streams-<job-name>",
"ctx": "<processed-pipeline>",
"msg": "<message-body>",
"attrs": {
<result-of-logAttributes-evaluation>
},
"tags": <array-of-strings>,
"truncated": {
<truncation-description>
},
"size": <size-of-entry>
}

La siguiente tabla describe los campos de la entrada del registro:

Campo
Tipo
Descripción

attrs

Documento

Documento que contiene los resultados de la evaluación del campo logAttributes en la definición $validate. El resultado es una lista de campos.

c

string

Nombre del trabajo específico de Stream Processing en el que ocurrió la falla.

ctx

string

Nombre de la pipeline de datos en transmisión que se está procesando.

msg

string

Cuerpo del mensaje que no superó la validación.

Atlas Stream Processing solo admite JSON Schema Draft 4 o anteriores.

El siguiente documento muestra un ejemplo de expresión de validador utilizando $and para realizar una operación lógica Y:

{
$validate: {
validator: {
$and: [{
$expr: {
$ne: [
"$Racer_Name",
"Pace Car"
]
}
},
{
$jsonSchema: {
required: [ "Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp" ],
properties: {
Racer_Num: {
bsonType: "int",
description: "'Racer_Num' is the integer number of the race car and is required"
},
Racer_Name: {
bsonType: "string",
description: "'Racer_Name' must be a string and is required"
},
lap: {
bsonType: "int",
minimum: 1,
description: "'lap' must be a int and is required"
},
Corner_Num: {
bsonType: "int",
minimum: 1,
maximum: 4,
description: "'Corner_Num' must be a int between 1 and 4 and is required"
},
timestamp: {
bsonType: "string",
pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}$",
description: "'timestamp' must be a string matching iso date pattern and is required"
}
}
}
}]
}, validationAction : "dlq"
}
}

Consideremos una fuente de datos en tiempo real que genera informes meteorológicos detallados desde diversas ubicaciones. En el siguiente ejemplo de canalización de agregación, se incluye una $validate etapa para garantizar que los documentos se ajusten al esquema del conjunto de datos meteorológicos de muestra. La agregación consta de cuatro etapas:

  1. La etapa $source establece una conexión con el broker Apache Kafka que recopila estos informes en un tema llamado my_weatherdata, pasando cada registro a medida que se ingiere a las siguientes etapas de agregación.

  2. La etapa comprueba si un documento tiene valores de matriz para $validate los position.coordinates sections campos y, pasando los documentos que los tienen al resto de la canalización y los documentos que no los tienen a una DLQ.

  3. La etapa $match excluye los documentos que tienen un valor de wind.speed.rate mayor o igual que 30 y se pasa a los documentos con un valor de wind.speed.rate menor que 30.

  4. La fase $merge escribe la salida en una colección Atlas denominada stream en la base de datos sample_weatherstream. Si no existe tal base de datos o colección, Atlas los creará.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$validate': {
validator: {
'$jsonSchema': { properties: { position: [Object], sections: [Object] } }
},
validationAction: 'dlq'
}
},
{ '$match': { 'wind.speed.rate': { '$lt': 30 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.sample resultante, puedes conectarte a tu clúster de Atlas usando mongosh para ejecutar el siguiente comando:

Nota

El siguiente es un ejemplo representativo. Los datos de transmisión no son estáticos y cada usuario ve documentos distintos.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66b25302fe8bbac5f39dbdba'),
airTemperature: { quality: '9', value: 3.5 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1', value: 10.9 },
tendency: { code: '3', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '9', value: 1022.5 }
},
callLetters: 'JIVX',
dataSource: '4',
dewPoint: { quality: '9', value: 20.5 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-06T16:44:50.322Z'),
liquidPrecipitation: { condition: '9', depth: 7000, period: 12, quality: '9' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '7' },
period: { quality: '1', value: 3 }
},
position: { coordinates: [ 120.7, -98.2 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 999 },
presentWeatherObservationManual: { condition: '90', quality: '1' },
pressure: { quality: '1', value: 1028.2 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 11.1 },
sections: [ 'UG1', 'MA1', 'GA3', 'KA1', 'UA1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 390 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '06' },
lowCloudGenus: { quality: '9', value: '07' },
lowestCloudBaseHeight: { quality: '1', value: 800 },
lowestCloudCoverage: { quality: '9', value: '06' },
midCloudGenus: { quality: '9', value: '07' },
totalCoverage: { opaque: '99', quality: '1', value: '99' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 1200 },
cloudType: { quality: '9', value: '04' },
coverage: { quality: '1', value: '09' }
},
st: 'x+36700+144300',
type: 'FM-13',
visibility: {
distance: { quality: '9', value: 9000 },
variability: { quality: '9', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '00', quality: '9' },
waves: { height: 9.5, period: 4, quality: '9' }
},
wind: {
direction: { angle: 140, quality: '1' },
speed: { quality: '2', rate: 15.9 },
type: 'N'
}
}

Observe que todos los documentos de esta colección contienen los valores esperados de tipo arraypara position.coordinates y sections. Para ver los documentos que fallaron la validación, dada una fila de letra muerta llamada dlq, ejecuta el siguiente comando:

db.getSiblingDB("sample_weatherstream").dlq.find()
{
_id: ObjectId('66b254d3a045fb1406047394'),
errInfo: { reason: 'Input document found to be invalid in $validate stage' },
doc: {
airTemperature: { quality: '9', value: 7.6 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1', value: 0.3 },
tendency: { code: '8', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '9', value: 1015.9 },
stationPressure: { quality: '1', value: 1017 }
},
callLetters: 'WRGL',
dataSource: '4',
dewPoint: { quality: '9', value: 25.3 },
elevation: 9999,
extremeAirTemperature: { code: 'M', period: 99.9, quantity: '1', value: -30.9 },
liquidPrecipitation: { condition: '9', period: 99, quality: '9' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '2' },
period: { quality: '1', value: 6 }
},
position: { coordinates: -100.2, type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 17 },
presentWeatherObservationManual: { condition: '08', quality: '1' },
pressure: { quality: '9', value: 1001 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 10.4 },
sections: [ 'GA2', 'GA1', 'KA1', 'AA1', 'OA1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 240 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '02' },
lowCloudGenus: { quality: '9', value: '02' },
lowestCloudBaseHeight: { quality: '1', value: 150 },
lowestCloudCoverage: { quality: '1', value: '03' },
midCloudGenus: { quality: '1', value: '06' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 450 },
cloudType: { quality: '9', value: '03' },
coverage: { quality: '1', value: '07' }
},
st: 'x+20500-074300',
type: 'SAO',
visibility: {
distance: { quality: '9', value: 3800 },
variability: { quality: '9', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '00', quality: '9' },
waves: { height: 37.5, period: 7, quality: '9' }
},
wind: {
direction: { angle: 230, quality: '1' },
speed: { quality: '1', rate: 46.3 },
type: 'N'
},
ingestionTime: ISODate('2024-08-06T16:52:35.287Z')
},
processorName: 'sampleWeather'
}

Observa que todos los documentos en la fila de letra muerta tienen valores no válidos para position.coordinates, sections o ambos.