Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

$tumblingWindow Etapa (Stream Processing)

La etapa $tumblingWindow especifica una ventana de agregación de datos. Las ventanas de procesamiento de flujos de Atlas tienen estado, se pueden recuperar si se interrumpen y cuentan con mecanismos para procesar datos que llegan tarde. Debe aplicar todas las demÔs consultas de agregación a sus datos de flujo dentro de esta etapa de ventana.

$tumblingWindow

Una etapa de pipeline $tumblingWindow tiene la siguiente forma de prototipo:

{
"$tumblingWindow": {
"boundary": "eventTime" | "processingTime",
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": {
"size": <int>,
"unit": "<unit-of-time>"
},
"allowedLateness": {
size: <int>,
unit: "<unit-of-time>"
}
}
}

Alternativamente, una etapa de pipeline $tumblingWindow puede tener los campos allowedLateness y idleTimeout con un valor entero de 0, como se muestra a continuación:

{
"$tumblingWindow": {
"boundary": "eventTime" | "processingTime",
"interval": {
"size": <int>,
"unit": "<unit-of-time>"
},
"pipeline" : [
<aggregation-stage-array>
],
"offset": {
"offsetFromUtc": <int>,
"unit": "<unit-of-time>"
},
"idleTimeout": 0,
"allowedLateness": 0
}
}

La etapa $tumblingWindow procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

boundary

string

Opcional

string que especifica si los límites de la ventana se determinan por el tiempo del evento o por el tiempo de procesamiento. El valor puede ser eventTime o processingTime. Consulta Stream Processing timing para obtener mÔs información. Si se omite, este campo se establece por defecto en eventTime.

idleTimeout y allowedLateness campos no se pueden establecer cuando boundary se establece en processingTime.

interval

Documento

Requerido

Documento que especifica el intervalo de una ventana de salto como una combinación de un tamaño y una unidad de tiempo donde:

  • El valor de size debe ser un nĆŗmero entero positivo distinto de cero.

  • El valor de unit debe ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por ejemplo, una size de 20 y una unit de second hacen que cada ventana permanezca abierta durante 20 segundos.

pipeline

arreglo

Requerido

pipeline de agregación anidada evaluada contra los mensajes dentro de la ventana.

offset

Documento

Opcional

Documento que especifica un desplazamiento de tiempo para los límites de ventana relativos a UTC. El documento es una combinación del campo de tamaño offsetFromUtc y una unidad de tiempo donde:

  • El valor de offsetFromUtc debe ser un nĆŗmero entero positivo distinto de cero.

  • El valor de unit debe ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

Por ejemplo, un offsetFromUtc de 8 y un unit de hour generan lĆ­mites que se trasladan ocho horas por delante de UTC. Si no especificas un desplazamiento, los lĆ­mites de la ventana se alinean con UTC.

idleTimeout

Documento

Opcional

Documento que especifica cuÔnto tiempo esperar antes de cerrar las ventanas si $source estÔ inactivo. Defina esta configuración como una combinación de un size y un unit de tiempo en donde:

  • El valor de size debe ser un nĆŗmero entero positivo distinto de cero.

  • El valor de unit puede ser uno de los siguientes:

    • "ms" (milisegundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Si se establece idleTimeout, Atlas Stream Processing cierra las ventanas abiertas solo si $source estƔ inactivo durante mƔs tiempo que el mayor entre el tiempo restante de la ventana o el tiempo idleTimeout. El temporizador de inactividad comienza tan pronto como $source entra en modo inactivo.

Por ejemplo, considera una ventana de 12:00 pm a 1:00 pm y un tiempo de idleTimeout de 2 horas. Si el último evento es a las 12:02 p. m. después de lo cual $source se queda inactivo, el tiempo restante de la ventana es de 58 minutos. Atlas Stream Processing cierra la ventana después de 2 horas de inactividad a las 2:02 pm, lo que es mÔs largo que el tiempo restante de la ventana y el tiempo idleTimeout. Si el tiempo de idleTimeout estÔ configurado a solo 10 minutos, Atlas Stream Processing cierra la ventana después de 58 minutos de inactividad a las 1:00 p. m., lo cual es mÔs largo que el tiempo de idleTimeout y el tiempo restante de la ventana.

Como alternativa, puedes definir este ajuste con un valor entero de 0. Consulta la definición de pipeline para mÔs información.

allowedLateness

Documento

Opcional

Documento que especifica cuÔnto tiempo mantener abiertas las ventanas generadas a partir de la fuente para aceptar datos que lleguen tarde después de procesar documentos hasta la hora de finalización de la ventana. Si se omite, por defecto es de 3 segundos.

Como alternativa, puedes definir este ajuste con un valor entero de 0. Consulta la definición de pipeline para mÔs información.

Atlas Stream Processing solo admite una etapa de ventana por pipeline .

Cuando aplicas la $group etapa a tu etapa de ventana, una Ćŗnica clave de grupo tiene un lĆ­mite de 100 megabytes de RAM.

El soporte para ciertas etapas de agregación puede ser limitado o no estar disponible dentro de ventanas. Para obtener mÔs información, consulta Etapas del pipeline de agregación admitidas.

En caso de interrupción del servicio, puedes reanudar el pipeline interno de una ventana desde su estado en el punto de la interrupción. Para obtener mÔs información, consulta Puntos de control.

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:

  1. La etapa $source establece una conexión con el intermediario de Apache Kafka, recolectando estos informes en un tema llamado my_weatherdata y exponiendo cada registro en cuanto se incorpora a las siguientes etapas de agregación.

  2. La etapa define ventanas consecutivas $tumblingWindow con 30una duración de segundos. Cada ventana ejecuta un pipeline interno, que calcula el promedio, la mediana, el mÔximo y atmosphericPressureObservation.altimeterSetting.value el mínimo de la duración de esa ventana. El pipeline genera un único documento con un _id equivalente a la marca de tiempo de inicio de la ventana que representa y los valores especificados para dicha ventana.

  3. La fase $merge escribe la salida en una colección Atlas denominada stream en la base de datos sample_weatherstream. Si no existe tal base de datos o colección, Atlas los crearÔ.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$tumblingWindow': {
interval: {
size: 30,
unit: "second"
},
pipeline: [{
$group: {
_id:
{ $meta: "stream.window.start"},
averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" },
medianPressure: {
$median: {
input: "$atmosphericPressureObservation.altimeterSetting.value",
method: "approximate"
}
},
maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" },
minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" }
}
}]
}
},
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos de la colección sample_weatherstream.stream resultante, conéctate a un clúster de Atlas y ejecuta el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ISODate('2024-09-26T16:34:00.000Z'),
averagePressure: 5271.47894736842,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
},
{
_id: ISODate('2024-09-26T16:34:30.000Z'),
averagePressure: 5507.9,
maxPressure: 9999.9,
medianPressure: 1015.9,
minPressure: 1015.9
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estÔticos, y cada usuario ve documentos distintos.