Definición
La etapa $tumblingWindow especifica una ventana de agregación de datos. Las ventanas de procesamiento de flujos de Atlas tienen estado, se pueden recuperar si se interrumpen y cuentan con mecanismos para procesar datos que llegan tarde. Debe aplicar todas las demÔs consultas de agregación a sus datos de flujo dentro de esta etapa de ventana.
$tumblingWindowUna etapa de pipeline
$tumblingWindowtiene la siguiente forma de prototipo:{ "$tumblingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { size: <int>, unit: "<unit-of-time>" } } } Alternativamente, una etapa de pipeline
$tumblingWindowpuede tener los camposallowedLatenessyidleTimeoutcon un valor entero de 0, como se muestra a continuación:{ "$tumblingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": 0, "allowedLateness": 0 } }
Sintaxis
La etapa $tumblingWindow procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Opcional | string que especifica si los lĆmites de la ventana se determinan por el tiempo del evento o por el tiempo de procesamiento. El valor puede ser
|
| Documento | Requerido | Documento que especifica el intervalo de una ventana de salto como una combinación de un tamaño y una unidad de tiempo donde:
Por ejemplo, una |
| arreglo | Requerido | pipeline de agregación anidada evaluada contra los mensajes dentro de la ventana. |
| Documento | Opcional | Documento que especifica un desplazamiento de tiempo para los lĆmites de ventana relativos a UTC. El documento es una combinación del campo de tamaƱo
Por ejemplo, un |
| Documento | Opcional | Documento que especifica cuƔnto tiempo esperar antes de cerrar las ventanas si
Si se establece Por ejemplo, considera una ventana de 12:00 pm a 1:00 pm y un tiempo de Como alternativa, puedes definir este ajuste con un valor entero de 0. Consulta la definición de pipeline para mÔs información. |
| Documento | Opcional | Documento que especifica cuÔnto tiempo mantener abiertas las ventanas generadas a partir de la fuente para aceptar datos que lleguen tarde después de procesar documentos hasta la hora de finalización de la ventana. Si se omite, por defecto es de 3 segundos. Como alternativa, puedes definir este ajuste con un valor entero de 0. Consulta la definición de pipeline para mÔs información. |
Comportamiento
Atlas Stream Processing solo admite una etapa de ventana por pipeline .
Cuando aplicas la $group etapa a tu etapa de ventana, una Ćŗnica clave de grupo tiene un lĆmite de 100 megabytes de RAM.
El soporte para ciertas etapas de agregación puede ser limitado o no estar disponible dentro de ventanas. Para obtener mÔs información, consulta Etapas del pipeline de agregación admitidas.
En caso de interrupción del servicio, puedes reanudar el pipeline interno de una ventana desde su estado en el punto de la interrupción. Para obtener mÔs información, consulta Puntos de control.
Ejemplos
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:
La etapa
$sourceestablece una conexión con el intermediario de Apache Kafka, recolectando estos informes en un tema llamadomy_weatherdatay exponiendo cada registro en cuanto se incorpora a las siguientes etapas de agregación.La etapa define ventanas consecutivas
$tumblingWindowcon 30una duración de segundos. Cada ventana ejecuta unpipelineinterno, que calcula el promedio, la mediana, el mĆ”ximo yatmosphericPressureObservation.altimeterSetting.valueel mĆnimo de la duración de esa ventana. Elpipelinegenera un Ćŗnico documento con un_idequivalente a la marca de tiempo de inicio de la ventana que representa y los valores especificados para dicha ventana.La fase
$mergeescribe la salida en una colección Atlas denominadastreamen la base de datossample_weatherstream. Si no existe tal base de datos o colección, Atlas los crearÔ.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$tumblingWindow': { interval: { size: 30, unit: "second" }, pipeline: [{ $group: { _id: { $meta: "stream.window.start"}, averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" }, medianPressure: { $median: { input: "$atmosphericPressureObservation.altimeterSetting.value", method: "approximate" } }, maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" }, minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" } } }] } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para ver los documentos de la colección sample_weatherstream.stream resultante, conéctate a un clúster de Atlas y ejecuta el siguiente comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-09-26T16:34:00.000Z'), averagePressure: 5271.47894736842, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }, { _id: ISODate('2024-09-26T16:34:30.000Z'), averagePressure: 5507.9, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estÔticos, y cada usuario ve documentos distintos.