Definición
La $tumblingWindow La etapa especifica una
ventana giratoria para la agregación de datos. Las ventanas de Atlas Stream Processing son con estado, pueden recuperarse si se interrumpen y disponen de mecanismos para procesar datos que llegan tarde. Debes aplicar todas las demás consultas de agregación a tus datos en transmisión dentro de esta etapa de ventanas.
$tumblingWindowUna etapa de pipeline
$tumblingWindowtiene la siguiente forma de prototipo:{ "$tumblingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { size: <int>, unit: "<unit-of-time>" } } } Alternativamente, una etapa de pipeline
$tumblingWindowpuede tener los camposallowedLatenessyidleTimeoutcon un valor entero de 0, como se muestra a continuación:{ "$tumblingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": 0, "allowedLateness": 0 } }
Sintaxis
La etapa $tumblingWindow procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Opcional | Cadena que especifica si los límites de la ventana se determinan por el tiempo del evento o el tiempo de procesamiento. El valor puede ser
|
| Documento | Requerido | Documento que especifica el intervalo de una ventana de salto como una combinación de un tamaño y una unidad de tiempo donde:
Por ejemplo, un |
| arreglo | Requerido | pipeline de agregación anidada evaluada contra los mensajes dentro de la ventana. |
| Documento | Opcional | Documento que especifica un desplazamiento de tiempo para los límites de ventana relativos a UTC. El documento es una combinación del campo de tamaño
Por ejemplo, un |
| Documento | Opcional | Documento que especifica cuánto tiempo esperar antes de cerrar las ventanas si
Si configura Por ejemplo, considera una ventana de 12:00 pm a 1:00 pm y un tiempo de También puede definir esta configuración con un valor entero 0 de. Consulte la definición de la canalización para obtener más información. |
| Documento | Opcional | Documento que especifica cuánto tiempo mantener abiertas las ventanas generadas a partir de la fuente para aceptar datos que lleguen tarde después de procesar documentos hasta la hora de finalización de la ventana. Si se omite, por defecto es de 3 segundos. También puede definir esta configuración con un valor entero 0 de. Consulte la definición de la canalización para obtener más información. |
Comportamiento
Atlas Stream Processing solo admite una etapa de ventana por canalización.
Cuando aplicas la $group etapa a tu etapa de ventana, una única clave de grupo tiene un límite de 100 megabytes de RAM.
El soporte para ciertas etapas de agregación puede ser limitado o no estar disponible dentro de ventanas. Para obtener más información, consulta Etapas del pipeline de agregación admitidas.
En caso de interrupción del servicio, puedes reanudar el pipeline interno de una ventana desde su estado en el punto de la interrupción. Para obtener más información, consulta Puntos de control.
Ejemplos
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:
La etapa
$sourceestablece una conexión con Apache Kafka bróker recopilando estos informes en un tema llamadomy_weatherdata, exponiendo cada registro conforme se asimila a las etapas de agregación subsiguientes.La etapa
$tumblingWindowdefine ventanas consecutivas con una duración de 30segundos. Cada ventana ejecuta un(a) interno(a)pipeline, que encuentra el promedio, la mediana, el máximo y el mínimoatmosphericPressureObservation.altimeterSetting.valuedurante el período de esa ventana. A continuación, elpipelineproduce un solo documento con un_idequivalente a la marca de tiempo de inicio de la ventana que representa y los valores especificados para esa ventana.La etapa escribe la salida en una colección de Atlas
$mergellamadastreamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$tumblingWindow': { interval: { size: 30, unit: "second" }, pipeline: [{ $group: { _id: { $meta: "stream.window.start"}, averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" }, medianPressure: { $median: { input: "$atmosphericPressureObservation.altimeterSetting.value", method: "approximate" } }, maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" }, minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" } } }] } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-09-26T16:34:00.000Z'), averagePressure: 5271.47894736842, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }, { _id: ISODate('2024-09-26T16:34:30.000Z'), averagePressure: 5507.9, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.