Fundamentos de Arquitectura
La abstracción central de Atlas Stream Processing es el procesador de flujo. Un procesador de flujos es un MongoDB Canal de agregación que opera continuamente con datos en streaming desde una fuente específica y escribe la salida en un receptor. Para obtener más información, consulte Estructura de un procesador de flujo.
Stream Processing takes place on Stream Processing workspaces. Cada espacio de trabajo de Stream Processing es un namespace de Atlas que asocia lo siguiente:
Uno o más procesadores de flujo, cada uno ejecutándose en su propia asignación de RAM y CPU.
Un nivel por defecto, que determina la cantidad de memoria y cómputo disponible para cada procesador de flujo cuando no se especifica un nivel.
Un nivel máximo, que determina la mayor cantidad de memoria y procesamiento que puede asignar a un pod dentro de ese espacio de trabajo de procesamiento de flujo.
Un proveedor de nube y una región de nube.
Un registro de conexiones, que almacena la lista de orígenes y destinos disponibles de datos en transmisión.
Un contexto de seguridad en el que definir las autorizaciones de usuario.
Una cadena de conexión al mismo espacio de trabajo de Stream Processing.
Niveles
Al definir un procesador de flujo, este solo está disponible para el espacio de trabajo de procesamiento de flujo donde lo define. Cada procesador de flujo se ejecuta con recursos asignados según su nivel. Atlas Stream Processing factura a los usuarios por un procesador de flujo solo mientras está en ejecución.
Si inicia un procesador de flujo sin declarar el tamaño de un nivel, se ejecutará el nivel del espacio de trabajo de procesamiento de flujo. Puede iniciar un procesador de flujo de cualquier nivel hasta el nivel máximo del espacio de trabajo de procesamiento de flujo, incluido este.
Ejemplo
Define un procesador de flujo en un espacio de trabajo de procesamiento de flujo denominado myWorkspace
con un nivel por defecto de SP10, y un nivel máximo de SP30. Si inicia el procesador sin especificar un nivel, Atlas Stream Processing lo asigna a un pod SP10. Sin embargo, puede declarar cualquier nivel desde SP2 hasta SP30 y Atlas Stream Processing asignará el procesador a un pod de tamaño adecuado.
Workers (herencia)
Cada trabajador puede host hasta cuatro procesadores de flujo en ejecución. Los Espacios de trabajo de Stream Processing que se ejecutan en el modelo de trabajadores heredados facturan a los usuarios según el número de trabajadores. Atlas Stream Processing escala automáticamente tu espacio de trabajo de Stream Processing cuando inicias procesadores de flujos, aprovisionando trabajadores según sea necesario. Puedes desproveer a un trabajador deteniendo todos los procesadores de streaming en él. Atlas Stream Processing siempre prefiere asignar un procesador de flujo a un trabajador existente en lugar de aprovisionar nuevos trabajadores.
Ejemplo
Tiene un espacio de trabajo de procesamiento de flujos que ejecuta ocho procesadores de flujo, denominados proc01 a proc08. Del proc01 al proc04 se ejecutan en un trabajador y del proc05 al proc08 en un segundo trabajador. Inicia un nuevo procesador de flujos denominado proc09. Atlas Stream Processing proporciona un tercer trabajador para alojar proc09.
Más tarde, detiene proc03 en el primer trabajador. Al detener proc09 y reiniciarlo, Atlas Stream Processing reasigna proc09 al primer trabajador y desaprovisiona al tercero.
Si inicia un nuevo procesador de flujo llamado proc10 antes de detener y reiniciar proc09, Atlas Stream Processing asigna proc10 al primer trabajador en el espacio previamente asignado a proc03.
Al escalar, Atlas Stream Processing solo considera la cantidad de procesadores de flujo que se están ejecutando actualmente; no cuenta los procesadores de flujo definidos que no se están ejecutando. El nivel del espacio de trabajo de Stream Processing determina la asignación de RAM y CPU de sus trabajadores.
Importante
SP10 SP30 Los procesadores operan y facturan a los usuarios según el modelo de trabajador heredado. Estos procesadores se actualizan al modelo de precios por procesador el de diciembre 32025del. Para obtener más información, consulte la sección sobre el modelo de trabajador en la descripción general de la arquitectura de Atlas Stream Processing.
Registro de conexões
Los registros de conexión almacenan una o más conexiones. Cada conexión asigna un nombre a la combinación de detalles de red y seguridad que permiten que un procesador de flujos interactúe con servicios externos. Las conexiones presentan el siguiente comportamiento:
Solamente una conexión definida en el registro de conexiones de un espacio de trabajo de Stream Processing específico puede dar servicio a procesadores de flujo alojados en ese espacio de trabajo de Stream Processing.
Cada conexión puede dar servicio a un número arbitrario de procesadores de flujo
Sólo una única conexión puede servir como fuente para un procesador de flujo determinado.
Solo una única conexión puede servir como receptor de un procesador de flujo determinado.
Una conexión no se define intrínsecamente como fuente o como sumidero. Cualquier conexión dada puede servir para cualquiera de las dos funciones según cómo un procesador de flujos invoque esa conexión.
Atlas Stream Processing ejecuta pods de procesamiento de streams en contenedores dedicados de clientes, sobre infraestructura multiinquilino. Para obtener más información sobre la seguridad y el cumplimiento de MongoDB, consulta el MongoDB Centro de Confianza.
Puntos de control
Atlas Stream Processing captura el estado de un procesador de flujo utilizando puntos de control. Cada punto de control tiene una ID única y está sujeto al flujo de la lógica de tu procesador de flujo. Una vez que todos los operadores de un procesador de flujo han añadido su estado a un punto de control, Atlas Stream Processing confirma el punto de control, generando dos tipos de registros:
Un registro de commit único que valida el ID del punto de control y el procesador de flujo al que pertenece.
Un conjunto de registros que describe el estado de cada operación con estado en el respectivo procesador de flujo justo en el momento en que Atlas Stream Processing realiza el compromiso del punto de control.
Cuando vuelves a iniciar un procesador de flujo tras una interrupción, Atlas Stream Processing query el último punto de control confirmado y reanuda la operación desde el estado descrito.
Fuentes de Kafka y compensaciones del grupo de consumidores
Después de que Atlas Stream Processing confirma un punto de control, actualiza los desplazamientos del grupo de consumidores en Kafka de forma asíncrona. Cuando utiliza Apache Kafkacomo $source un, los grupos de consumidores rastrean estos puntos de control como desplazamientos confirmados en el clúster de Kafka para cada partición.
Debido a que estas actualizaciones se producen periódicamente y de forma asíncrona, los desplazamientos confirmados del grupo de consumidores pueden retrasarse temporalmente con respecto al punto de control más reciente. Esto puede provocar retrasos a corto plazo en la monitorización y en las métricas de latencia para las herramientas que leen los desplazamientos confirmados de Kafka, como la herramienta CLI kafka-consumer-group. Estas herramientas pueden mostrar al grupo de consumidores con un desfase respecto a la posición interna real del procesador de flujos.
El retardo entre los puntos de control internos y los desplazamientos confirmados de Kafka no es fijo y puede variar según la carga de trabajo y la configuración. En cargas de trabajo típicas, es del orden de segundos, pero no existe un límite superior estricto.
fila de letra muerta
Atlas Stream Processing admite el uso de una colección de bases de datos Atlas como fila de letra muerta (DLQ). Cuando Atlas Stream Processing no puede procesar un documento de tu flujo de datos, graba el contenido del documento en la DLQ junto con detalles del fallo de procesamiento. Puedes asignar una colección como DLQ en tus definiciones de procesador de flujo.
Para obtener más información, consulta Crear un procesador de flujo.
Estados del procesador de flujo
Los procesadores de flujo transitan entre diferentes estados a lo largo de su ciclo de vida. Comprender estos estados le ayuda a supervisar y solucionar problemas en sus operaciones de procesamiento de flujo.
Los estados posibles incluyen:
Estado | Descripción |
|---|---|
| El procesador de flujo está procesando datos activamente y funcionando con normalidad. Este es el estado operativo deseado, donde el procesador consume datos de las fuentes, aplica transformaciones y los escribe en los destinos. |
| El procesador de flujo se ha detenido manualmente y no está procesando datos. Para obtener información sobre cómo iniciar un procesador de flujo, consulte la sección «Iniciar un procesador de flujo». |
| El procesador de flujos está en proceso de asignar recursos o escalar la infraestructura. Las causas comunes incluyen:
Entre los posibles impactos se incluyen:
Una vez finalizado el aprovisionamiento, los procesos se reanudan automáticamente. No se produce pérdida de datos, ya que Atlas Stream Processing conserva el estado mediante puntos de control. |
| El procesador de flujo ha encontrado un error que impide su ejecución. Esto se debe a errores del usuario o problemas de configuración. Para obtener información detallada sobre las causas de los fallos y la recuperación, consulte la sección «Gestión de errores y políticas de reintento». |
Nota
Es posible que los procesadores de flujo aparezcan intermitentemente en estado de APROVISIONAMIENTO en los paneles de monitorización mientras el sistema asigna recursos. Este comportamiento es normal durante las operaciones de escalado. Si un procesador permanece en estado de APROVISIONAMIENTO durante periodos prolongados (>010 minutos), compruebe los límites de recursos del espacio de trabajo de procesamiento de flujo y la configuración de niveles.
Manejo de errores y políticas de reintentos
Atlas Stream Processing implementa políticas integrales de manejo de errores y reintentos para garantizar un procesamiento de flujo confiable. El sistema distingue entre diferentes tipos de errores y aplica estrategias de reintento adecuadas según la clasificación del error.
Clasificación de errores
Atlas Stream Processing clasifica los errores en dos categorías principales:
Errores del usuario
Errores causados por la configuración del usuario, problemas con los datos o problemas con servicios externos que están fuera del control de Atlas Stream Processing. Algunos ejemplos son:
Credenciales de conexión inválidas
Problemas de conectividad de red con servicios externos
Datos con formato incorrecto que no se pueden procesar
Problemas de permisos al acceder a recursos externos
Errores internos
Errores que se producen dentro del propio sistema Atlas Stream Processing, generalmente debido a problemas temporales de infraestructura o interrupciones del servicio. Se considera que es responsabilidad del servicio Atlas Stream Processing resolverlos.
Comportamiento de reintentos
El comportamiento de reintento varía en función de la clasificación del error:
Política de reintento por error del usuario
Atlas Stream Processing attempts to restart the stream processor a limited number of times over a 5-minute period
Si todos los intentos de reintento fallan dentro de este plazo, el procesador de flujo pasa a un estado
FAILED.Algunos errores del usuario se clasifican como no recuperables y provocan inmediatamente el fallo del procesador. Algunos ejemplos son:
StreamProcessorWorkerOutOfMemory (418)La canalización supera los límites de memoria del nivel.StreamProcessorInvalidOptions (420): Sintaxis o configuración de canalización no válida
Puedes reiniciar manualmente un procesador de flujos fallido llamando al método
start()
Política de reintento por error interno
Atlas Stream Processing reintenta continuamente errores internos sin límite de tiempo
Los errores internos activan alertas al equipo de ingeniería de Atlas Stream Processing
El sistema depende de mecanismos automáticos de reintento para recuperar los procesadores una vez resueltos los problemas internos
Los procesadores de flujo se reanudan automáticamente desde su último punto de control cuando se reinician
Proceso de recuperación
Cuando un procesador de flujos encuentra un error y requiere un reinicio, el proceso de recuperación sigue estos pasos:
Este proceso de recuperación garantiza que los fallos temporales no resultan en la pérdida de datos ni requieren intervención manual en la mayoría de los casos.
Tiempo de Stream Processing
En el procesamiento de datos en transmisión, los documentos están sujetos a dos sistemas de temporización:
hora del evento
tiempo de procesamiento
Atlas Stream Processing ofrece varios parámetros para controlar cómo los procesadores de flujos interactúan con estos sistemas temporales.
Hora del evento
La hora del evento es el momento en que el flujo de origen genera un documento o el sistema de mensajería (p. ej., Apache Kafka) lo recibe. Esto se determina mediante la marca de tiempo del documento.
La latencia de red, el procesamiento ascendente y otros factores no sólo pueden causar discrepancias entre estos tiempos para un documento determinado, sino que también pueden provocar que los documentos lleguen a un procesador de flujo fuera del orden del tiempo de evento. En cualquier caso, las ventanas pueden dejar pasar documentos que usted desea que capturen. Atlas Stream Processing considera tales documentos de llegada tardía y los envía a tu fila de letra muerta si configuras una.
Event Time es una opción configurable para el campo boundary habilitada en Ventanas Tumbling y Ventanas Hopping.
Tiempo de procesamiento
El tiempo de procesamiento es el momento en el que el procesador de flujo consume un documento. Esto se determina por el reloj del sistema que aloja el procesador de flujo.
El tiempo de procesamiento es una opción configurable para el campo boundary admitido en Tumbling Windows y Hopping Windows. Permite crear una pipeline con una especie de ventana que acumula datos según la hora del reloj del servidor. A diferencia de tiempo de evento ventanas, las ventanas de procesamiento asignan a cada evento una marca de tiempo basada en la hora del reloj del servidor cuando llega al procesador del flujo.
Las marcas de tiempo de documentos y las marcas de tiempo de los límites de ventana están en UTC. No puedes especificar las opciones de idleTimeout o allowedLateness al configurar una ventana processingTime.
Ejemplo
Se crea un pipeline con una ventana de tiempo de evento de 5 minutos. Se añade un evento a un clúster origen de Kafka en 09:33. Debido a cierto retraso en el clúster de Kafka, llega al procesador de flujo en 09:37.
Si la canalización tiene una ventana de tiempo de evento de 5 minutos, este evento se asignará a la ventana 09:30-09:35. Si la canalización tiene una ventana de tiempo de procesamiento de 5 minutos, el evento se asignará a la ventana 09:35-09:40.
Marcas de agua
Una marca de agua reemplaza el tiempo de procesamiento y solo se actualiza cuando el procesador consume un documento con un tiempo de evento posterior al de cualquier documento consumido previamente. Todos los procesadores de flujo aplican marcas de agua en Atlas Stream Processing.
Ejemplo
Usted configura un procesador de flujo con ventanas de 5minutos. Inicia el procesador en 12:00, para que las dos primeras ventanas tengan duraciones de 12:00-12:05 y 12:05-12:10. La siguiente tabla ilustra qué ventanas capturarán qué eventos dada las variaciones de los retardos, con y sin marcas de agua.
Hora del evento | Tiempo de procesamiento | Tiempo de ventana (sin marcas de agua) | Tiempo de ventana (marcas de agua) |
|---|---|---|---|
12:00 | 12:00 | 12:00-12:05 | 12:00-12:05 |
12:01 | 12:03 | 12:00-12:05 | 12:00-12:05 |
12:02 | 12:05 | 12:05-12:10 | 12:00-12:05 |
12:01 | 12:06 | 12:05-12:10 | 12:00-12:05 |
12:06 | 12:07 | 12:05-12:10 | 12:05-12:10 |
Sin marcas de agua, la ventana 12:00-12:05 se cierra a las 12:05 según el reloj del sistema del espacio de trabajo de procesamiento de flujos, y la ventana 12:05-12:10 se abre inmediatamente. Por lo tanto, aunque la fuente generó cuatro de los documentos durante el intervalo 12:00-12:05, la ventana correspondiente solo captura dos.
Con las marcas de agua, la ventana 12:00-12:05 no se cierra a las 12:05 porque, entre los documentos que ingiere hasta ese momento, la última hora del evento (y, por lo tanto, el valor de la marca de agua) es 12:03. La ventana 12:00-12:05 no se cierra hasta las 12:07 del reloj del sistema, momento en el que el procesador de flujo ingiere un documento con una hora de evento de 12:05, adelanta la marca de agua a esa hora y abre la ventana 12:05-12:10. Cada ventana captura todos los documentos correspondientes.
Al leer de Apache Kafka, Atlas Stream Processing espera que todas las particiones pasen la marca de agua. Si una partición queda inactiva y no produce eventos con marcas de tiempo posteriores a la watermark, la ventana no se cierra ni produce resultados. Para abordar esto, establece partitionIdleTimeout para asegurar que las particiones inactivas no detengan el avance de las marcas de agua. Para obtener más información, consulta $source Etapa (Procesamiento de flujo).
Atraso permitido
Si las diferencias entre el tiempo del evento y el tiempo de procesamiento varían lo suficiente, los documentos pueden llegar a un procesador de flujo después de que la marca de agua haya avanzado lo suficiente para cerrar la ventana esperada. Para mitigar esto, Atlas Stream Processing admite el Retardo Permitido, una configuración que retrasa el cierre de una ventana por un intervalo determinado en relación con el watermark.
Mientras que las marcas de agua son propiedades de los procesadores de flujo, la Tolerancia permitida es una propiedad de la ventana y solo afecta cuándo se cierra esa ventana. Si la marca de agua del procesador de flujo avanza hasta un punto que desencadenaría la apertura de una nueva ventana, la Tolerancia permitida mantiene abiertas las ventanas anteriores sin impedir este hecho.
Ejemplo
Configura un procesador de flujo con ventanas de tiempo de respuesta de 5minutos. Inicia el procesador a las 12:00, de modo que las dos primeras ventanas tengan duraciones de 12:00-12:05 y 12:05-12:10. Establece un retraso permitido de 2 minutos.
La tabla siguiente refleja el orden en que el procesador de flujo ingiere los documentos descritos.
Hora del evento | Marca de agua | Tiempo de retraso permitido | Tiempo de ventana |
|---|---|---|---|
12:00 | 12:00 | 11:58 | 12:00-12:05 |
12:02 | 12:03 | 12:01 | 12:00-12:05 |
12:01 | 12:04 | 12:02 | 12:00-12:05 |
12:05 | 12:05 | 12:03 | 12:00-12:15, 12:05-12:10 |
12:04 | 12:06 | 12:04 | 12:00-12:05, 12:05-12:10 |
12:07 | 12:07 | 12:05 | 12:05-12:10 |
Cuando la marca de agua avance a 12:05, se abre la ventana 12:05-12:10. Sin embargo, debido a que el intervalo de retraso permitido es de 2 minutos, desde dentro de la ventana 12:00-12:05, es efectivamente solo 12:03, por lo que permanece abierto. Solo cuando la marca de agua avanza a 12:07 la hora ajustada alcanza 12:05. En este punto, la ventana 12:00-12:05 se cerrará.
Tiempo de espera por inactividad
Desvincular el funcionamiento de las ventanas del tiempo de procesamiento por defecto mejora la precisión del procesamiento de la transmisión en la mayoría de los casos. Sin embargo, una fuente de datos de transmisión puede experimentar periodos de inactividad prolongados. En este caso, una ventana puede capturar eventos anteriores al periodo de inactividad y no poder devolver resultados procesados mientras espera a que la marca de agua avance lo suficiente para cerrarse.
Atlas Stream Processing permite a los usuarios configurar un tiempo de espera por inactividad para las ventanas a fin de mitigar estos escenarios utilizando el tiempo de procesamiento. Un tiempo de inactividad es un período que comienza cuando el tiempo de procesamiento supera el final del intervalo de una ventana abierta y la fuente del procesador de flujo está inactiva. Si la fuente permanece inactiva durante un intervalo igual al tiempo de espera de inactividad, la ventana se cierra y la marca de agua avanza independientemente de cualquier ingestión de documentos.
Ejemplo
Configura una ventana por lotes con un intervalo de 3minutos y un tiempo de espera por inactividad de 1minutos. La siguiente tabla ilustra los efectos del tiempo de espera de inactividad durante y después del intervalo de una ventana.
Tiempo de procesamiento | Hora o estado del evento | Marca de agua | Tiempo de ventana |
|---|---|---|---|
12:00 | 12:00 | 12:00 | 12:00-12:03 |
12:01 | Fuente inactiva | 12:00 | 12:00-12:03 |
12:02 | Fuente inactiva | 12:00 | 12:00-12:03 |
12:03 | Fuente inactiva | 12:00 | 12:00-12:03 |
12:04 | 12:02 | 12:02 | 12:00-12:03 |
12:05 | 12:05 | 12:05 | 12:03-12:06 |
12:06 | Fuente inactiva | 12:05 | 12:03-12:06 |
12:07 | Fuente inactiva | 12:00 | 12:06-12:09 |
12:08 | Fuente inactiva | 12:00 | 12:06-12:09 |
12:09 | 12:09 | 12:09 | 12:09-12:12 |
Durante el intervalo 12:00-12:03, la fuente permanece inactiva durante tres minutos, pero el procesador de flujo no cierra la ventana porque el tiempo de procesamiento no ha superado el final del intervalo de la ventana y la fuente no permanece inactiva una vez finalizado. Cuando la marca de agua avanza a 12:05, la ventana se cierra normalmente y se abre la ventana 12:03-12:06.
Cuando la fuente queda inactiva en 12:06, permanece inactiva hasta 12:07, lo que activa el tiempo de espera de inactividad y avanza la marca de agua a 12:06.