Docs Menu
Docs Home
/ /
/ / /

$validate Etapa (procesamiento de flujo)

El $validate La etapa verifica que los documentos de transmisión se ajusten a un esquema de rangos, valores o tipos de datos esperados.

$validate

Una etapa de pipeline $validate tiene la siguiente forma de prototipo:

{
"$validate": {
"validator": { <filter> },
"validationAction" : "discard" | "dlq"
}
}

La etapa $validate procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

validator

Documento

Requerido

Documento de expresiones utilizadas para validar los mensajes entrantes con respecto a un esquema definido por el usuario. Puede usar todos los operadores de consulta, excepto los siguientes, para definir expresiones de validación:

  • $near

  • $nearSphere

  • $text

  • $where

validationAction

string

Opcional

Especifica la acción que se debe tomar cuando un mensaje infringe el esquema definido por el usuario. Puede especificar uno de los siguientes valores:

  • discard: Descarta el mensaje. Si no especifica un valor para validationAction, este es el comportamiento predeterminado.

  • dlq:Registra la violación en la colección definida en la configuración de su procesador de flujo y realiza un descarte de máximo esfuerzo sin garantías transaccionales.

Puede usar $validate en cualquier punto de un pipeline después de la etapa $source y antes de la etapa $emit o $merge.

Si especificas las opciones discard o dlq para el campo validationAction, Atlas Stream Processing registra mensajes que no pasan la validación en el siguiente formato:

{
"t": <datetime>,
"s": "<severity-level>",
"c": "streams-<job-name>",
"ctx": "<processed-pipeline>",
"msg": "<message-body>",
"attrs": {
<result-of-logAttributes-evaluation>
},
"tags": <array-of-strings>,
"truncated": {
<truncation-description>
},
"size": <size-of-entry>
}

La siguiente tabla describe los campos de entrada del registro:

Campo
Tipo
Descripción

attrs

Documento

Documento que contiene los resultados de la evaluación del campo logAttributes en la definición $validate. El resultado es una lista de campos.

c

string

Nombre del trabajo de procesamiento de flujo específico en el que ocurrió la falla.

ctx

string

Nombre de la canalización de datos de transmisión que se está procesando.

msg

string

Cuerpo del mensaje que falló la validación.

Atlas Stream Processing solo admite el borrador de esquema JSON 4 o antes.

El siguiente documento muestra un ejemplo de expresión de validación que utiliza $and para realizar una operación AND lógica:

{
$validate: {
validator: {
$and: [{
$expr: {
$ne: [
"$Racer_Name",
"Pace Car"
]
}
},
{
$jsonSchema: {
required: [ "Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp" ],
properties: {
Racer_Num: {
bsonType: "int",
description: "'Racer_Num' is the integer number of the race car and is required"
},
Racer_Name: {
bsonType: "string",
description: "'Racer_Name' must be a string and is required"
},
lap: {
bsonType: "int",
minimum: 1,
description: "'lap' must be a int and is required"
},
Corner_Num: {
bsonType: "int",
minimum: 1,
maximum: 4,
description: "'Corner_Num' must be a int between 1 and 4 and is required"
},
timestamp: {
bsonType: "string",
pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}$",
description: "'timestamp' must be a string matching iso date pattern and is required"
}
}
}
}]
}, validationAction : "dlq"
}
}

Considere una fuente de datos de streaming que genera informes meteorológicos detallados de diversas ubicaciones. En el siguiente ejemplo de canalización de agregación, se incluye una etapa para garantizar que los documentos se ajusten al esquema de $validate la Conjunto de datos meteorológicos de muestra. La agregación consta de cuatro etapas:

  1. La etapa establece $source una conexión con el agente Apache Kafka que recopila estos informes en un tema llamado my_weatherdata y pasa cada registro a medida que se ingiere a las etapas de agregación posteriores.

  2. La etapa verifica si un documento tiene valores de matriz para $validate los position.coordinates sections campos y, pasando los documentos que los tienen al resto del pipeline y pasando los documentos que no los tienen a un DLQ.

  3. La etapa excluye los documentos que tienen $match un wind.speed.rate valor mayor o igual a 30 y pasa los documentos con un wind.speed.rate valor menor 30 que.

  4. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$validate': {
validator: {
'$jsonSchema': { properties: { position: [Object], sections: [Object] } }
},
validationAction: 'dlq'
}
},
{ '$match': { 'wind.speed.rate': { '$lt': 30 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.sample resultante, puedes conectarte a tu clúster de Atlas usando mongosh para ejecutar el siguiente comando:

Nota

El siguiente es un ejemplo representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66b25302fe8bbac5f39dbdba'),
airTemperature: { quality: '9', value: 3.5 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1', value: 10.9 },
tendency: { code: '3', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '9', value: 1022.5 }
},
callLetters: 'JIVX',
dataSource: '4',
dewPoint: { quality: '9', value: 20.5 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-06T16:44:50.322Z'),
liquidPrecipitation: { condition: '9', depth: 7000, period: 12, quality: '9' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '7' },
period: { quality: '1', value: 3 }
},
position: { coordinates: [ 120.7, -98.2 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 999 },
presentWeatherObservationManual: { condition: '90', quality: '1' },
pressure: { quality: '1', value: 1028.2 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 11.1 },
sections: [ 'UG1', 'MA1', 'GA3', 'KA1', 'UA1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 390 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '06' },
lowCloudGenus: { quality: '9', value: '07' },
lowestCloudBaseHeight: { quality: '1', value: 800 },
lowestCloudCoverage: { quality: '9', value: '06' },
midCloudGenus: { quality: '9', value: '07' },
totalCoverage: { opaque: '99', quality: '1', value: '99' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 1200 },
cloudType: { quality: '9', value: '04' },
coverage: { quality: '1', value: '09' }
},
st: 'x+36700+144300',
type: 'FM-13',
visibility: {
distance: { quality: '9', value: 9000 },
variability: { quality: '9', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '00', quality: '9' },
waves: { height: 9.5, period: 4, quality: '9' }
},
wind: {
direction: { angle: 140, quality: '1' },
speed: { quality: '2', rate: 15.9 },
type: 'N'
}
}

Observe que todos los documentos de esta colección tienen los valores de tipo arrayesperados para position.coordinates y sections. Para ver los documentos que no pasaron la validación, dada una cola de mensajes fallidos llamada dlq, ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").dlq.find()
{
_id: ObjectId('66b254d3a045fb1406047394'),
errInfo: { reason: 'Input document found to be invalid in $validate stage' },
doc: {
airTemperature: { quality: '9', value: 7.6 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1', value: 0.3 },
tendency: { code: '8', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '9', value: 1015.9 },
stationPressure: { quality: '1', value: 1017 }
},
callLetters: 'WRGL',
dataSource: '4',
dewPoint: { quality: '9', value: 25.3 },
elevation: 9999,
extremeAirTemperature: { code: 'M', period: 99.9, quantity: '1', value: -30.9 },
liquidPrecipitation: { condition: '9', period: 99, quality: '9' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '2' },
period: { quality: '1', value: 6 }
},
position: { coordinates: -100.2, type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 17 },
presentWeatherObservationManual: { condition: '08', quality: '1' },
pressure: { quality: '9', value: 1001 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 10.4 },
sections: [ 'GA2', 'GA1', 'KA1', 'AA1', 'OA1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 240 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '02' },
lowCloudGenus: { quality: '9', value: '02' },
lowestCloudBaseHeight: { quality: '1', value: 150 },
lowestCloudCoverage: { quality: '1', value: '03' },
midCloudGenus: { quality: '1', value: '06' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 450 },
cloudType: { quality: '9', value: '03' },
coverage: { quality: '1', value: '07' }
},
st: 'x+20500-074300',
type: 'SAO',
visibility: {
distance: { quality: '9', value: 3800 },
variability: { quality: '9', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '00', quality: '9' },
waves: { height: 37.5, period: 7, quality: '9' }
},
wind: {
direction: { angle: 230, quality: '1' },
speed: { quality: '1', rate: 46.3 },
type: 'N'
},
ingestionTime: ISODate('2024-08-06T16:52:35.287Z')
},
processorName: 'sampleWeather'
}

Observe que todos los documentos en la cola de mensajes no entregados tienen valores no válidos para position.coordinates, sections o ambos.

Volver

$fuente

En esta página