Definición
La etapa $hoppingWindow especifica una ventana de salto para la agregación de datos. Las ventanas de procesamiento de flujos de Atlas tienen estado, se pueden recuperar si se interrumpen y cuentan con mecanismos para procesar datos que llegan tarde. Debe aplicar todas las demás consultas de agregación a sus datos de flujo dentro de esta etapa de ventana.
$hoppingWindowUna etapa de pipeline
$hoppingWindowtiene la siguiente forma de prototipo:{ "$hoppingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "hopSize": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { "size": <int>, "unit": "<unit-of-time>" }, } }
Sintaxis
La etapa $hoppingWindow procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Opcional | string que especifica si los límites de la ventana se determinan por el tiempo del evento o por el tiempo de procesamiento. El valor puede ser
|
| Documento | Requerido | Documento que especifica el intervalo de una ventana de salto como una combinación de un tamaño y una unidad de tiempo donde:
Por ejemplo, una |
| Documento | Requerido | Documento que especifica la longitud del salto entre los tiempos de inicio de ventana como una combinación de un
Por ejemplo, un |
| arreglo | Requerido | pipeline de agregación anidada evaluada contra los mensajes dentro de la ventana. |
| Documento | Opcional | Documento que especifica un desplazamiento de tiempo para los límites de ventana relativos a UTC. El documento es una combinación del campo de tamaño
Por ejemplo, un |
| Documento | Opcional | Documento que especifica cuánto tiempo esperar antes de cerrar las ventanas si
Si se establece Por ejemplo, considera una ventana de 12:00 pm a 1:00 pm y un tiempo de |
| Documento | Opcional | Documento que especifica cuánto tiempo mantener abiertas las ventanas generadas a partir de la fuente para aceptar datos que lleguen tarde después de procesar documentos hasta la hora de finalización de la ventana. Si se omite, por defecto es de 3 segundos. |
Comportamiento
Atlas Stream Processing solo admite una etapa de ventana por pipeline .
Cuando aplicas la $group etapa a tu etapa de ventana, una única clave de grupo tiene un límite de 100 megabytes de RAM.
El soporte para ciertas etapas de agregación puede ser limitado o no estar disponible dentro de ventanas. Para obtener más información, consulta Etapas del pipeline de agregación admitidas.
En caso de interrupción del servicio, puedes reanudar el pipeline interno de una ventana desde su estado en el punto de la interrupción. Para obtener más información, consulta Puntos de control.
Ejemplos
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:
La etapa
$sourceestablece una conexión con el intermediario de Apache Kafka, recolectando estos informes en un tema llamadomy_weatherdatay exponiendo cada registro en cuanto se incorpora a las siguientes etapas de agregación.La etapa define ventanas de tiempo superpuestas
$hoppingWindowde 100 segundos de duración, que comienzan cada 20 segundos. Cada ventana ejecuta un internopipelineque calcula elliquidPrecipitation.depthpromedio, según lo definido en lossample_weatherdatadocumentos transmitidos desde el agente de Apache Kafka, para la duración de una ventana determinada. Elpipelinegenera un único documento con un_idequivalente a la marca de tiempo de inicio de la ventana que representa y el correspondiente aaveragePrecipitationdicha ventana.La fase
$mergeescribe la salida en una colección Atlas denominadastreamen la base de datossample_weatherstream. Si no existe tal base de datos o colección, Atlas los creará.
pipeline = [ { $source: { "connectionName": "streamsExampleConnectionToKafka", "topic": "my_weatherdata" } }, { $hoppingWindow: { "interval": { "size": 100, "unit": "second" }, "hopSize": { "size": 20, "unit": "second" }, "pipeline" : [ { $group: { _id: { $meta: "stream.window.start" }, averagePrecipitation: { $avg: "$liquidPrecipitation.depth" } } } ], } }, { $merge: { "into": { "connectionName":"streamsExampleConnectionToAtlas", "db":"streamDB", "coll":"streamCollection" } } } ]
Para ver los documentos de la colección sample_weatherstream.stream resultante, conéctate a un clúster de Atlas y ejecuta el siguiente comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-08-28T19:30:20.000Z'), averagePrecipitation: 2264.3973214285716 }, { _id: ISODate('2024-08-28T19:30:40.000Z'), averagePrecipitation: 2285.7061611374406 }, { _id: ISODate('2024-08-28T19:31:00.000Z'), averagePrecipitation: 2357.6940154440153 }, { _id: ISODate('2024-08-28T19:31:20.000Z'), averagePrecipitation: 2378.374061433447 }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.