Definición
El $hoppingWindow La etapa especifica una
Ventana desalto para la agregación de datos. Las ventanas de procesamiento de flujos de Atlas tienen estado, se pueden recuperar en caso de interrupción y cuentan con mecanismos para procesar datos que llegan tarde. Debe aplicar todas las demás consultas de agregación a sus datos de flujo dentro de esta etapa de la ventana.
$hoppingWindowUna etapa de pipeline
$hoppingWindowtiene la siguiente forma de prototipo:{ "$hoppingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "hopSize": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { "size": <int>, "unit": "<unit-of-time>" }, } }
Sintaxis
La etapa $hoppingWindow procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Opcional | Cadena que especifica si los límites de la ventana se determinan por el tiempo del evento o el tiempo de procesamiento. El valor puede ser
|
| Documento | Requerido | Documento que especifica el intervalo de una ventana de salto como una combinación de un tamaño y una unidad de tiempo donde:
Por ejemplo, un |
| Documento | Requerido | Documento que especifica la longitud del salto entre los tiempos de inicio de las ventanas como una combinación de un
Por ejemplo, un |
| arreglo | Requerido | Canal de agregación anidada evaluado contra los mensajes dentro de la ventana. |
| Documento | Opcional | Documento que especifica un desplazamiento de tiempo para los límites de ventana relativos a UTC. El documento es una combinación del campo de tamaño
Por ejemplo, un |
| Documento | Opcional | Documento que especifica cuánto tiempo debe esperarse antes de cerrar las ventanas si
Si configura Por ejemplo, considere una ventana de 12:00 p. m. a 1:00 p. m. y un tiempo |
| Documento | Opcional | Documento que especifica cuánto tiempo se mantienen abiertas las ventanas generadas desde el origen para aceptar datos que llegan tarde después de procesar los documentos para la hora de finalización de la ventana. Si se omite, el valor predeterminado es 3 segundos. |
Comportamiento
Atlas Stream Processing solo admite una etapa de ventana por canalización.
Cuando aplica la etapa a su $group etapa de ventana, una sola clave de grupo tiene un límite de 100 megabytes de RAM.
La compatibilidad con ciertas etapas de agregación podría ser limitada o no estar disponible en Windows. Para obtener más información, consulte Etapas de canalización de agregación compatibles.
En caso de interrupción del servicio, puede reanudar el flujo de trabajo interno de una ventana desde el estado en que se encontraba en el punto de interrupción. Para obtener más información, consulte Puntos de control.
Ejemplos
Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación consta de tres etapas:
La etapa establece una conexión
$sourcecon Apache KafkaEl agente recopila estos informes en un tema llamadomy_weatherdatay expone cada registro a medida que se ingiere a las etapas de agregación posteriores.La etapa define ventanas de tiempo superpuestas
$hoppingWindowde 100 segundos de duración, que comienzan cada 20 segundos. Cada ventana ejecuta un internopipelineque calcula elliquidPrecipitation.depthpromedio, según lo definido en lossample_weatherdatadocumentos transmitidos desde el agente de Apache Kafka, para la duración de una ventana determinada. Elpipelinegenera un único documento con un_idequivalente a la marca de tiempo de inicio de la ventana que representa y el correspondiente aaveragePrecipitationdicha ventana.La etapa escribe la salida en una colección de Atlas
$mergellamadastreamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
pipeline = [ { $source: { "connectionName": "streamsExampleConnectionToKafka", "topic": "my_weatherdata" } }, { $hoppingWindow: { "interval": { "size": 100, "unit": "second" }, "hopSize": { "size": 20, "unit": "second" }, "pipeline" : [ { $group: { _id: { $meta: "stream.window.start" }, averagePrecipitation: { $avg: "$liquidPrecipitation.depth" } } } ], } }, { $merge: { "into": { "connectionName":"streamsExampleConnectionToAtlas", "db":"streamDB", "coll":"streamCollection" } } } ]
Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-08-28T19:30:20.000Z'), averagePrecipitation: 2264.3973214285716 }, { _id: ISODate('2024-08-28T19:30:40.000Z'), averagePrecipitation: 2285.7061611374406 }, { _id: ISODate('2024-08-28T19:31:00.000Z'), averagePrecipitation: 2357.6940154440153 }, { _id: ISODate('2024-08-28T19:31:20.000Z'), averagePrecipitation: 2378.374061433447 }
Nota
El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.