Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

Supervisión y alerta de Stream Processing en Atlas.

Atlas Stream Processing proporciona supervisión y alertas para que los usuarios puedan aprovechar perspectivas sobre el rendimiento y el estado para optimizar sus flujos de trabajo.

Para cada uno de tus espacios de trabajo de Stream Processing, puedes supervisar tus procesadores de flujos en la Interfaz de Usuario de Atlas:

1
  1. Si aún no aparece, se debe seleccionar la organización que contiene el proyecto en el menú Organizations de la barra de navegación.

  2. Si aún no se muestra, seleccione su proyecto en el menú Projects de la barra de navegación.

  3. En la barra lateral, haz clic en Stream Processing en la sección Streaming Data.

Se muestra la página Stream Processing.

2

En el panel del área de trabajo de Stream Processing que desea supervisar, haz clic en Manage.

3

La pestaña Monitoring muestra una variedad de estadísticas de ejecución sobre el procesador de flujo que elijas, incluyendo, pero no limitado a:

  • Número de mensajes ingeridos

  • Número de mensajes procesados con éxito

  • Número de mensajes enviados a su fila de letra muerta

Si tu conexión de origen es Apache Kafka, puedes supervisar el retraso entre el desplazamiento actual y el último desplazamiento en el broker para la partición de un tema y la suma de todos los retrasos de partición.

4

Puedes filtrar las gráficas por nombre del procesador de flujo, rango de tiempo y granularidad.

Atlas Stream Processing proporciona los siguientes métodos para reportes on-demand sobre tus procesadores de transmisión:

El método sp.processor.sample() te permite ver una pequeña muestra de los documentos que produce un procesador de flujos que esté en ejecución. Los usuarios pueden comparar los resultados muestreados con sus resultados esperados para diagnosticar cualquier error en el diseño de su pipeline de agregación.

El método sp.processor.stats() devuelve estadísticas de tiempo de ejecución sobre un procesador de flujo de su elección.

El documento de salida tiene los siguientes campos:

Campo
Tipo
Descripción

ns

string

El namespace en el que se define el procesador de flujos.

stats

Objeto

Un documento que describe el estado operativo del procesador de flujo.

stats.name

string

El nombre del procesador de flujo.

stats.status

string

El estado del procesador de flujo. Este campo puede tener los siguientes valores:

  • created

  • running

  • error

  • stopping

stats.scaleFactor

entero

La escala en la que se muestra el campo de tamaños. Si se establece en 1, los tamaños se muestran en bytes. Si se establece en 1024, los tamaños se muestran en kilobytes.

stats.inputMessageCount

entero

La cantidad de documentos publicados en el stream. Un documento se considera 'publicado' en el flujo sólo una vez que pasa por la etapa $source, no cuando pasa por toda la pipeline.

stats.inputMessageSize

entero

La cantidad de bytes o kilobytes publicados en el flujo. Los bytes se consideran 'publicados' en el flujo una vez que pasan por la etapa $source, no cuando pasan por toda la pipeline.

stats.outputMessageCount

entero

El número de documentos procesados por el flujo. Se considera que un documento ha sido 'procesado' por el flujo una vez que pasa por toda la pipeline.

stats.outputMessageSize

entero

La cantidad de bytes o kilobytes procesados por el flujo. Los bytes se consideran "procesados" por el flujo una vez que pasan por todo el pipeline.

stats.dlqMessageCount

entero

El número de documentos enviados a la fila de letra muerta.

stats.dlqMessageSize

entero

El número de bytes o kilobytes enviados a la fila de letra muerta.

stats.changeStreamTimeDifferenceSecs

entero

La diferencia, en segundos, entre la hora del evento representada por el flujo de cambios más reciente token de reanudación y el evento más reciente en el oplog.

stats.changeStreamState

token

La flujo de cambios más reciente token de reanudación. Solo aplica a los procesadores de flujo con una fuente de flujo de cambios.

stats.latency

Documento

Estadísticas de latencia para el procesador de flujo en su conjunto. Atlas Stream Processing devuelve este campo solo si se pasa la opción verbose.

stats.latency.p50

entero

La latencia estimada del percentil 50de todos los documentos procesados en los últimos 30 segundos. Si tu pipeline incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana.

Por ejemplo, si su etapa $tumblingWindow tiene un intervalo de 5 minutos, las mediciones de latencia incluyen esos 5 minutos.

stats.latency.p99

entero

La latencia estimada del percentil 99de todos los documentos procesados en los últimos 30 segundos. Si tu pipeline incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana.

Por ejemplo, si su etapa $tumblingWindow tiene un intervalo de 5 minutos, las mediciones de latencia incluyen esos 5 minutos.

stats.latency.start

datetime

Tiempo de pared en el que comenzó la ventana de medición de 30 segundos más reciente.

stats.latency.end

datetime

Momento en que finalizó la ventana de medición más reciente de 30 segundos.

stats.latency.unit

string

Unidad de tiempo en la que se cuenta la latencia. Este valor es siempre microseconds.

stats.latency.count

entero

Número de documentos que el procesador de flujos ha procesado en la ventana de medición más reciente de 30 segundos.

stats.latency.sum

entero

Suma de todas las mediciones de latencia individuales, en microsegundos, tomadas en la ventana de medición de 30 segundos más reciente.

stats.stateSize

entero

La cantidad de bytes que utiliza Windows para almacenar el estado del procesador.

stats.watermark

fecha

La marca de tiempo de la marca de agua actual.

stats.operatorStats

arreglo

Las estadísticas para cada operador en el pipeline del procesador. Atlas Stream Processing devuelve este campo sólo si se pasa la opción verbose.

stats.operatorStats proporciona versiones por operador de muchos campos principales de stats:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.latency

  • stats.operatorStats.stateSize

stats.operatorStats incluye los siguientes campos únicos:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTimeMillis

stats.operatorStats También incluye los siguientes campos cuando se pasa la opción verbose y el procesador incluye una etapa de ventana:

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats también puede incluir el siguiente campo para ciertos operadores de fuente y sumidero:

  • stats.operatorStats.targetStats

stats.operatorStats.maxMemoryUsage

entero

El uso máximo de memoria del operador en bytes o kilobytes.

stats.operatorStats.executionTimeMillis

entero

El tiempo total de ejecución del operador en milisegundos.

stats.minOpenWindowStartTime

fecha

La hora de inicio de la ventana mínima abierta. Este valor es opcional.

stats.maxOpenWindowStartTime

fecha

La hora de inicio de la ventana máxima de apertura. Este valor es opcional.

stats.operatorStats.targetStats

arreglo

Las estadísticas por objetivo para ciertos operadores fuente y sumideros.

Cada elemento de este arreglo es un documento que representa un único target de entrada o salida, como un input o output colección o un Apache Kafka tema. Según el operador, cada documento contiene un subconjunto de los siguientes campos:

Para operadores fuente, como Apache Kafka $source o fuentes de flujo de cambios:

Ya sea db y coll para destinos MongoDB, o topic para Apache Kafka destinos.

  • inputMessageCount

  • inputMessageSize

Para operadores de sumidero, como MongoDB $merge o $emit sumideros:

Ya sea db y coll para destinos MongoDB, o topic para Apache Kafka destinos.

  • outputMessageCount

  • outputMessageSize

Atlas Stream Processing recopila estadísticas por objetivo sólo para algunas etapas de agregación de flujos, como Apache Kafka $source y MongoDB $merge.

Registra estadísticas por objetivo para un máximo de 100 objetivos diferentes; después de eso, el procesador de flujo deja de añadir nuevas entradas a targetStats pero sigue actualizando las estadísticas agregadas por operador.

stats.kafkaPartitions

arreglo

Información sobre el offset de las particiones de un Apache Kafka broker (nodo de servidor). kafkaPartitions se aplica solo a las conexiones que utilizan una fuente de Apache Kafka.

stats.kafkaPartitions.partition

entero

El número de partición de temas en Apache Kafka.

stats.kafkaPartitions.currentOffset

entero

El desplazamiento en el que se encuentra el procesador de flujo para la partición especificada. Este valor es igual al desplazamiento anterior que el procesador de flujo procesó, más 1.

stats.kafkaPartitions.checkpointOffset

entero

El desplazamiento que el procesador de flujos comprometió por última vez con el Apache Kafka broker y el punto de control para la partición especificada. Todos los mensajes a través de este desplazamiento se registran en el último punto de control.

stats.kafkaPartitions.isIdle

booleano

La bandera que indica si la partición está inactiva. Este valor por defecto es false.

Atlas Stream Processing ofrece dos tipos de registros de actividades del área de trabajo para Stream Processing:

  • Registros de operaciones, que rastrean principalmente el comportamiento de
    procesadores de flujo individuales.
  • Registros de auditoría, que principalmente rastrean la autenticación y la seguridad
    actividad a nivel del espacio de trabajo de Stream Processing.

Para descargar los registros operativos o de auditoría de Atlas Stream Processing:

1
  1. Si aún no aparece, se debe seleccionar la organización que contiene el proyecto en el menú Organizations de la barra de navegación.

  2. Si aún no se muestra, seleccione su proyecto en el menú Projects de la barra de navegación.

  3. En la barra lateral, haz clic en Stream Processing en la sección Streaming Data.

Se muestra la página Stream Processing.

2

Navega al panel del área de trabajo de Stream Processing desde el cual deseas descargar los registros y haz clic en el icono de tres puntos suspensivos.

3
4

En la ventana modal, selecciona el tipo de registro que deseas descargar.

5

En el campo Stream processor, introduce el nombre del procesador de flujo para el que deseas descargar registros. Deje este campo en blanco para descargar los registros de todos los procesadores de flujo.

6

En el menú desplegable Time Period, selecciona el intervalo para el que deseas descargar los registros.

7

Nota

Los registros pueden tardar hasta cinco minutos después de su creación en estar disponibles para consulta.

Atlas Stream Processing admite integraciones de métricas de terceros, lo que permite a los usuarios registrar y analizar el rendimiento del procesador de flujos sin tener que desarrollar lógica o vistas personalizadas.

Datadog realiza un seguimiento de las métricas de Atlas Stream Processing con el mongodb.atlas.stream_processing prefijo. Para configurar la integración y ver las métricas disponibles, consulte Integrar con Datadog.

Para enviar métricas a un recolector de OpenTelemetry (OTel), consulte Integración con OpenTelemetry (OTel).

Atlas Stream Processing activa alertas cuando los procesadores cambian de estado o cuando un procesador alcanza diversos umbrales de ingestión o salida. Para ver una lista de alertas disponibles de Atlas Stream Processing, consulta Atlas Stream Processing Alerts. Para obtener más información sobre la configuración de alertas, consulte Configurar ajustes de alerta.

Puede dirigir las alertas de Atlas Stream Processing de las siguientes maneras:

  • Todos los procesadores de flujo dentro de un proyecto

  • Todos los procesadores de flujos dentro de un espacio de trabajo de Stream Processing que coinciden con el predicado configurado

  • Todos los procesadores de flujo cuyos nombres coinciden con el predicado configurado

Para objetivos distintos a todos los procesadores de flujo, puedes configurar varios objetivos para la misma alerta.

Atlas Stream Processing configura por defecto una alerta Strean Processor State is failed. Dado que se trata de una alerta a nivel de proyecto, se aplica a cualquier procesador de flujos que esté en ejecución en cualquier espacio de trabajo de procesamiento de flujos dentro del proyecto para el que esté configurada.