Un procesador de flujo de Atlas Stream Processing aplica la lógica de un pipeline de agregación de flujo con un nombre único a los datos de transmisión. Atlas Stream Processing guarda cada definición de procesador de flujo en almacenamiento persistente para que pueda reutilizarse. Solo puede usar un procesador de flujo determinado en el espacio de trabajo de Stream Processing en el que está almacenada su definición.
Requisitos previos
Para crear y gestionar un procesador de flujo, debes tener:
Un usuario de base de datos con el rol
atlasAdminpara crear y ejecutar procesadores de flujoUn clúster Atlas
Considerations
Muchos de los comandos del procesador de flujos requieren que se especifique el nombre del procesador de flujos relevante en la invocación del método. La sintaxis descrita en las siguientes secciones supone nombres estrictamente alfanuméricos. Si el nombre de su procesador de flujo incluye caracteres no alfanuméricos, como guiones (-) o puntos (puntos) (.), debe encerrar el nombre entre corchetes ([]) y comillas double ("") en la invocación del método, como en sp.["special-name-stream"].stats().
Crear un procesador de flujos de datos de manera interactiva
Puedes crear un procesador de flujo de forma interactiva con el método sp.process() en mongosh. Los procesadores de flujo que creas de forma interactiva muestran el siguiente comportamiento:
Guarde la salida y los documentos de la fila de letra muerta en el shell
Empieza a ejecutarse inmediatamente al crearse
Se ejecutan durante 10 minutos o hasta que el usuario los detenga
No persista después de detenerse
Los procesadores de flujo que creas de forma interactiva sirven para prototipar. Para crear un procesador de flujo persistente, consulte Crear un procesador de flujo.
sp.process() tiene la siguiente sintaxis:
sp.process(<pipeline>)
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| arreglo | Requerido | Pipeline de agregación de streams que deseas aplicar a tus datos de transmisión. |
Para crear un procesador de flujo de forma interactiva:
Conéctate a tu espacio de trabajo de stream processing.
Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando mongosh.
Ejemplo
El siguiente comando se conecta a un espacio de trabajo de procesamiento de flujos como un usuario llamado streamOwner usando la autenticación x.059:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Proporciona tu contraseña de usuario cuando se solicite.
Define una pipeline.
En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.
El siguiente ejemplo utiliza el tema stuff en la conexión myKafka en el registro de conexiones como el $source, coincide con los registros donde el campo temperature tiene un valor de 46 y emite los mensajes procesados al tema output de la conexión mySink en el registro de conexiones:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
Crea un Procesador de Streams
Para crear un procesador de flujo que persista hasta que se elimine:
La API de administración de Atlas proporciona un endpoint para crear un procesador de flujos.
Para crear un procesador de flujos en la interfaz de usuario de Atlas, ve a la página Stream Processing de tu Proyecto de Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing.
Se puede elegir entre utilizar el generador visual o el editor JSON para configurar el procesador de streams:
Haz clic Create with visual builderen.
Si existen procesadores de stream processing en tu espacio de trabajo de stream processing, haz clic en el botón + Create stream processor y luego selecciona Visual Builder de las opciones desplegables.
El Constructor Visual se abre con un formulario donde puedes configurar tu procesador de flujo.
Añade una conexión de fuente.
En el campo Source, selecciona una conexión del menú desplegable Connection para usar como fuente de tu procesador de flujos.
Esto abre un cuadro de texto JSON donde puedes configurar la etapa source para el procesador de flujo. Para aprender más sobre la sintaxis de la etapa source, consulta $source.
Ejemplo
La siguiente etapa source opera en datos en tiempo real de la conexión sample_stream_solar preconfigurada:
{ "$source": { "connectionName": "sample_stream_solar" } }
Agregue etapas de agregación al pipeline del procesador de flujo.
En el panel Start building your pipeline, haga clic en el botón para la etapa de agregación que desea agregar a su pipeline. Esto abre un cuadro de texto donde puede configurar la etapa de agregación seleccionada en formato JSON.
Si la etapa de agregación no está en la lista, haz clic en + Custom stage para definir una etapa de agregación admitida en formato JSON. Para aprender más sobre las etapas de agregación de Stream Processing y su sintaxis, consulta Etapas del pipeline de agregación.
Ejemplo
La siguiente etapa $match coincide con todos los documentos en el flujo preconfigurado sample_stream_solar donde el campo obs.watts es mayor que 300:
{ "$match": { "obs.watts": { "$gt": 300 } } }
(Opcional) Configura etapas de agregación adicionales.
Para añadir etapas de agregación adicionales a tu pipeline, haz clic en el botón + Add stage below debajo de la última etapa de tu pipeline y selecciona la etapa de agregación que deseas añadir o haz clic en Custom stage para definir una etapa de agregación compatible diferente. Esto abre un cuadro de texto donde puedes configurar la nueva etapa en formato JSON.
Agregar una conexión de sumidero.
En el campo Sink, selecciona una conexión de destino de la lista desplegable Connection.
En el campo Sink, selecciona una conexión de la lista desplegable Connection a la que se guarden tus datos procesados.
Esto abre un cuadro de texto JSON donde se puede configurar la etapa merge de su procesador de flujo. Para obtener más información sobre la sintaxis de etapa de merge, consulte $merge.
Ejemplo
La siguiente etapa sink de guardar datos procesados en la colección demoDb.demoColl en una conexión denominada demoConnection conexión:
{ "$merge": { "into": { "connectionName": "demoConnection", "db": "demoDb", "coll": "demoColl" } } }
Haz clic Create stream processoren.
El procesador de flujo se crea y se muestra en la pestaña Stream Processors de la página Stream Processing.
Haz clic Use JSON editoren.
Si existen procesadores de stream processing en tu espacio de trabajo de stream processing, haz clic en el botón + Create stream processor y luego selecciona Visual Builder de las opciones desplegables.
El editor JSON se abre con un cuadro de texto donde puedes configurar tu procesador de flujos en formato JSON.
Define el procesador de flujo.
Especifique la definición JSON para su procesador de flujos en el cuadro de texto del editor JSON. Esta definición debe incluir un nombre para el procesador de stream y una pipeline de agregación que comience con una etapa $source y termine con la etapa $merge. Puedes incluir cualquier número de etapas de agregación adicionales entre las etapas $source y $merge.
Para obtener más información sobre las etapas de agregación del Stream Processing y su sintaxis, consulta etapas del pipeline de agregación.
Ejemplo
La siguiente definición de JSON crea un procesador de flujo llamado solarDemo que utiliza una etapa $tumblingWindow con una etapa anidada $group para agregar datos en tiempo real desde la conexión preconfigurada sample_stream_solar durante intervalos de 10segundos y escribe los datos procesados en una colección en una conexión llamada mongodb1.
{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] }
Para crear un nuevo procesador de streaming con mongosh, se puede utilizar el método sp.createStreamProcessor(). Tiene la siguiente sintaxis:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Requerido | Nombre lógico para el procesador de flujo. Debe ser único en el espacio de trabajo de Stream Processing. Este nombre solo debe contener caracteres alfanuméricos. |
| arreglo | Requerido | Pipeline de agregación de streams que deseas aplicar a tus datos de transmisión. |
| Objeto | Opcional | Objeto que define varias configuraciones opcionales para tu procesador de flujo. |
| Objeto | Condicional | Objeto que asigna una fila de letra muerta para su espacio de trabajo de Stream Processing. Este campo es necesario si se define el campo |
| string | Condicional | Etiqueta legible por humanos que identifica una conexión en el registro de conexiones. Esta conexión debe hacer referencia a un clúster de Atlas. Este campo es necesario si define el campo |
| string | Condicional | Nombre de una base de datos de Atlas en el clúster especificado en |
| string | Condicional | Nombre de una colección en la base de datos especificada en |
| string | Opcional | El nivel del pod al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador a un pod del nivel por defecto del área de trabajo de stream processing. Para obtener más información, consulta Niveles. |
Conéctate a tu espacio de trabajo de stream processing.
Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando mongosh.
En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Connect.
En el cuadro de diálogo Connect to your workspace, selecciona la pestaña Shell.
Copie la cadena de conexión que aparece en el cuadro de diálogo. Tiene el siguiente formato, donde
<atlas-stream-processing-url>es la URL de tu espacio de trabajo de Stream Processing y<username>es el nombre de usuario de un usuario de base de datos con el rolatlasAdmin:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Pega la cadena de conexión en tu terminal y reemplaza el marcador de posición
<password>con las credenciales del usuario. Presiona Enter para ejecutarlo y conectarte a tu espacio de trabajo de stream processing.
Ejemplo
El siguiente comando conecta a un espacio de trabajo de procesamiento de flujos como un usuario llamado streamOwner utilizando x.059 autenticación.
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Proporciona tu contraseña de usuario cuando se solicite.
Define una pipeline.
En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.
El siguiente ejemplo de pipeline utiliza el tema stuff en la conexión myKafka en el registro de conexiones como $source, coincide con los registros donde el campo temperature tiene un valor de 46 y emite los mensajes procesados al tema output de la conexión mySink en el registro de conexiones:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
(Opcional) define un DLQ.
En el mensaje de mongosh, asigna un objeto que contenga las siguientes propiedades de tu DLQ:
Nombre de la conexión
Nombre de la base de datos
Nombre de colección
El siguiente ejemplo define una DLQ sobre la conexión cluster01, en la colección de base de datos metadata.dlq.
deadLetter = { dlq: { connectionName: "cluster01", db: "metadata", coll: "dlq" } }
Inicia un procesador de flujo
Nota
Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Cuando se inicia un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.
Para iniciar un procesador de flujos:
La API de administración Atlas proporciona los puntos finales Iniciar un procesador de flujo y Iniciar un procesador de flujo con opciones para iniciar un procesador de flujo.
Para iniciar un procesador de flujos sin opciones, utiliza Iniciar un procesador de flujos
Para iniciar un procesador de stream y modificar las propiedades de la etapa
$source, utiliza Iniciar un Procesador de Stream con Opciones Este endpoint permite modificar las propiedades destartAfterostartAtOperationTime.
Para iniciar un procesador de streaming en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su Proyecto en Atlas y haga clic en Manage en el panel de su área de trabajo de Stream Processing para ver la lista de procesadores de streaming definidos para ella.
Luego, haz clic en el icono Start de tu procesador de flujo.
Para iniciar un procesador de flujos con mongosh, utiliza el método sp.processor.start(). Tiene la siguiente sintaxis:
sp.processor.start(<options>)
Donde <options> es un documento opcional genérico cuyos campos se pasan al comando de inicio subyacente. Estos campos pueden ser:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| token | Condicional | Consulta MongoDB colección Change Stream |
| Marca de tiempo | Condicional | Consulta MongoDB colección Change Stream |
| string | Opcional | El nivel al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador al nivel del espacio de trabajo de Stream Processing. Debe ser uno de los siguientes:
Para obtener más información, consulta Niveles. |
Por ejemplo, para iniciar un procesador de flujo llamado proc01, ejecuta el siguiente comando:
sp.proc01.start()
{ "ok" : 1 }
Este método devuelve { "ok": 1 } si el procesador de flujo existe y no se está ejecutando actualmente. Si invoca sp.processor.start() para un procesador de flujo que no es STOPPED, mongosh devolverá un error.
Detener un procesador de flujo
Nota
Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Cuando se inicia un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.
Para detener un procesador de flujo:
La API de administración de Atlas proporciona un endpoint para detener un procesador de flujo.
Para detener un procesador de flujo en la Interfaz de Usuario de Atlas, ve a la página Stream Processing de tu proyecto Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing para ver la lista de procesadores de flujo definidos para él.
Luego, haz clic en el icono Stop de tu procesador de flujo.
Para detener un procesador de flujo existente con mongosh, utiliza el método sp.processor.stop(). Tiene la siguiente sintaxis:
sp.processor.stop(<options>)
Donde <options> es un documento genérico y opcional cuyos campos se pasan al comando subyacente de detención.
Por ejemplo, para detener un procesador de streams denominado proc01, ejecute el siguiente comando:
sp.proc01.stop()
{ "ok" : 1 }
Este método devuelve { "ok": 1 } si el procesador de flujo existe y se está ejecutando actualmente. Si invoca sp.processor.stop() para un procesador de flujo que no es running, mongosh devolverá un error.
Modificar un procesador de flujos
Puedes modificar los siguientes elementos de un procesador de flujo existente:
Nombre
Para modificar un procesador de flujo, sigue estos pasos:
Por defecto, los procesadores modificados se restauran desde el último punto de control. Alternativamente, se puede configurar resumeFromCheckpoint=false, en cuyo caso el procesador solo mantiene estadísticas resumidas. Cuando se modifica un procesador con ventanas abiertas, las ventanas se recomputan completamente en el pipeline actualizado.
Nota
Si cambias el nombre de un procesador de flujo para el que has configurado la alerta de El estado del procesador de flujo falló usando un Operator (que contiene expresiones coincidentes como is, contains y más), Atlas no activará alertas para el procesador de flujo renombrado si la expresión coincidente no coincide con el nuevo nombre. Para supervisar el procesador de transmisión renombrado, reconfigura la alerta.
Limitaciones
Cuando se activa la configuración por defecto resumeFromCheckpoint=true, se aplican las siguientes limitaciones:
No puedes modificar la etapa
$source.No se puede modificar el intervalo de la ventana.
No puedes remover una ventana.
Solo puedes modificar un pipeline con una ventana si esa ventana tiene una etapa
$groupo$sorten su pipeline interno.No puedes cambiar un tipo de ventana existente. Por ejemplo, no puedes cambiar de
$tumblingWindowa$hoppingWindowo viceversa.Los procesadores con ventanas pueden reprocesar algunos datos como resultado de recalcular las ventanas.
Las estadísticas por operador no se conservan después de una operación de modificación.
Para modificar un procesador de flujo:
La API de administración de Atlas proporciona un endpoint para modificar un procesador de flujo.
Requiere mongosh v2.3.4+.
Use el comando sp.<streamprocessor>.modify() para modificar un procesador de flujo existente. <streamprocessor> debe ser el nombre de un procesador de flujo detenido definido para el espacio de trabajo de Stream Processing actual.
Por ejemplo, para modificar un procesador de flujos llamado proc01, ejecuta el siguiente comando:
sp.proc1.modify(<pipeline>, { resumeFromCheckpoint: bool, // optional name: string, // optional dlq: string, // optional })
Agregar una etapa a un pipeline existente
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ]); sp.foo.start();
Modificar la fuente de entrada de un procesador de flujo
sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test", config: { startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000) } }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ], {resumeFromCheckpoint: false});
Remover una fila de letra muerta de un procesador de stream
sp.foo.stop(); sp.foo.modify({dlq: {}}) sp.foo.start();
Modifica un Stream Processor con una Ventana
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$replaceRoot: {newRoot: "$fullDocument"}}, {$match: {cost: {$gt: 500}}}, {$tumblingWindow: { interval: {unit: "day", size: 1}, pipeline: [ {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}} ] }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ], {resumeFromCheckpoint: false}); sp.foo.start();
Descartar un procesador de flujo
Para descartar un procesador de flujo:
La API de administración de Atlas proporciona un endpoint para borrar un procesador de flujo.
Para borrar un procesador de Stream Processing en la Interfaz de Usuario de Atlas, Go a la página Stream Processing de tu Proyecto Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing para ver la lista de procesadores de Stream Processing definidos para el mismo.
Luego, haz clic en el ícono Delete () para tu procesador de flujo. En el cuadro de diálogo de confirmación que aparece, escriba el nombre del procesador de transmisión (solarDemo) para confirmar que desea borrarlo y luego haga clic en Delete.
Para borrar un procesador de flujo existente con mongosh, usa el método sp.processor.drop(). Tiene la siguiente sintaxis:
sp.processor.drop(<options>)
Donde <options> es un documento genérico y opcional cuyos campos se transfieren al comando descartar subyacente.
Por ejemplo, para descartar un procesador de flujo llamado proc01, ejecuta el siguiente comando:
sp.proc01.drop()
Este método devuelve:
truesi el procesador de flujos existe.falsesi el procesador de flujos no existe.
Cuando se descarta un procesador de flujo, todos los recursos que Atlas Stream Processing ha aprovisionado para él, junto con todo el estado guardado, se destruyen.
Listar los procesadores de flujo disponibles
Para listar todos los procesadores de flujo disponibles:
La API de administración de Atlas proporciona un endpoint para enumerar todos los procesadores de flujos disponibles.
Para ver la lista de procesadores de flujos definidos para tu espacio de trabajo de Stream Processing en la Interfaz de Usuario de Atlas, Go a la página Stream Processing de tu Proyecto de Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing.
Se muestra la lista de procesadores de flujos y sus estados.
Para enumerar todos los procesadores de flujo disponibles en el espacio de trabajo actual de Stream Processing con mongosh, se debe utilizar el método sp.listStreamProcessors(). Devuelve una lista de documentos con el nombre, la hora de inicio, el estado actual y la pipeline asociados a cada procesador de streaming. Tiene la siguiente sintaxis:
sp.listStreamProcessors(<filter>)
<filter> es un documento que especifica por qué campo(s) filtrar la lista.
Ejemplo
El siguiente ejemplo muestra un valor de retorno para una solicitud no filtrada:
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
Si ejecutas el comando nuevamente en el mismo espacio de trabajo de procesamiento de flujo, filtrando por un "state" de "running", verás la siguiente salida:
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
Enumere los valores por defecto del espacio de trabajo
Para ver información sobre la configuración del nivel de tu espacio de trabajo de Stream Processing:
El endpoint List Stream Workspace Details enumera los detalles de un espacio de trabajo de Stream Processing, incluyendo el defaultTierSize y maxTierSize.
Para ver los defaultTierSize y maxTierSize de un espacio de trabajo de Stream Processing en la Interfaz de Usuario de Atlas, ve a la página Stream Processing de tu Proyecto Atlas.
El defaultTierSize y maxTierSize están listados en la parte inferior de la tarjeta del área de Stream Processing.
Para ver el defaultTierSize y el maxTierSize de un espacio de trabajo de Stream Processing mediante mongosh, utiliza el método sp.listWorkspaceDefaults().
sp.listWorkspaceDefaults()
Este método devuelve:
defaultTierSizemaxTierSize
Muestra de un procesador de flujo
Para devolver un arreglo de resultados muestreados desde un procesador de flujo existente a STDOUT con mongosh, use el método sp.processor.sample(). Por ejemplo, el siguiente comando toma muestras de un procesador de flujo llamado proc01.
sp.proc01.sample()
Este comando se ejecuta de forma continua hasta que se cancele usando CTRL-C, o hasta que las muestras devueltas acumulen un tamaño de 40 MB. El procesador de flujos reporta documentos inválidos en la muestra en un documento _dlqMessage del siguiente formato:
{ _dlqMessage: { errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', workspaceName: '<workspaceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
Puedes utilizar estos mensajes para diagnosticar problemas de higiene de datos sin definir una colección de fila de letra muerta.
Ver estadísticas de un procesador de flujos
Nota
Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Cuando se inicia un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.
Para visualizar las estadísticas de un procesador de secuencias:
La API de administración de Atlas proporciona un endpoint para ver las estadísticas de un procesador de flujo.
Para ver la supervisión de tu procesador de flujos, ve a la página Stream Processing de tu Proyecto Atlas y abre la pestaña Monitoring. Luego, selecciona tu procesador de flujos en la lista desplegable Stream processor en la parte superior izquierda de la página.
Para devolver un documento que resuma el estado actual de un procesador de streams existente con mongosh, utiliza el método sp.processor.stats(). Tiene la siguiente sintaxis:
sp.<streamprocessor>.stats({options: {<options>}})
Donde options es un documento opcional con los siguientes campos:
Campo | Tipo | Descripción |
|---|---|---|
| entero | Unidad a utilizar para el tamaño de los elementos en la salida. Por defecto, Atlas Stream Processing muestra el tamaño de los elementos en bytes. Para mostrar en KB, especifica un |
| booleano | Indicador que especifica el nivel de verbosidad del documento de salida. Si se establece en |
El documento de salida tiene los siguientes campos:
Campo | Tipo | Descripción |
|---|---|---|
| string | El namespace en el que se define el procesador de flujos. |
| Objeto | Un documento que describe el estado operativo del procesador de flujo. |
| string | El nombre del procesador de flujo. |
| string | El estado del procesador de flujo. Este campo puede tener los siguientes valores:
|
| entero | La escala en la que se muestra el campo de tamaños. Si se establece en |
| entero | La cantidad de documentos publicados en el stream. Un documento se considera 'publicado' en el flujo sólo una vez que pasa por la etapa |
| entero | La cantidad de bytes o kilobytes publicados en el flujo. Los bytes se consideran 'publicados' en el flujo una vez que pasan por la etapa |
| entero | El número de documentos procesados por el flujo. Se considera que un documento ha sido 'procesado' por el flujo una vez que pasa por toda la pipeline. |
| entero | La cantidad de bytes o kilobytes procesados por el flujo. Los bytes se consideran "procesados" por el flujo una vez que pasan por todo el pipeline. |
| entero | El número de documentos enviados a la fila de letra muerta. |
| entero | El número de bytes o kilobytes enviados a la fila de letra muerta. |
| entero | La diferencia, en segundos, entre la hora del evento representada por el flujo de cambios más reciente token de reanudación y el evento más reciente en el oplog. |
| token | La flujo de cambios más reciente token de reanudación. Solo aplica a los procesadores de flujo con una fuente de flujo de cambios. |
| Documento | Estadísticas de latencia para el procesador de flujo en su conjunto. Atlas Stream Processing devuelve este campo solo si se pasa la opción |
| entero | La latencia estimada del percentil 50de todos los documentos procesados en los últimos 30 segundos. Si tu pipeline incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana. Por ejemplo, si la etapa de |
| entero | La latencia estimada del percentil 99de todos los documentos procesados en los últimos 30 segundos. Si tu pipeline incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana. Por ejemplo, si la etapa de |
| datetime | Tiempo de pared en el que comenzó la ventana de medición de 30 segundos más reciente. |
| datetime | Momento en que finalizó la ventana de medición más reciente de 30 segundos. |
| string | Unidad de tiempo en la que se cuenta la latencia. Este valor es siempre |
| entero | Número de documentos que el procesador de flujos ha procesado en la ventana de medición más reciente de 30 segundos. |
| entero | Suma de todas las mediciones individuales de latencia, en microsegundos, obtenidas en la ventana de medición más reciente de 30 segundos. |
| entero | La cantidad de bytes que utiliza Windows para almacenar el estado del procesador. |
| entero | La marca de tiempo de la marca de agua actual. |
| arreglo | Las estadísticas para cada operador en el pipeline del procesador. Atlas Stream Processing devuelve este campo sólo si se pasa la opción
|
| entero | El uso máximo de memoria del operador en bytes o kilobytes. |
| entero | El tiempo total de ejecución del operador en segundos. |
| fecha | La hora de inicio de la ventana mínima abierta. Este valor es opcional. |
| fecha | La hora de inicio de la ventana máxima de apertura. Este valor es opcional. |
| arreglo | Las estadísticas por objetivo para ciertos operadores fuente y sumideros. Cada elemento de este arreglo es un documento que representa un único target de entrada o salida, como un input o output colección o un Apache Kafka tema. Según el operador, cada documento contiene un subconjunto de los siguientes campos: Para operadores fuente, como Apache Kafka Ya sea
Para operadores de sumidero, como MongoDB Ya sea
Atlas Stream Processing recopila estadísticas por objetivo sólo para algunas etapas de agregación de flujos, como Apache Kafka Registra estadísticas por objetivo para un máximo de 100 objetivos diferentes; después de eso, el procesador de flujo deja de añadir nuevas entradas a |
| arreglo | Información sobre el offset de las particiones de un Apache Kafka broker (nodo de servidor). |
| entero | El número de partición de temas en Apache Kafka. |
| entero | El desplazamiento en el que se encuentra el procesador de flujo para la partición especificada. Este valor es igual al desplazamiento anterior que el procesador de flujo procesó, más |
| entero | El desplazamiento que el procesador de flujos comprometió por última vez con el Apache Kafka broker y el punto de control para la partición especificada. Todos los mensajes a través de este desplazamiento se registran en el último punto de control. |
| booleano | La bandera que indica si la partición está inactiva. Este valor por defecto es |
Por ejemplo, lo siguiente muestra el estado de un procesador de Stream Processing llamado proc01 en un espacio de trabajo de Stream Processing llamado inst01 y los tamaños de los elementos se muestran en KB:
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }
El ejemplo muestra las estadísticas generales del procesador de streams.
Para ver cómo se comportan los operadores individuales o cuánta tráfico gestiona cada objetivo, llama a sp.<streamprocessor>.stats() con la opción verbose y examina stats.operatorStats y, para algunos operadores, stats.operatorStats.targetStats.
Por ejemplo, para un operador fuente, stats.operatorStats.targetStats recopila los campos inputMessageCount y inputMessageSize para cada db/coll único o para cada tema único:
{ "name" : "KafkaConsumerOperator", "inputMessageCount" : NumberLong(100), "inputMessageSize" : 100352, "targetStats" : [ { "topic" : "outputTopic1", "inputMessageCount" : NumberLong(100), "inputMessageSize" : 100352 } ], ... }
Y para un operador Sink , stats.operatorStats.targetStats recopila los campos outputMessageCount y outputMessageSize para cada db/coll único o cada tema único:
{ "name" : "MergeOperator", "inputMessageCount" : NumberLong(10), "inputMessageSize" : 1744, "targetStats" : [ { "db" : "cust1", "coll" : "outColl1", "outputMessageCount" : NumberLong(3), "outputMessageSize" : 1748 }, { "db" : "cust2", "coll" : "outColl2", "outputMessageCount" : NumberLong(4), "outputMessageSize" : 2241 } ], ... }