Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

$hoppingWindow Etapa (Stream Processing)

La etapa $hoppingWindow especifica una ventana de salto para la agregación de datos. Las ventanas de procesamiento de flujos de Atlas tienen estado, se pueden recuperar si se interrumpen y cuentan con mecanismos para procesar datos que llegan tarde. Debe aplicar todas las demás consultas de agregación a sus datos de flujo dentro de esta etapa de ventana.

$hoppingWindow

Una etapa de pipeline $hoppingWindow tiene la siguiente forma de prototipo:

{
"$hoppingWindow": {
"boundary": "eventTime" | "processingTime",
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"hopSize": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
"size": <int>,
"unit": "<unit-of-time>"
},
}
}

La etapa $hoppingWindow procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

boundary

string

Opcional

string que especifica si los límites de la ventana se determinan por el tiempo del evento o por el tiempo de procesamiento. El valor puede ser eventTime o processingTime. Consulta Stream Processing timing para obtener más información. Si se omite, este campo se establece por defecto en eventTime.

idleTimeout y allowedLateness campos no se pueden establecer cuando boundary se establece en processingTime.

interval

Documento

Requerido

Documento que especifica el intervalo de una ventana de salto como una combinación de un tamaño y una unidad de tiempo donde:

  • El valor de size debe ser un número entero positivo distinto de cero.

  • El valor de unit puede ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por ejemplo, una size de 20 y una unit de second hacen que cada ventana permanezca abierta durante 20 segundos.

hopSize

Documento

Requerido

Documento que especifica la longitud del salto entre los tiempos de inicio de ventana como una combinación de un size y un unit de tiempo donde:

  • El valor de size debe ser un número entero positivo distinto de cero.

  • El valor de unit puede ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por ejemplo, un size de 10 y un unit de second define una ventana de 10segundos entre los horarios de inicio.

pipeline

arreglo

Requerido

pipeline de agregación anidada evaluada contra los mensajes dentro de la ventana.

offset

Documento

Opcional

Documento que especifica un desplazamiento de tiempo para los límites de ventana relativos a UTC. El documento es una combinación del campo de tamaño offsetFromUtc y una unidad de tiempo donde:

  • El valor de offsetFromUtc debe ser un número entero positivo distinto de cero.

  • El valor de unit debe ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

Por ejemplo, un offsetFromUtc de 8 y un unit de hour generan límites que se trasladan ocho horas por delante de UTC. Si no especificas un desplazamiento, los límites de la ventana se alinean con UTC.

idleTimeout

Documento

Opcional

Documento que especifica cuánto tiempo esperar antes de cerrar las ventanas si $source está inactivo. Defina esta configuración como una combinación de un size y un unit de tiempo en donde:

  • El valor de size debe ser un número entero positivo distinto de cero.

  • El valor de unit puede ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Si se establece idleTimeout, Atlas Stream Processing cierra las ventanas abiertas solo si $source está inactivo durante más tiempo que el mayor entre el tiempo restante de la ventana o el tiempo idleTimeout. El temporizador de inactividad comienza tan pronto como $source entra en modo inactivo.

Por ejemplo, considera una ventana de 12:00 pm a 1:00 pm y un tiempo de idleTimeout de 2 horas. Si el último evento es a las 12:02 p. m. después de lo cual $source se queda inactivo, el tiempo restante de la ventana es de 58 minutos. Atlas Stream Processing cierra la ventana después de 2 horas de inactividad a las 2:02 pm, lo que es más largo que el tiempo restante de la ventana y el tiempo idleTimeout. Si el tiempo de idleTimeout está configurado a solo 10 minutos, Atlas Stream Processing cierra la ventana después de 58 minutos de inactividad a las 1:00 p. m., lo cual es más largo que el tiempo de idleTimeout y el tiempo restante de la ventana.

allowedLateness

Documento

Opcional

Documento que especifica cuánto tiempo mantener abiertas las ventanas generadas a partir de la fuente para aceptar datos que lleguen tarde después de procesar documentos hasta la hora de finalización de la ventana. Si se omite, por defecto es de 3 segundos.

Atlas Stream Processing solo admite una etapa de ventana por pipeline .

Cuando aplicas la $group etapa a tu etapa de ventana, una única clave de grupo tiene un límite de 100 megabytes de RAM.

El soporte para ciertas etapas de agregación puede ser limitado o no estar disponible dentro de ventanas. Para obtener más información, consulta Etapas del pipeline de agregación admitidas.

En caso de interrupción del servicio, puedes reanudar el pipeline interno de una ventana desde su estado en el punto de la interrupción. Para obtener más información, consulta Puntos de control.

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:

  1. La etapa $source establece una conexión con el intermediario de Apache Kafka, recolectando estos informes en un tema llamado my_weatherdata y exponiendo cada registro en cuanto se incorpora a las siguientes etapas de agregación.

  2. La etapa define ventanas de tiempo superpuestas $hoppingWindow de 100 segundos de duración, que comienzan cada 20 segundos. Cada ventana ejecuta un interno pipeline que calcula el liquidPrecipitation.depth promedio, según lo definido en los sample_weatherdata documentos transmitidos desde el agente de Apache Kafka, para la duración de una ventana determinada. El pipeline genera un único documento con un _id equivalente a la marca de tiempo de inicio de la ventana que representa y el correspondiente a averagePrecipitation dicha ventana.

  3. La fase $merge escribe la salida en una colección Atlas denominada stream en la base de datos sample_weatherstream. Si no existe tal base de datos o colección, Atlas los creará.

pipeline = [
{ $source:
{
"connectionName": "streamsExampleConnectionToKafka",
"topic": "my_weatherdata"
}
},
{ $hoppingWindow:
{
"interval": {
"size": 100,
"unit": "second"
},
"hopSize": {
"size": 20,
"unit": "second"
},
"pipeline" : [
{
$group: {
_id: { $meta: "stream.window.start" },
averagePrecipitation: { $avg: "$liquidPrecipitation.depth" }
}
}
],
}
},
{ $merge:
{
"into":
{
"connectionName":"streamsExampleConnectionToAtlas",
"db":"streamDB",
"coll":"streamCollection"
}
}
}
]

Para ver los documentos de la colección sample_weatherstream.stream resultante, conéctate a un clúster de Atlas y ejecuta el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-08-28T19:30:20.000Z'),
averagePrecipitation: 2264.3973214285716
},
{
_id: ISODate('2024-08-28T19:30:40.000Z'),
averagePrecipitation: 2285.7061611374406
},
{
_id: ISODate('2024-08-28T19:31:00.000Z'),
averagePrecipitation: 2357.6940154440153
},
{
_id: ISODate('2024-08-28T19:31:20.000Z'),
averagePrecipitation: 2378.374061433447
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.