Docs Menu
Docs Home
/ /
/ / /

$tumblingWindow Etapa (procesamiento de flujo)

El $tumblingWindow La etapa especifica una Ventana de agregación de datos. Las ventanas de procesamiento de flujos de Atlas tienen estado, se pueden recuperar en caso de interrupción y cuentan con mecanismos para procesar datos que llegan tarde. Debe aplicar todas las demás consultas de agregación a sus datos de flujo dentro de esta etapa de la ventana.

$tumblingWindow

Una etapa de pipeline $tumblingWindow tiene la siguiente forma de prototipo:

{
"$tumblingWindow": {
"boundary": "eventTime" | "processingTime",
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
size: <int>,
unit: "<unit-of-time>"
}
}
}

Como alternativa, una etapa de canalización $tumblingWindow puede tener campos allowedLateness y idleTimeout con un valor entero de 0, como se muestra a continuación:

{
"$tumblingWindow": {
"boundary": "eventTime" | "processingTime",
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": 0,
"allowedLateness": 0
}
}

La etapa $tumblingWindow procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

boundary

string

Opcional

Cadena que especifica si los límites de la ventana se determinan por el tiempo del evento o el tiempo de procesamiento. El valor puede ser eventTime processingTimeo. Consulte la temporización del procesamiento de flujos para obtener más información. Si se omite, este campo tiene el valor eventTime predeterminado.

idleTimeout y los campos allowedLateness no se pueden configurar cuando boundary se configura en processingTime.

interval

Documento

Requerido

Documento que especifica el intervalo de una ventana de salto como una combinación de un tamaño y una unidad de tiempo donde:

  • El valor de size debe ser un entero positivo distinto de cero.

  • El valor de unit debe ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por ejemplo, un size de 20 y un unit de second establece que cada ventana permanezca abierta durante 20 segundos.

pipeline

arreglo

Requerido

Canal de agregación anidada evaluado contra los mensajes dentro de la ventana.

offset

Documento

Opcional

Documento que especifica un desplazamiento de tiempo para los límites de ventana relativos a UTC. El documento es una combinación del campo de tamaño offsetFromUtc y una unidad de tiempo donde:

  • El valor de offsetFromUtc debe ser un entero positivo distinto de cero.

  • El valor de unit debe ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

Por ejemplo, un offsetFromUtc de 8 y un unit de hour generan límites con un desfase de ocho horas respecto a la hora UTC. Si no se especifica un desfase, los límites de la ventana se alinean con la hora UTC.

idleTimeout

Documento

Opcional

Documento que especifica cuánto tiempo debe esperarse antes de cerrar las ventanas si $source está inactivo. Defina esta configuración como una combinación de size y unit, donde:

  • El valor de size debe ser un entero positivo distinto de cero.

  • El valor de unit puede ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Si configura idleTimeout, Atlas Stream Processing cierra las ventanas abiertas solo si $source permanece inactiva durante más tiempo que el mayor entre el tiempo restante de la ventana y el tiempo idleTimeout. El temporizador de inactividad se inicia en cuanto $source queda inactivo.

Por ejemplo, considere una ventana de 12:00 p. m. a 1:00 p. m. y un tiempo idleTimeout de 2 horas. Si el último evento ocurre a las 12:02 p. m., tras lo cual $source queda inactivo, el tiempo restante de la ventana es de 58 minutos. Atlas Stream Processing cierra la ventana después de 2 horas de inactividad a las 2:02 p. m., lo cual es mayor que el tiempo restante de la ventana y la hora idleTimeout. Si la hora idleTimeout se establece en solo 10 minutos, Atlas Stream Processing cierra la ventana después de 58 minutos de inactividad a las 1:00 p. m., lo cual es mayor que la hora idleTimeout y el tiempo restante de la ventana.

También puede definir esta configuración con un valor entero 0 de. Consulte la definición de la canalización para obtener más información.

allowedLateness

Documento

Opcional

Documento que especifica cuánto tiempo se mantienen abiertas las ventanas generadas desde el origen para aceptar datos que llegan tarde después de procesar los documentos para la hora de finalización de la ventana. Si se omite, el valor predeterminado es 3 segundos.

También puede definir esta configuración con un valor entero 0 de. Consulte la definición de la canalización para obtener más información.

Atlas Stream Processing solo admite una etapa de ventana por canalización.

Cuando aplica la etapa a su $group etapa de ventana, una sola clave de grupo tiene un límite de 100 megabytes de RAM.

La compatibilidad con ciertas etapas de agregación podría ser limitada o no estar disponible en Windows. Para obtener más información, consulte Etapas de canalización de agregación compatibles.

En caso de interrupción del servicio, puede reanudar el flujo de trabajo interno de una ventana desde el estado en que se encontraba en el punto de interrupción. Para obtener más información, consulte Puntos de control.

Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación consta de tres etapas:

  1. La etapa establece una conexión $source con Apache KafkaEl agente recopila estos informes en un tema llamado my_weatherdata y expone cada registro a medida que se ingiere a las etapas de agregación posteriores.

  2. La etapa define ventanas consecutivas $tumblingWindow con 30una duración de segundos. Cada ventana ejecuta un pipeline interno, que calcula el promedio, la mediana, el máximo y atmosphericPressureObservation.altimeterSetting.value el mínimo de la duración de esa ventana. El pipeline genera un único documento con un _id equivalente a la marca de tiempo de inicio de la ventana que representa y los valores especificados para dicha ventana.

  3. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$tumblingWindow': {
interval: {
size: 30,
unit: "second"
},
pipeline: [{
$group: {
_id:
{ $meta: "stream.window.start"},
averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" },
medianPressure: {
$median: {
input: "$atmosphericPressureObservation.altimeterSetting.value",
method: "approximate"
}
},
maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" },
minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" }
}
}]
}
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-09-26T16:34:00.000Z'),
averagePressure: 5271.47894736842,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
},
{
_id: ISODate('2024-09-26T16:34:30.000Z'),
averagePressure: 5507.9,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
}

Nota

El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.

Volver

$hoppingWindow

En esta página