Un procesador de flujo Atlas Stream Processing aplica la lógica de un procesador con nombre único. Canal de agregación de flujos a sus datos de streaming. Atlas Stream Processing guarda cada definición de procesador de flujo en almacenamiento persistente para que pueda reutilizarse. Solo puede usar un procesador de flujo determinado en el espacio de trabajo de procesamiento de flujos donde se almacena su definición.
Requisitos previos
Para crear y administrar un procesador de flujo, debe tener:
Un usuario de base de datos con la
atlasAdminRol para crear y ejecutar procesadores de flujoUn grupo Atlas
Considerations
Muchos comandos de procesadores de flujo requieren que se especifique el nombre del procesador correspondiente al invocar el método. La sintaxis descrita en las siguientes secciones asume nombres estrictamente alfanuméricos. Si el nombre de su procesador de flujo incluye caracteres no alfanuméricos, como guiones (-) o puntos (.), debe escribirlo entre corchetes ([]) y comillas dobles ("") al invocar el método, como en sp.["special-name-stream"].stats().
Crear un procesador de flujo de forma interactiva
Puede crear un procesador de flujo de forma interactiva con el sp.process() método en. Los procesadores de flujo creados de forma interactiva presentan el siguiente mongosh comportamiento:
Escribe documentos de salida y de cola de mensajes no entregados en el shell
Comienza a ejecutarse inmediatamente después de su creación.
Ejecutar durante 10 minutos o hasta que el usuario lo detenga
No persista después de parar
Los procesadores de flujo que se crean interactivamente están diseñados para la creación de prototipos. Para crear un procesador de flujo persistente, consulte Crear un procesador de flujo.
sp.process() tiene la siguiente sintaxis:
sp.process(<pipeline>)
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| arreglo | Requerido | Canal de agregación de transmisiones que desea aplicar a sus datos de transmisión. |
Para crear un procesador de flujo de forma interactiva:
Conéctese a su espacio de trabajo de procesamiento de flujo.
Utilice la cadena de conexión asociada con su espacio de trabajo de procesamiento de flujo para conectarse mongosh usando.
Ejemplo
El siguiente comando se conecta a un espacio de trabajo de procesamiento de flujos como un usuario llamado streamOwner usando la autenticación x.059:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Proporcione su contraseña de usuario cuando se le solicite.
Definir una tubería.
En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.
El siguiente ejemplo utiliza el tema stuff en la conexión myKafka en el registro de conexiones como el $source, coincide con los registros donde el campo temperature tiene un valor de 46 y emite los mensajes procesados al tema output de la conexión mySink en el registro de conexiones:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
Crea un Procesador de Streams
Para crear un procesador de flujo que persista hasta que lo elimines:
La API de administración de Atlas proporciona un punto final para crear un procesador de flujo.
Para crear un procesador de flujo en la interfaz de usuario de Atlas, vaya a Stream Processing página para su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo.
Puede elegir entre utilizar el Visual Builder o el EditorJSON para configurar su procesador de flujo:
Haga clic en Create with visual builder.
Si hay procesadores de flujo existentes en su espacio de trabajo de procesamiento de flujo, haga clic en el botón + Create stream processor y luego seleccione Visual Builder de las opciones desplegables.
El Visual Builder se abre con un formulario donde puede configurar su procesador de flujo.
Agregar una conexión de origen.
En el campo Source, seleccione una conexión de la lista desplegable Connection para usar como fuente para su procesador de flujo.
Esto abre un cuadro de texto JSON donde puede configurar la source etapa para su procesador de flujo. Para obtener más información sobre la source sintaxis de la $source etapa, consulte.
Ejemplo
La siguiente etapa source opera con datos en tiempo real de la conexión sample_stream_solar preconfigurada:
{ "$source": { "connectionName": "sample_stream_solar" } }
Agregue etapas de agregación a la canalización del procesador de flujo.
En el Start building your pipeline panel, haga clic en el botón de la etapa de agregación que desee añadir a su pipeline. Se abrirá un cuadro de texto donde podrá configurar la etapa de agregación seleccionada en formato JSON.
Si su etapa de agregación no aparece en la lista, haga + Custom stage clic en para definir una etapa de agregación compatible en formato JSON. Para obtener más información sobre las etapas de agregación del procesamiento de flujos y su sintaxis, consulte Etapas de la canalización de agregación.
Ejemplo
La siguiente etapa coincide con todos los documentos en $match la sample_stream_solar secuencia preconfigurada donde el obs.watts campo es mayor 300 que:
{ "$match": { "obs.watts": { "$gt": 300 } } }
(Opcional) Configure etapas de agregación adicionales.
Para añadir etapas de agregación adicionales a su pipeline, haga clic + Add stage below en el botón debajo de la última etapa y seleccione la etapa que desea añadir o haga clic en Custom stage para definir otra etapa de agregación compatible. Se abrirá un cuadro de texto donde podrá configurar la nueva etapa en formato JSON.
Añade una conexión de fregadero.
En el campo Sink, seleccione una conexión de destino de la lista desplegable Connection.
En el campo Sink, seleccione una conexión de la lista desplegable Connection para escribir los datos procesados.
Esto abre un cuadro de texto JSON donde puede configurar la merge etapa para su procesador de flujo. Para obtener más información sobre la merge sintaxis de la $merge etapa, consulte.
Ejemplo
La siguiente etapa sink escribe datos procesados en la colección demoDb.demoColl en una conexión denominada demoConnection:
{ "$merge": { "into": { "connectionName": "demoConnection", "db": "demoDb", "coll": "demoColl" } } }
Haga clic en Use JSON editor.
Si hay procesadores de flujo existentes en su espacio de trabajo de procesamiento de flujo, haga clic en el botón + Create stream processor y luego seleccione Visual Builder de las opciones desplegables.
El editor JSON se abre con un cuadro de texto donde puedes configurar tu procesador de flujo en formato JSON.
Defina el procesador de flujo.
Especifique la definición JSON de su procesador de flujo en el cuadro de texto del editor JSON. Esta definición debe incluir un nombre para su procesador de flujo y una canalización de agregación que comience con la etapa$sourcey termine con la etapa$merge. Puede incluir cualquier número de etapas de agregación adicionales entre las etapas $source y $merge.
Para obtener más información sobre las etapas de agregación del procesamiento de flujo y su sintaxis, consulte Etapas de la canalización de agregación.
Ejemplo
La siguiente definición JSON crea un procesador de flujo llamado que utiliza solarDemo una etapa con una $tumblingWindow etapa anidada para agregar datos en tiempo real desde la $group sample_stream_solar conexión preconfigurada en 10intervalos de segundos y escribe los datos procesados en una colección en una conexión mongodb1 llamada.
{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] }
Para crear un nuevo procesador de flujo con,mongosh utilice el método. Tiene la siguiente sp.createStreamProcessor() sintaxis:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Requerido | Nombre lógico del procesador de flujo. Debe ser único dentro del espacio de trabajo de procesamiento de flujo. Debe contener únicamente caracteres alfanuméricos. |
| arreglo | Requerido | Canal de agregación de transmisiones que desea aplicar a sus datos de transmisión. |
| Objeto | Opcional | Objeto que define varias configuraciones opcionales para su procesador de flujo. |
| Objeto | Condicional | Objeto que asigna una cola de mensajes no entregados a su espacio de trabajo de procesamiento de flujos. Este campo es necesario si define el |
| string | Condicional | Etiqueta legible que identifica una conexión en su registro de conexiones. Esta conexión debe hacer referencia a un clúster Atlas. Este campo es necesario si define el campo |
| string | Condicional | Nombre de una base de datos Atlas en el clúster especificado en |
| string | Condicional | Nombre de una colección en la base de datos especificada en |
| string | Opcional | El nivel del pod al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador a un pod del nivel por defecto del área de trabajo de stream processing. Para obtener más información, consulta Niveles. |
Conéctese a su espacio de trabajo de procesamiento de flujo.
Utilice la cadena de conexión asociada con su espacio de trabajo de procesamiento de flujo para conectarse mongosh usando.
En el panel de su espacio de trabajo de procesamiento de flujo, haga clic en Connect.
En el cuadro de diálogo Connect to your workspace, seleccione la pestaña Shell.
Copie la cadena de conexión que se muestra en el cuadro de diálogo. Tiene el siguiente formato:
<atlas-stream-processing-url>es la URL de su espacio de trabajo de procesamiento de flujos y<username>es el nombre de usuario de la base de datos con elatlasAdminrol.mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Pegue la cadena de conexión en su terminal y reemplace el marcador
<password>con las credenciales del usuario. Presione Enter para ejecutarlo y conectarse a su espacio de trabajo de procesamiento de flujos.
Ejemplo
El siguiente comando se conecta a un espacio de trabajo de procesamiento de flujo como un usuario llamado streamOwner utilizando la autenticación x..059
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Proporcione su contraseña de usuario cuando se le solicite.
Definir una tubería.
En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.
El siguiente ejemplo de canalización utiliza el stuff tema en la myKafka conexión en el registro de conexión como, coincide con los registros donde $source el temperature campo tiene un valor de 46 y emite los mensajes procesados al output tema de la mySink conexión en el registro de conexión:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
(Opcional) Defina un DLQ.
En el mensaje, asigne un objeto que contenga las siguientes propiedades de su mongosh DLQ:
Nombre de la conexión
Nombre de la base de datos
Nombre de colección
El siguiente ejemplo define un DLQ a través de la conexión cluster01, en la colección de bases de datos metadata.dlq.
deadLetter = { dlq: { connectionName: "cluster01", db: "metadata", coll: "dlq" } }
Crear un procesador de flujo.
El siguiente comando crea un procesador de flujo llamado proc01 que aplica la lógica definida en pipeline. Los documentos que generan errores durante el procesamiento se escriben en la cola de mensajes de error definida en deadLetter.
sp.createStreamProcessor("proc01", pipeline, deadLetter)
Iniciar un procesador de flujo
Nota
Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Al iniciar un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.
Para iniciar un procesador de flujo:
La API de administración de Atlas proporciona los puntos finales Iniciar un procesador de flujo y Iniciar un procesador de flujo con opciones para iniciar un procesador de flujo.
Para iniciar un procesador de flujo sin opciones, utilice Iniciar un procesador de flujo
Para iniciar un procesador de flujo y modificar las propiedades de la
$sourceetapa, utilice Iniciar un procesador de flujo con opciones. Este punto final admite la modificación de lasstartAfterstartAtOperationTimepropiedades o.
Para iniciar un procesador de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo para ver la lista de procesadores de flujo definidos para él.
Luego, haga clic en el ícono Start para su procesador de flujo.
Para iniciar un procesador de flujo con,mongosh sp.processor.start() utilice el método. Tiene la siguiente sintaxis:
sp.processor.start(<options>)
Donde <options> puede ser uno de los siguientes:
Opción | Descripción |
|---|---|
| |
| El nivel del pod al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador a un pod del nivel por defecto del área de trabajo de stream processing. Para obtener más información, consulta Niveles. |
Por ejemplo, para iniciar un procesador de flujo llamado proc01, ejecute el siguiente comando:
sp.proc01.start()
{ "ok" : 1 }
Este método devuelve { "ok": 1 } si el procesador de flujo existe y no se está ejecutando. Si se invoca sp.processor.start() para un procesador de flujo que no STOPPED es, devolverá unmongosh error.
Detener un procesador de flujo
Nota
Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Al iniciar un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.
Para detener un procesador de flujo:
La API de administración de Atlas proporciona un punto final para detener un procesador de flujo.
Para pausar un procesador de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo para ver la lista de procesadores de flujo definidos para él.
Luego, haga clic en el ícono Pause para su procesador de flujo.
Para detener un procesador de flujo existente con,mongosh sp.processor.stop() utilice el método.
Por ejemplo, para detener un procesador de flujo llamado proc01, ejecute el siguiente comando:
sp.proc01.stop()
{ "ok" : 1 }
Este método devuelve { "ok": 1 } si el procesador de flujo existe y se está ejecutando. Si se invoca sp.processor.stop() para un procesador de flujo distinto running de, devolverá unmongosh error.
Modificar un procesador de flujo
Puede modificar los siguientes elementos de un procesador de flujo existente:
Para modificar un procesador de flujo, haga lo siguiente:
De forma predeterminada, los procesadores modificados se restauran desde el último punto de control. Como alternativa, puede configurar resumeFromCheckpoint=false, en cuyo caso el procesador solo conserva las estadísticas resumidas. Al modificar un procesador con ventanas abiertas, estas se recalculan completamente en la canalización actualizada.
Nota
Si cambia el nombre de un procesador de flujo para el que había configurado Operator iscontainsla alerta "El estado del procesador de flujo ha fallado" mediante un (que contiene expresiones de comparación como, y otras), Atlas no activará alertas para el procesador de flujo renombrado si la expresión de comparación no coincide con el nuevo nombre. Para supervisar el procesador de flujo renombrado, reconfigure la alerta.
Limitaciones
Cuando la configuración predeterminada resumeFromCheckpoint=true está habilitada, se aplican las siguientes limitaciones:
No puedes modificar la etapa
$source.No puedes modificar el intervalo de tu ventana.
No puedes eliminar una ventana.
Solo se puede modificar una tubería con una ventana si esa ventana tiene una etapa
$groupo$sorten su tubería interna.No puedes cambiar un tipo de ventana existente. Por ejemplo, no puedes cambiar de
$tumblingWindowa$hoppingWindowo viceversa.Los procesadores con ventanas pueden reprocesar algunos datos como producto del recálculo de las ventanas.
Para modificar un procesador de flujo:
La API de administración de Atlas proporciona un endpoint para modificar un procesador de flujo.
Requiere mongosh v2.3.4+.
Utilice el comando sp.<streamprocessor>.modify() para modificar un procesador de flujo existente. <streamprocessor> debe ser el nombre de un procesador de flujo detenido definido para el espacio de trabajo de procesamiento de flujo actual.
Por ejemplo, para modificar un procesador de flujo llamado proc01, ejecute el siguiente comando:
sp.proc1.modify(<pipeline>, { resumeFromCheckpoint: bool, // optional name: string, // optional dlq: string, // optional }})
Agregar una etapa a una tubería existente
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ]); sp.foo.start();
Modificar la fuente de entrada de un procesador de flujo
sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test", config: { startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000) } }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ], {resumeFromCheckpoint: false});
Eliminar una cola de mensajes no entregados de un procesador de flujo
sp.foo.stop(); sp.foo.modify({dlq: {}}) sp.foo.start();
Modificar un procesador de flujo con una ventana
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$replaceRoot: {newRoot: "$fullDocument"}}, {$match: {cost: {$gt: 500}}}, {$tumblingWindow: { interval: {unit: "day", size: 1}, pipeline: [ {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}} ] }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ], {resumeFromCheckpoint: false}); sp.foo.start();
Eliminar un procesador de flujo
Para eliminar un procesador de flujo:
La API de administración de Atlas proporciona un punto final para eliminar un procesador de flujo.
Para eliminar un procesador de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo para ver la lista de procesadores de flujo definidos para él.
Luego, haga clic Delete en el icono ( ) de su procesador de flujo. En el cuadro de diálogo de confirmación que aparece, escriba el nombre del procesador de flujosolarDemo () para confirmar que desea eliminarlo y haga clic Delete en.
Para eliminar un procesador de flujo existente con,mongosh utilice el sp.processor.drop() método.
Por ejemplo, para eliminar un procesador de flujo llamado proc01, ejecute el siguiente comando:
sp.proc01.drop()
Este método devuelve:
trueSi existe el procesador de flujo.falsesi el procesador de flujos no existe.
Cuando se elimina un procesador de flujo, se destruyen todos los recursos que Atlas Stream Processing aprovisionó para él, junto con todo el estado guardado.
Lista de procesadores de flujo disponibles
Para enumerar todos los procesadores de flujo disponibles:
La API de administración de Atlas proporciona un punto final para enumerar todos los procesadores de flujo disponibles.
Para ver la lista de procesadores de flujo definidos para su espacio de trabajo de procesamiento de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo.
Se muestra la lista de procesadores de flujo y sus estados.
Para listar todos los procesadores de flujo disponibles en el espacio de trabajo de procesamiento de flujo actual con,mongosh sp.listStreamProcessors() utilice el método. Esto devuelve una lista de documentos que contiene el nombre, la hora de inicio, el estado actual y la canalización asociada a cada procesador de flujo. Tiene la siguiente sintaxis:
sp.listStreamProcessors(<filter>)
<filter> es un documento que especifica por qué campo(s) filtrar la lista.
Ejemplo
El siguiente ejemplo muestra un valor de retorno para una solicitud sin filtrar:
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
Si ejecuta el comando nuevamente en el mismo espacio de trabajo de procesamiento de flujo, filtrando por "state" de "running", verá el siguiente resultado:
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
Muestra de un procesador de flujo
Para devolver una matriz de resultados muestreados de un procesador de flujo existente a STDOUT mongoshcon, utilice el sp.processor.sample() método. Por ejemplo, el siguiente comando muestrea de un procesador de flujo proc01 llamado.
sp.proc01.sample()
Este comando se ejecuta de forma continua hasta que se cancele usando CTRL-C, o hasta que las muestras devueltas acumulen un tamaño de 40 MB. El procesador de flujos reporta documentos inválidos en la muestra en un documento _dlqMessage del siguiente formato:
{ _dlqMessage: { errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', workspaceName: '<workspaceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
Puede utilizar estos mensajes para diagnosticar problemas de higiene de datos sin definir una recopilación de cola de mensajes inactivos.
Ver estadísticas de un procesador de flujo
Nota
Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Al iniciar un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.
Para ver las estadísticas de un procesador de flujo:
La API de administración de Atlas proporciona un punto final para ver las estadísticas de un procesador de flujo.
Para ver la supervisión de tu procesador de flujos, ve a la página Stream Processing de tu Proyecto Atlas y abre la pestaña Monitoring. Luego, selecciona tu procesador de flujos en la lista desplegable Stream processor en la parte superior izquierda de la página.
Para devolver un documento que resuma el estado actual de un procesador de flujo existente con,mongosh utilice el método. Tiene la siguiente sp.processor.stats() sintaxis:
sp.<streamprocessor>.stats({options: {<options>}})
Donde options es un documento opcional con los siguientes campos:
Campo | Tipo | Descripción |
|---|---|---|
| entero | Unidad que se utiliza para el tamaño de los elementos en la salida. De forma predeterminada, Atlas Stream Processing muestra el tamaño de los elementos en bytes. Para mostrarlo en KB, especifique un valor de |
| booleano | Indicador que especifica el nivel de detalle del documento de salida. Si se establece en |
El documento de salida tiene los siguientes campos:
Campo | Tipo | Descripción |
|---|---|---|
| string | El espacio de nombres en el que se define el procesador de flujo. |
| Objeto | Un documento que describe el estado operativo del procesador de flujo. |
| string | El nombre del procesador de flujo. |
| string | El estado del procesador de flujo. Este campo puede tener los siguientes valores:
|
| entero | La escala en la que se muestra el campo de tamaño. Si se establece en |
| entero | Número de documentos publicados en el flujo. Un documento se considera "publicado" en el flujo una vez que pasa por la |
| entero | La cantidad de bytes o kilobytes publicados en la secuencia. Los bytes se consideran "publicados" en la secuencia una vez que pasan por la |
| entero | El número de documentos procesados por el flujo. Un documento se considera procesado por el flujo una vez que pasa por todo el pipeline. |
| entero | El número de bytes o kilobytes procesados por el flujo. Los bytes se consideran "procesados" por el flujo una vez que pasan por toda la canalización. |
| entero | El número de documentos enviados a la cola de cartas muertas. |
| entero | La cantidad de bytes o kilobytes enviados a la cola de mensajes no entregados. |
| entero | La diferencia, en segundos, entre el tiempo del evento representado por el token de reanudación del flujo de cambio más reciente y el último evento en el registro de operaciones. |
| token | El token de reanudación del flujo de cambios más reciente. Solo se aplica a procesadores de flujo con un origen de flujo de cambios. |
| Documento | Estadísticas de latencia del procesador de flujo en su conjunto. Atlas Stream Processing devuelve este campo solo si se pasa la opción |
| entero | La latencia estimada del percentil 50de todos los documentos procesados en los últimos 30 segundos. Si su canalización incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana. Por ejemplo, si su etapa |
| entero | La latencia estimada del percentil 99de todos los documentos procesados en los últimos 30 segundos. Si su canalización incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana. Por ejemplo, si su etapa |
| datetime | Hora de la pared en la que comenzó la ventana de medición de 30 segundos más reciente. |
| datetime | Hora de la pared en la que finalizó la ventana de medición de 30 segundos más reciente. |
| string | Unidad de tiempo en la que se contabiliza la latencia. Este valor siempre es |
| entero | Número de documentos que el procesador de flujos ha procesado en la ventana de medición más reciente de 30 segundos. |
| entero | Suma de todas las mediciones de latencia individuales, en microsegundos, tomadas en la ventana de medición de 30 segundos más reciente. |
| entero | La cantidad de bytes que utiliza Windows para almacenar el estado del procesador. |
| entero | La marca de tiempo de la marca de agua actual. |
| arreglo | Las estadísticas para cada operador en el pipeline del procesador. Atlas Stream Processing devuelve este campo sólo si se pasa la opción
|
| entero | El uso máximo de memoria del operador en bytes o kilobytes. |
| entero | El tiempo total de ejecución del operador en segundos. |
| fecha | La hora de inicio de la ventana mínima abierta. Este valor es opcional. |
| fecha | Hora de inicio de la ventana de máxima apertura. Este valor es opcional. |
| arreglo | Información de desplazamiento para un Apache Kafkaparticiones |
| entero | El número de partición de temas en Apache Kafka. |
| entero | El desplazamiento en el que se encuentra el procesador de flujo para la partición especificada. Este valor es igual al desplazamiento anterior que procesó el procesador de flujo más |
| entero | El último desplazamiento que el procesador de flujo envió al broker de Apache Kafka y el punto de control de la partición especificada. Todos los mensajes a través de este desplazamiento se registran en el último punto de control. |
| booleano | La bandera que indica si la partición está inactiva. Este valor por defecto es |
Por ejemplo, a continuación se muestra el estado de un procesador de flujo llamado proc01 en un espacio de trabajo de procesamiento de flujo llamado inst01 con tamaños de elementos mostrados en KB:
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }