Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /
/ / /

$tumblingWindow Etapa (Stream Processing)

La $tumblingWindow La etapa especifica una ventana giratoria para la agregación de datos. Las ventanas de Atlas Stream Processing son con estado, pueden recuperarse si se interrumpen y disponen de mecanismos para procesar datos que llegan tarde. Debes aplicar todas las demás consultas de agregación a tus datos en transmisión dentro de esta etapa de ventanas.

$tumblingWindow

Una etapa de pipeline $tumblingWindow tiene la siguiente forma de prototipo:

{
"$tumblingWindow": {
"boundary": "eventTime" | "processingTime",
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
size: <int>,
unit: "<unit-of-time>"
}
}
}

Alternativamente, una etapa de pipeline $tumblingWindow puede tener los campos allowedLateness y idleTimeout con un valor entero de 0, como se muestra a continuación:

{
"$tumblingWindow": {
"boundary": "eventTime" | "processingTime",
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": 0,
"allowedLateness": 0
}
}

La etapa $tumblingWindow procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

boundary

string

Opcional

Cadena que especifica si los límites de la ventana se determinan por el tiempo del evento o el tiempo de procesamiento. El valor puede ser eventTime processingTimeo. Consulte la temporización del procesamiento de flujos para obtener más información. Si se omite, este campo tiene el valor eventTime predeterminado.

idleTimeout y allowedLateness campos no se pueden establecer cuando boundary se establece en processingTime.

interval

Documento

Requerido

Documento que especifica el intervalo de una ventana de salto como una combinación de un tamaño y una unidad de tiempo donde:

  • El valor de size debe ser un número entero positivo distinto de cero.

  • El valor de unit debe ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por ejemplo, un size de 20 y un unit de second establece que cada ventana permanezca abierta durante 20 segundos.

pipeline

arreglo

Requerido

pipeline de agregación anidada evaluada contra los mensajes dentro de la ventana.

offset

Documento

Opcional

Documento que especifica un desplazamiento de tiempo para los límites de ventana relativos a UTC. El documento es una combinación del campo de tamaño offsetFromUtc y una unidad de tiempo donde:

  • El valor de offsetFromUtc debe ser un número entero positivo distinto de cero.

  • El valor de unit debe ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

Por ejemplo, un offsetFromUtc de 8 y un unit de hour generan límites que se trasladan ocho horas por delante de UTC. Si no especificas un desplazamiento, los límites de la ventana se alinean con UTC.

idleTimeout

Documento

Opcional

Documento que especifica cuánto tiempo esperar antes de cerrar las ventanas si $source está inactivo. Defina esta configuración como una combinación de un size y un unit de tiempo en donde:

  • El valor de size debe ser un número entero positivo distinto de cero.

  • El valor de unit puede ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Si configura idleTimeout, Atlas Stream Processing cierra las ventanas abiertas solo si $source permanece inactiva durante más tiempo que el mayor entre el tiempo restante de la ventana y el tiempo idleTimeout. El temporizador de inactividad se inicia en cuanto $source queda inactivo.

Por ejemplo, considera una ventana de 12:00 pm a 1:00 pm y un tiempo de idleTimeout de 2 horas. Si el último evento es a las 12:02 p. m. después de lo cual $source se queda inactivo, el tiempo restante de la ventana es de 58 minutos. Atlas Stream Processing cierra la ventana después de 2 horas de inactividad a las 2:02 pm, lo que es más largo que el tiempo restante de la ventana y el tiempo idleTimeout. Si el tiempo de idleTimeout está configurado a solo 10 minutos, Atlas Stream Processing cierra la ventana después de 58 minutos de inactividad a las 1:00 p. m., lo cual es más largo que el tiempo de idleTimeout y el tiempo restante de la ventana.

También puede definir esta configuración con un valor entero 0 de. Consulte la definición de la canalización para obtener más información.

allowedLateness

Documento

Opcional

Documento que especifica cuánto tiempo mantener abiertas las ventanas generadas a partir de la fuente para aceptar datos que lleguen tarde después de procesar documentos hasta la hora de finalización de la ventana. Si se omite, por defecto es de 3 segundos.

También puede definir esta configuración con un valor entero 0 de. Consulte la definición de la canalización para obtener más información.

Atlas Stream Processing solo admite una etapa de ventana por canalización.

Cuando aplicas la $group etapa a tu etapa de ventana, una única clave de grupo tiene un límite de 100 megabytes de RAM.

El soporte para ciertas etapas de agregación puede ser limitado o no estar disponible dentro de ventanas. Para obtener más información, consulta Etapas del pipeline de agregación admitidas.

En caso de interrupción del servicio, puedes reanudar el pipeline interno de una ventana desde su estado en el punto de la interrupción. Para obtener más información, consulta Puntos de control.

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:

  1. La etapa $source establece una conexión con Apache Kafka bróker recopilando estos informes en un tema llamado my_weatherdata, exponiendo cada registro conforme se asimila a las etapas de agregación subsiguientes.

  2. La etapa $tumblingWindow define ventanas consecutivas con una duración de 30segundos. Cada ventana ejecuta un(a) interno(a) pipeline, que encuentra el promedio, la mediana, el máximo y el mínimo atmosphericPressureObservation.altimeterSetting.value durante el período de esa ventana. A continuación, el pipeline produce un solo documento con un _id equivalente a la marca de tiempo de inicio de la ventana que representa y los valores especificados para esa ventana.

  3. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$tumblingWindow': {
interval: {
size: 30,
unit: "second"
},
pipeline: [{
$group: {
_id:
{ $meta: "stream.window.start"},
averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" },
medianPressure: {
$median: {
input: "$atmosphericPressureObservation.altimeterSetting.value",
method: "approximate"
}
},
maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" },
minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" }
}
}]
}
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-09-26T16:34:00.000Z'),
averagePressure: 5271.47894736842,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
},
{
_id: ISODate('2024-09-26T16:34:30.000Z'),
averagePressure: 5507.9,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.

Volver

$hoppingWindow

En esta página