Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /
/ / /

$validate Etapa (Stream Processing)

La $validate La etapa verifica que los documentos de transmisión cumplan un esquema de rangos previstos, valores o tipos de datos.

$validate

Una etapa de pipeline $validate tiene la siguiente forma de prototipo:

{
"$validate": {
"validator": { <filter> },
"validationAction" : "discard" | "dlq"
}
}

La etapa $validate procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

validator

Documento

Requerido

Documento de expresiones utilizadas para validar los mensajes entrantes en función de un esquema definido por el usuario. Puedes utilizar todos menos los siguientes operadores del query para definir expresiones de validación:

  • $near

  • $nearSphere

  • $text

  • $where

validationAction

string

Opcional

Especifica la acción a tomar cuando un mensaje viola el esquema definido por el usuario. You can specify one of the following values:

  • discard: Descarta el mensaje. Si no especifica un valor para validationAction, este es el comportamiento predeterminado.

  • dlq:Registra la violación en la colección definida en la configuración de su procesador de flujo y realiza un descarte de máximo esfuerzo sin garantías transaccionales.

Puede usar $validate en cualquier punto de un pipeline después de la etapa $source y antes de la etapa $emit o $merge.

Si especificas las opciones discard o dlq para el campo validationAction, Atlas Stream Processing registra mensajes que no pasan la validación en el siguiente formato:

{
"t": <datetime>,
"s": "<severity-level>",
"c": "streams-<job-name>",
"ctx": "<processed-pipeline>",
"msg": "<message-body>",
"attrs": {
<result-of-logAttributes-evaluation>
},
"tags": <array-of-strings>,
"truncated": {
<truncation-description>
},
"size": <size-of-entry>
}

La siguiente tabla describe los campos de la entrada del registro:

Campo
Tipo
Descripción

attrs

Documento

Documento que contiene los resultados de la evaluación del campo logAttributes en la definición $validate. El resultado es una lista de campos.

c

string

Nombre del trabajo específico de Stream Processing en el que ocurrió la falla.

ctx

string

Nombre de la pipeline de datos en transmisión que se está procesando.

msg

string

Cuerpo del mensaje que no superó la validación.

Atlas Stream Processing solo admite JSON Schema Draft 4 o antes.

El siguiente documento muestra un ejemplo de expresión de validador utilizando $and para realizar una operación lógica Y:

{
$validate: {
validator: {
$and: [{
$expr: {
$ne: [
"$Racer_Name",
"Pace Car"
]
}
},
{
$jsonSchema: {
required: [ "Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp" ],
properties: {
Racer_Num: {
bsonType: "int",
description: "'Racer_Num' is the integer number of the race car and is required"
},
Racer_Name: {
bsonType: "string",
description: "'Racer_Name' must be a string and is required"
},
lap: {
bsonType: "int",
minimum: 1,
description: "'lap' must be a int and is required"
},
Corner_Num: {
bsonType: "int",
minimum: 1,
maximum: 4,
description: "'Corner_Num' must be a int between 1 and 4 and is required"
},
timestamp: {
bsonType: "string",
pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}$",
description: "'timestamp' must be a string matching iso date pattern and is required"
}
}
}
}]
}, validationAction : "dlq"
}
}

Considere una fuente de datos de streaming que genera informes meteorológicos detallados de diversas ubicaciones. En el siguiente ejemplo de canalización de agregación, se incluye una etapa para garantizar que los documentos se ajusten al esquema de $validate la Conjunto de datos meteorológicos de muestra. La agregación consta de cuatro etapas:

  1. La etapa $source establece una conexión con el broker Apache Kafka que recopila estos informes en un tema llamado my_weatherdata, pasando cada registro a medida que se ingiere a las siguientes etapas de agregación.

  2. La etapa $validate verifica si un documento tiene valores de arreglo en los campos position.coordinates y sections, pasando los documentos que los tengan al resto de la pipeline y enviando los que no a un DLQ.

  3. La etapa excluye los documentos que tienen $match un wind.speed.rate valor mayor o igual a 30 y pasa los documentos con un wind.speed.rate valor menor 30 que.

  4. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$validate': {
validator: {
'$jsonSchema': { properties: { position: [Object], sections: [Object] } }
},
validationAction: 'dlq'
}
},
{ '$match': { 'wind.speed.rate': { '$lt': 30 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.sample resultante, puedes conectarte a tu clúster de Atlas usando mongosh para ejecutar el siguiente comando:

Nota

El siguiente es un ejemplo representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66b25302fe8bbac5f39dbdba'),
airTemperature: { quality: '9', value: 3.5 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1', value: 10.9 },
tendency: { code: '3', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '9', value: 1022.5 }
},
callLetters: 'JIVX',
dataSource: '4',
dewPoint: { quality: '9', value: 20.5 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-06T16:44:50.322Z'),
liquidPrecipitation: { condition: '9', depth: 7000, period: 12, quality: '9' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '7' },
period: { quality: '1', value: 3 }
},
position: { coordinates: [ 120.7, -98.2 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 999 },
presentWeatherObservationManual: { condition: '90', quality: '1' },
pressure: { quality: '1', value: 1028.2 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 11.1 },
sections: [ 'UG1', 'MA1', 'GA3', 'KA1', 'UA1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 390 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '06' },
lowCloudGenus: { quality: '9', value: '07' },
lowestCloudBaseHeight: { quality: '1', value: 800 },
lowestCloudCoverage: { quality: '9', value: '06' },
midCloudGenus: { quality: '9', value: '07' },
totalCoverage: { opaque: '99', quality: '1', value: '99' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 1200 },
cloudType: { quality: '9', value: '04' },
coverage: { quality: '1', value: '09' }
},
st: 'x+36700+144300',
type: 'FM-13',
visibility: {
distance: { quality: '9', value: 9000 },
variability: { quality: '9', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '00', quality: '9' },
waves: { height: 9.5, period: 4, quality: '9' }
},
wind: {
direction: { angle: 140, quality: '1' },
speed: { quality: '2', rate: 15.9 },
type: 'N'
}
}

Observe que todos los documentos de esta colección tienen los valores de tipo arrayesperados para position.coordinates y sections. Para ver los documentos que no pasaron la validación, dada una cola de mensajes fallidos llamada dlq, ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").dlq.find()
{
_id: ObjectId('66b254d3a045fb1406047394'),
errInfo: { reason: 'Input document found to be invalid in $validate stage' },
doc: {
airTemperature: { quality: '9', value: 7.6 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1', value: 0.3 },
tendency: { code: '8', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '9', value: 1015.9 },
stationPressure: { quality: '1', value: 1017 }
},
callLetters: 'WRGL',
dataSource: '4',
dewPoint: { quality: '9', value: 25.3 },
elevation: 9999,
extremeAirTemperature: { code: 'M', period: 99.9, quantity: '1', value: -30.9 },
liquidPrecipitation: { condition: '9', period: 99, quality: '9' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '2' },
period: { quality: '1', value: 6 }
},
position: { coordinates: -100.2, type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 17 },
presentWeatherObservationManual: { condition: '08', quality: '1' },
pressure: { quality: '9', value: 1001 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 10.4 },
sections: [ 'GA2', 'GA1', 'KA1', 'AA1', 'OA1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 240 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '02' },
lowCloudGenus: { quality: '9', value: '02' },
lowestCloudBaseHeight: { quality: '1', value: 150 },
lowestCloudCoverage: { quality: '1', value: '03' },
midCloudGenus: { quality: '1', value: '06' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 450 },
cloudType: { quality: '9', value: '03' },
coverage: { quality: '1', value: '07' }
},
st: 'x+20500-074300',
type: 'SAO',
visibility: {
distance: { quality: '9', value: 3800 },
variability: { quality: '9', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '00', quality: '9' },
waves: { height: 37.5, period: 7, quality: '9' }
},
wind: {
direction: { angle: 230, quality: '1' },
speed: { quality: '1', rate: 46.3 },
type: 'N'
},
ingestionTime: ISODate('2024-08-06T16:52:35.287Z')
},
processorName: 'sampleWeather'
}

Observe que todos los documentos en la cola de mensajes no entregados tienen valores no válidos para position.coordinates, sections o ambos.

Volver

$source

En esta página