Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /
/ / /

$hoppingWindow Etapa (Stream Processing)

La $hoppingWindow La etapa especifica una ventana desplazable para la agregación de datos. Las ventanas de Atlas Stream Processing son con estado, pueden recuperarse si se interrumpen y disponen de mecanismos para procesar datos que llegan tarde. Debes aplicar todas las demás consultas de agregación a tus datos en transmisión dentro de esta etapa de ventanas.

$hoppingWindow

Una etapa de pipeline $hoppingWindow tiene la siguiente forma de prototipo:

{
"$hoppingWindow": {
"boundary": "eventTime" | "processingTime",
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"hopSize": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
"size": <int>,
"unit": "<unit-of-time>"
},
}
}

La etapa $hoppingWindow procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

boundary

string

Opcional

Cadena que especifica si los límites de la ventana se determinan por el tiempo del evento o el tiempo de procesamiento. El valor puede ser eventTime processingTimeo. Consulte la temporización del procesamiento de flujos para obtener más información. Si se omite, este campo tiene el valor eventTime predeterminado.

idleTimeout y allowedLateness campos no se pueden establecer cuando boundary se establece en processingTime.

interval

Documento

Requerido

Documento que especifica el intervalo de una ventana de salto como una combinación de un tamaño y una unidad de tiempo donde:

  • El valor de size debe ser un número entero positivo distinto de cero.

  • El valor de unit puede ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por ejemplo, un size de 20 y un unit de second establece que cada ventana permanezca abierta durante 20 segundos.

hopSize

Documento

Requerido

Documento que especifica la longitud del salto entre los tiempos de inicio de las ventanas como una combinación de un size y un unit de tiempo donde:

  • El valor de size debe ser un número entero positivo distinto de cero.

  • El valor de unit puede ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por ejemplo, un size de 10 y un unit de second define una ventana de 10segundos entre los horarios de inicio.

pipeline

arreglo

Requerido

pipeline de agregación anidada evaluada contra los mensajes dentro de la ventana.

offset

Documento

Opcional

Documento que especifica un desplazamiento de tiempo para los límites de ventana relativos a UTC. El documento es una combinación del campo de tamaño offsetFromUtc y una unidad de tiempo donde:

  • El valor de offsetFromUtc debe ser un número entero positivo distinto de cero.

  • El valor de unit debe ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

Por ejemplo, un offsetFromUtc de 8 y un unit de hour generan límites que se trasladan ocho horas por delante de UTC. Si no especificas un desplazamiento, los límites de la ventana se alinean con UTC.

idleTimeout

Documento

Opcional

Documento que especifica cuánto tiempo esperar antes de cerrar las ventanas si $source está inactivo. Defina esta configuración como una combinación de un size y un unit de tiempo en donde:

  • El valor de size debe ser un número entero positivo distinto de cero.

  • El valor de unit puede ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Si configura idleTimeout, Atlas Stream Processing cierra las ventanas abiertas solo si $source permanece inactiva durante más tiempo que el mayor entre el tiempo restante de la ventana y el tiempo idleTimeout. El temporizador de inactividad se inicia en cuanto $source queda inactivo.

Por ejemplo, considera una ventana de 12:00 pm a 1:00 pm y un tiempo de idleTimeout de 2 horas. Si el último evento es a las 12:02 p. m. después de lo cual $source se queda inactivo, el tiempo restante de la ventana es de 58 minutos. Atlas Stream Processing cierra la ventana después de 2 horas de inactividad a las 2:02 pm, lo que es más largo que el tiempo restante de la ventana y el tiempo idleTimeout. Si el tiempo de idleTimeout está configurado a solo 10 minutos, Atlas Stream Processing cierra la ventana después de 58 minutos de inactividad a las 1:00 p. m., lo cual es más largo que el tiempo de idleTimeout y el tiempo restante de la ventana.

allowedLateness

Documento

Opcional

Documento que especifica cuánto tiempo mantener abiertas las ventanas generadas a partir de la fuente para aceptar datos que lleguen tarde después de procesar documentos hasta la hora de finalización de la ventana. Si se omite, por defecto es de 3 segundos.

Atlas Stream Processing solo admite una etapa de ventana por canalización.

Cuando aplicas la $group etapa a tu etapa de ventana, una única clave de grupo tiene un límite de 100 megabytes de RAM.

El soporte para ciertas etapas de agregación puede ser limitado o no estar disponible dentro de ventanas. Para obtener más información, consulta Etapas del pipeline de agregación admitidas.

En caso de interrupción del servicio, puedes reanudar el pipeline interno de una ventana desde su estado en el punto de la interrupción. Para obtener más información, consulta Puntos de control.

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:

  1. La etapa $source establece una conexión con Apache Kafka bróker recopilando estos informes en un tema llamado my_weatherdata, exponiendo cada registro conforme se asimila a las etapas de agregación subsiguientes.

  2. La etapa define ventanas de tiempo superpuestas $hoppingWindow de 100 segundos de duración, que comienzan cada 20 segundos. Cada ventana ejecuta un interno pipeline que calcula el liquidPrecipitation.depth promedio, según lo definido en los sample_weatherdata documentos transmitidos desde el agente de Apache Kafka, para la duración de una ventana determinada. El pipeline genera un único documento con un _id equivalente a la marca de tiempo de inicio de la ventana que representa y el correspondiente a averagePrecipitation dicha ventana.

  3. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

pipeline = [
{ $source:
{
"connectionName": "streamsExampleConnectionToKafka",
"topic": "my_weatherdata"
}
},
{ $hoppingWindow:
{
"interval": {
"size": 100,
"unit": "second"
},
"hopSize": {
"size": 20,
"unit": "second"
},
"pipeline" : [
{
$group: {
_id: { $meta: "stream.window.start" },
averagePrecipitation: { $avg: "$liquidPrecipitation.depth" }
}
}
],
}
},
{ $merge:
{
"into":
{
"connectionName":"streamsExampleConnectionToAtlas",
"db":"streamDB",
"coll":"streamCollection"
}
}
}
]

Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-08-28T19:30:20.000Z'),
averagePrecipitation: 2264.3973214285716
},
{
_id: ISODate('2024-08-28T19:30:40.000Z'),
averagePrecipitation: 2285.7061611374406
},
{
_id: ISODate('2024-08-28T19:31:00.000Z'),
averagePrecipitation: 2357.6940154440153
},
{
_id: ISODate('2024-08-28T19:31:20.000Z'),
averagePrecipitation: 2378.374061433447
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.

Volver

$cachedLookup

En esta página