Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

Desarrollar procesadores de flujo

Un procesador de flujo de Atlas Stream Processing aplica la lógica de un pipeline de agregación de flujo con un nombre único a los datos de transmisión. Atlas Stream Processing guarda cada definición de procesador de flujo en almacenamiento persistente para que pueda reutilizarse. Solo puede usar un procesador de flujo determinado en el espacio de trabajo de Stream Processing en el que está almacenada su definición.

Para crear y gestionar un procesador de flujo, debes tener:

Muchos de los comandos del procesador de flujos requieren que se especifique el nombre del procesador de flujos relevante en la invocación del método. La sintaxis descrita en las siguientes secciones supone nombres estrictamente alfanuméricos. Si el nombre de su procesador de flujo incluye caracteres no alfanuméricos, como guiones (-) o puntos (puntos) (.), debe encerrar el nombre entre corchetes ([]) y comillas double ("") en la invocación del método, como en sp.["special-name-stream"].stats().

Puedes crear un procesador de flujo de forma interactiva con el método sp.process() en mongosh. Los procesadores de flujo que creas de forma interactiva muestran el siguiente comportamiento:

  • Guarde la salida y los documentos de la fila de letra muerta en el shell

  • Empieza a ejecutarse inmediatamente al crearse

  • Se ejecutan durante 10 minutos o hasta que el usuario los detenga

  • No persista después de detenerse

Los procesadores de flujo que creas de forma interactiva sirven para prototipar. Para crear un procesador de flujo persistente, consulte Crear un procesador de flujo.

sp.process() tiene la siguiente sintaxis:

sp.process(<pipeline>)
Campo
Tipo
Necesidad
Descripción

pipeline

arreglo

Requerido

Pipeline de agregación de streams que deseas aplicar a tus datos de transmisión.

Para crear un procesador de flujo de forma interactiva:

1

Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando mongosh.

Ejemplo

El siguiente comando se conecta a un espacio de trabajo de procesamiento de flujos como un usuario llamado streamOwner usando la autenticación x.059:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Proporciona tu contraseña de usuario cuando se solicite.

2

En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.

El siguiente ejemplo utiliza el tema stuff en la conexión myKafka en el registro de conexiones como el $source, coincide con los registros donde el campo temperature tiene un valor de 46 y emite los mensajes procesados al tema output de la conexión mySink en el registro de conexiones:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

El siguiente comando crea un procesador de stream que aplica la lógica definida en pipeline.

sp.process(pipeline)

Para crear un procesador de flujo que persista hasta que se elimine:

La API de administración de Atlas proporciona un endpoint para crear un procesador de flujos.

Crea un Procesador de Flujo

Para crear un procesador de flujos en la interfaz de usuario de Atlas, ve a la página Stream Processing de tu Proyecto de Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing.

Se puede elegir entre utilizar el generador visual o el editor JSON para configurar el procesador de streams:

1

Si existen procesadores de stream processing en tu espacio de trabajo de stream processing, haz clic en el botón + Create stream processor y luego selecciona Visual Builder de las opciones desplegables.

El Constructor Visual se abre con un formulario donde puedes configurar tu procesador de flujo.

2
3

En el campo Source, selecciona una conexión del menú desplegable Connection para usar como fuente de tu procesador de flujos.

Esto abre un cuadro de texto JSON donde puedes configurar la etapa source para el procesador de flujo. Para aprender más sobre la sintaxis de la etapa source, consulta $source.

Ejemplo

La siguiente etapa source opera en datos en tiempo real de la conexión sample_stream_solar preconfigurada:

{
"$source": {
"connectionName": "sample_stream_solar"
}
}
4

En el panel Start building your pipeline, haga clic en el botón para la etapa de agregación que desea agregar a su pipeline. Esto abre un cuadro de texto donde puede configurar la etapa de agregación seleccionada en formato JSON.

Si la etapa de agregación no está en la lista, haz clic en + Custom stage para definir una etapa de agregación admitida en formato JSON. Para aprender más sobre las etapas de agregación de Stream Processing y su sintaxis, consulta Etapas del pipeline de agregación.

Ejemplo

La siguiente etapa $match coincide con todos los documentos en el flujo preconfigurado sample_stream_solar donde el campo obs.watts es mayor que 300:

{
"$match": {
"obs.watts": { "$gt": 300 }
}
}
5

Para añadir etapas de agregación adicionales a tu pipeline, haz clic en el botón + Add stage below debajo de la última etapa de tu pipeline y selecciona la etapa de agregación que deseas añadir o haz clic en Custom stage para definir una etapa de agregación compatible diferente. Esto abre un cuadro de texto donde puedes configurar la nueva etapa en formato JSON.

6

En el campo Sink, selecciona una conexión de destino de la lista desplegable Connection.

En el campo Sink, selecciona una conexión de la lista desplegable Connection a la que se guarden tus datos procesados.

Esto abre un cuadro de texto JSON donde se puede configurar la etapa merge de su procesador de flujo. Para obtener más información sobre la sintaxis de etapa de merge, consulte $merge.

Ejemplo

La siguiente etapa sink de guardar datos procesados en la colección demoDb.demoColl en una conexión denominada demoConnection conexión:

{
"$merge": {
"into": {
"connectionName": "demoConnection",
"db": "demoDb",
"coll": "demoColl"
}
}
}
7

El procesador de flujo se crea y se muestra en la pestaña Stream Processors de la página Stream Processing.

1

Si existen procesadores de stream processing en tu espacio de trabajo de stream processing, haz clic en el botón + Create stream processor y luego selecciona Visual Builder de las opciones desplegables.

El editor JSON se abre con un cuadro de texto donde puedes configurar tu procesador de flujos en formato JSON.

2

Especifique la definición JSON para su procesador de flujos en el cuadro de texto del editor JSON. Esta definición debe incluir un nombre para el procesador de stream y una pipeline de agregación que comience con una etapa $source y termine con la etapa $merge. Puedes incluir cualquier número de etapas de agregación adicionales entre las etapas $source y $merge.

Para obtener más información sobre las etapas de agregación del Stream Processing y su sintaxis, consulta etapas del pipeline de agregación.

Ejemplo

La siguiente definición de JSON crea un procesador de flujo llamado solarDemo que utiliza una etapa $tumblingWindow con una etapa anidada $group para agregar datos en tiempo real desde la conexión preconfigurada sample_stream_solar durante intervalos de 10segundos y escribe los datos procesados en una colección en una conexión llamada mongodb1.

{
"name": "solarDemo",
"pipeline": [
{
"$source": {
"connectionName": "sample_stream_solar"
}
},
{
"$tumblingWindow": {
"interval": {
"size": 10,
"unit": "second"
},
"pipeline": [
{
"$group": {
"_id": "$group_id",
"max_watts": { "$max": "$obs.watts" },
"min_watts": { "$min": "$obs.watts" }
}
}
]
}
},
{
"$merge": {
"into": {
"connectionName": "mongodb1",
"db": "solarDb",
"coll": "solarColl"
}
}
}
]
}

Para crear un nuevo procesador de streaming con mongosh, se puede utilizar el método sp.createStreamProcessor(). Tiene la siguiente sintaxis:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
Tipo
Necesidad
Descripción

name

string

Requerido

Nombre lógico para el procesador de flujo. Debe ser único en el espacio de trabajo de Stream Processing. Este nombre solo debe contener caracteres alfanuméricos.

pipeline

arreglo

Requerido

Pipeline de agregación de streams que deseas aplicar a tus datos de transmisión.

options

Objeto

Opcional

Objeto que define varias configuraciones opcionales para tu procesador de flujo.

options.dlq

Objeto

Condicional

Objeto que asigna una fila de letra muerta para su espacio de trabajo de Stream Processing. Este campo es necesario si se define el campo options.

options.dlq.connectionName

string

Condicional

Etiqueta legible por humanos que identifica una conexión en el registro de conexiones. Esta conexión debe hacer referencia a un clúster de Atlas. Este campo es necesario si define el campo options.dlq.

options.dlq.db

string

Condicional

Nombre de una base de datos de Atlas en el clúster especificado en options.dlq.connectionName. Este campo es necesario si se define el campo options.dlq.

options.dlq.coll

string

Condicional

Nombre de una colección en la base de datos especificada en options.dlq.db. Este campo es necesario si defines el campo options.dlq.

options.tier

string

Opcional

El nivel del pod al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador a un pod del nivel por defecto del área de trabajo de stream processing. Para obtener más información, consulta Niveles.

1

Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando mongosh.

  1. En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Connect.

  2. En el cuadro de diálogo Connect to your workspace, selecciona la pestaña Shell.

  3. Copie la cadena de conexión que aparece en el cuadro de diálogo. Tiene el siguiente formato, donde <atlas-stream-processing-url> es la URL de tu espacio de trabajo de Stream Processing y <username> es el nombre de usuario de un usuario de base de datos con el rol atlasAdmin:

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>
    --password <password>
  4. Pega la cadena de conexión en tu terminal y reemplaza el marcador de posición <password> con las credenciales del usuario. Presiona Enter para ejecutarlo y conectarte a tu espacio de trabajo de stream processing.

Ejemplo

El siguiente comando conecta a un espacio de trabajo de procesamiento de flujos como un usuario llamado streamOwner utilizando x.059 autenticación.

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Proporciona tu contraseña de usuario cuando se solicite.

2

En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.

El siguiente ejemplo de pipeline utiliza el tema stuff en la conexión myKafka en el registro de conexiones como $source, coincide con los registros donde el campo temperature tiene un valor de 46 y emite los mensajes procesados al tema output de la conexión mySink en el registro de conexiones:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

En el mensaje de mongosh, asigna un objeto que contenga las siguientes propiedades de tu DLQ:

  • Nombre de la conexión

  • Nombre de la base de datos

  • Nombre de colección

El siguiente ejemplo define una DLQ sobre la conexión cluster01, en la colección de base de datos metadata.dlq.

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

El siguiente comando crea un procesador de flujos llamado proc01 que aplica la lógica definida en pipeline. Los documentos que generan errores durante el procesamiento se escriben en la DLQ definida en deadLetter.

sp.createStreamProcessor("proc01", pipeline, deadLetter)

Nota

Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Cuando se inicia un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.

Para iniciar un procesador de flujos:

La API de administración Atlas proporciona los puntos finales Iniciar un procesador de flujo y Iniciar un procesador de flujo con opciones para iniciar un procesador de flujo.

Para iniciar un procesador de streaming en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su Proyecto en Atlas y haga clic en Manage en el panel de su área de trabajo de Stream Processing para ver la lista de procesadores de streaming definidos para ella.

Luego, haz clic en el icono Start de tu procesador de flujo.

Para iniciar un procesador de flujos con mongosh, utiliza el método sp.processor.start(). Tiene la siguiente sintaxis:

sp.processor.start(<options>)

Donde <options> es un documento opcional genérico cuyos campos se pasan al comando de inicio subyacente. Estos campos pueden ser:

Campo
Tipo
Necesidad
Descripción

startAfter

token

Condicional

startAtOperationTime

Marca de tiempo

Condicional

tier

string

Opcional

El nivel al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador al nivel del espacio de trabajo de Stream Processing. Debe ser uno de los siguientes:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

Para obtener más información, consulta Niveles.

Por ejemplo, para iniciar un procesador de flujo llamado proc01, ejecuta el siguiente comando:

sp.proc01.start()
{ "ok" : 1 }

Este método devuelve { "ok": 1 } si el procesador de flujo existe y no se está ejecutando actualmente. Si invoca sp.processor.start() para un procesador de flujo que no es STOPPED, mongosh devolverá un error.

Nota

Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Cuando se inicia un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.

Para detener un procesador de flujo:

La API de administración de Atlas proporciona un endpoint para detener un procesador de flujo.

Detén un procesador de flujo

Para detener un procesador de flujo en la Interfaz de Usuario de Atlas, ve a la página Stream Processing de tu proyecto Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing para ver la lista de procesadores de flujo definidos para él.

Luego, haz clic en el icono Stop de tu procesador de flujo.

Para detener un procesador de flujo existente con mongosh, utiliza el método sp.processor.stop(). Tiene la siguiente sintaxis:

sp.processor.stop(<options>)

Donde <options> es un documento genérico y opcional cuyos campos se pasan al comando subyacente de detención.

Por ejemplo, para detener un procesador de streams denominado proc01, ejecute el siguiente comando:

sp.proc01.stop()
{ "ok" : 1 }

Este método devuelve { "ok": 1 } si el procesador de flujo existe y se está ejecutando actualmente. Si invoca sp.processor.stop() para un procesador de flujo que no es running, mongosh devolverá un error.

Puedes modificar los siguientes elementos de un procesador de flujo existente:

Para modificar un procesador de flujo, sigue estos pasos:

Por defecto, los procesadores modificados se restauran desde el último punto de control. Alternativamente, se puede configurar resumeFromCheckpoint=false, en cuyo caso el procesador solo mantiene estadísticas resumidas. Cuando se modifica un procesador con ventanas abiertas, las ventanas se recomputan completamente en el pipeline actualizado.

Nota

Si cambias el nombre de un procesador de flujo para el que has configurado la alerta de El estado del procesador de flujo falló usando un Operator (que contiene expresiones coincidentes como is, contains y más), Atlas no activará alertas para el procesador de flujo renombrado si la expresión coincidente no coincide con el nuevo nombre. Para supervisar el procesador de transmisión renombrado, reconfigura la alerta.

Cuando se activa la configuración por defecto resumeFromCheckpoint=true, se aplican las siguientes limitaciones:

  • No puedes modificar la etapa $source.

  • No se puede modificar el intervalo de la ventana.

  • No puedes remover una ventana.

  • Solo puedes modificar un pipeline con una ventana si esa ventana tiene una etapa $group o $sort en su pipeline interno.

  • No puedes cambiar un tipo de ventana existente. Por ejemplo, no puedes cambiar de $tumblingWindow a $hoppingWindow o viceversa.

  • Los procesadores con ventanas pueden reprocesar algunos datos como resultado de recalcular las ventanas.

  • Las estadísticas por operador no se conservan después de una operación de modificación.

La API de administración de Atlas proporciona un endpoint para modificar un procesador de flujo.

Modifica un Procesador de Flujo

Requiere mongosh v2.3.4+.

Use el comando sp.<streamprocessor>.modify() para modificar un procesador de flujo existente. <streamprocessor> debe ser el nombre de un procesador de flujo detenido definido para el espacio de trabajo de Stream Processing actual.

Por ejemplo, para modificar un procesador de flujos llamado proc01, ejecuta el siguiente comando:

sp.proc1.modify(<pipeline>, {
resumeFromCheckpoint: bool, // optional
name: string, // optional
dlq: string, // optional
})
sp.createStreamProcessor("foo", [
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
])
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
]);
sp.foo.start();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test",
config: {
startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000)
}
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.stop();
sp.foo.modify({dlq: {}})
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$replaceRoot: {newRoot: "$fullDocument"}},
{$match: {cost: {$gt: 500}}},
{$tumblingWindow: {
interval: {unit: "day", size: 1},
pipeline: [
{$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
]
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.start();

Para descartar un procesador de flujo:

La API de administración de Atlas proporciona un endpoint para borrar un procesador de flujo.

Borrar un procesador de flujo

Para borrar un procesador de Stream Processing en la Interfaz de Usuario de Atlas, Go a la página Stream Processing de tu Proyecto Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing para ver la lista de procesadores de Stream Processing definidos para el mismo.

Luego, haz clic en el ícono Delete () para tu procesador de flujo. En el cuadro de diálogo de confirmación que aparece, escriba el nombre del procesador de transmisión (solarDemo) para confirmar que desea borrarlo y luego haga clic en Delete.

Para borrar un procesador de flujo existente con mongosh, usa el método sp.processor.drop(). Tiene la siguiente sintaxis:

sp.processor.drop(<options>)

Donde <options> es un documento genérico y opcional cuyos campos se transfieren al comando descartar subyacente.

Por ejemplo, para descartar un procesador de flujo llamado proc01, ejecuta el siguiente comando:

sp.proc01.drop()

Este método devuelve:

  • true si el procesador de flujos existe.

  • false si el procesador de flujos no existe.

Cuando se descarta un procesador de flujo, todos los recursos que Atlas Stream Processing ha aprovisionado para él, junto con todo el estado guardado, se destruyen.

Para listar todos los procesadores de flujo disponibles:

La API de administración de Atlas proporciona un endpoint para enumerar todos los procesadores de flujos disponibles.

Listado de procesadores de flujo

Para ver la lista de procesadores de flujos definidos para tu espacio de trabajo de Stream Processing en la Interfaz de Usuario de Atlas, Go a la página Stream Processing de tu Proyecto de Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing.

Se muestra la lista de procesadores de flujos y sus estados.

Para enumerar todos los procesadores de flujo disponibles en el espacio de trabajo actual de Stream Processing con mongosh, se debe utilizar el método sp.listStreamProcessors(). Devuelve una lista de documentos con el nombre, la hora de inicio, el estado actual y la pipeline asociados a cada procesador de streaming. Tiene la siguiente sintaxis:

sp.listStreamProcessors(<filter>)

<filter> es un documento que especifica por qué campo(s) filtrar la lista.

Ejemplo

El siguiente ejemplo muestra un valor de retorno para una solicitud no filtrada:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

Si ejecutas el comando nuevamente en el mismo espacio de trabajo de procesamiento de flujo, filtrando por un "state" de "running", verás la siguiente salida:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

Para ver información sobre la configuración del nivel de tu espacio de trabajo de Stream Processing:

El endpoint List Stream Workspace Details enumera los detalles de un espacio de trabajo de Stream Processing, incluyendo el defaultTierSize y maxTierSize.

Para ver los defaultTierSize y maxTierSize de un espacio de trabajo de Stream Processing en la Interfaz de Usuario de Atlas, ve a la página Stream Processing de tu Proyecto Atlas.

El defaultTierSize y maxTierSize están listados en la parte inferior de la tarjeta del área de Stream Processing.

Para ver el defaultTierSize y el maxTierSize de un espacio de trabajo de Stream Processing mediante mongosh, utiliza el método sp.listWorkspaceDefaults().

sp.listWorkspaceDefaults()

Este método devuelve:

  • defaultTierSize

  • maxTierSize

Para devolver un arreglo de resultados muestreados desde un procesador de flujo existente a STDOUT con mongosh, use el método sp.processor.sample(). Por ejemplo, el siguiente comando toma muestras de un procesador de flujo llamado proc01.

sp.proc01.sample()

Este comando se ejecuta de forma continua hasta que se cancele usando CTRL-C, o hasta que las muestras devueltas acumulen un tamaño de 40 MB. El procesador de flujos reporta documentos inválidos en la muestra en un documento _dlqMessage del siguiente formato:

{
_dlqMessage: {
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
workspaceName: '<workspaceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

Puedes utilizar estos mensajes para diagnosticar problemas de higiene de datos sin definir una colección de fila de letra muerta.

Nota

Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Cuando se inicia un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.

Para visualizar las estadísticas de un procesador de secuencias:

La API de administración de Atlas proporciona un endpoint para ver las estadísticas de un procesador de flujo.

Obtener un procesador de flujo

Para ver la supervisión de tu procesador de flujos, ve a la página Stream Processing de tu Proyecto Atlas y abre la pestaña Monitoring. Luego, selecciona tu procesador de flujos en la lista desplegable Stream processor en la parte superior izquierda de la página.

Para devolver un documento que resuma el estado actual de un procesador de streams existente con mongosh, utiliza el método sp.processor.stats(). Tiene la siguiente sintaxis:

sp.<streamprocessor>.stats({options: {<options>}})

Donde options es un documento opcional con los siguientes campos:

Campo
Tipo
Descripción

scale

entero

Unidad a utilizar para el tamaño de los elementos en la salida. Por defecto, Atlas Stream Processing muestra el tamaño de los elementos en bytes. Para mostrar en KB, especifica un scale de 1024.

verbose

booleano

Indicador que especifica el nivel de verbosidad del documento de salida. Si se establece en true, el documento de salida contiene un subdocumento que informa las estadísticas de cada operador individual en tu pipeline. Por defecto, es false.

El documento de salida tiene los siguientes campos:

Campo
Tipo
Descripción

ns

string

El namespace en el que se define el procesador de flujos.

stats

Objeto

Un documento que describe el estado operativo del procesador de flujo.

stats.name

string

El nombre del procesador de flujo.

stats.status

string

El estado del procesador de flujo. Este campo puede tener los siguientes valores:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor

entero

La escala en la que se muestra el campo de tamaños. Si se establece en 1, los tamaños se muestran en bytes. Si se establece en 1024, los tamaños se muestran en kilobytes.

stats.inputMessageCount

entero

La cantidad de documentos publicados en el stream. Un documento se considera 'publicado' en el flujo sólo una vez que pasa por la etapa $source, no cuando pasa por toda la pipeline.

stats.inputMessageSize

entero

La cantidad de bytes o kilobytes publicados en el flujo. Los bytes se consideran 'publicados' en el flujo una vez que pasan por la etapa $source, no cuando pasan por toda la pipeline.

stats.outputMessageCount

entero

El número de documentos procesados por el flujo. Se considera que un documento ha sido 'procesado' por el flujo una vez que pasa por toda la pipeline.

stats.outputMessageSize

entero

La cantidad de bytes o kilobytes procesados por el flujo. Los bytes se consideran "procesados" por el flujo una vez que pasan por todo el pipeline.

stats.dlqMessageCount

entero

El número de documentos enviados a la fila de letra muerta.

stats.dlqMessageSize

entero

El número de bytes o kilobytes enviados a la fila de letra muerta.

stats.changeStreamTimeDifferenceSecs

entero

La diferencia, en segundos, entre la hora del evento representada por el flujo de cambios más reciente token de reanudación y el evento más reciente en el oplog.

stats.changeStreamState

token

La flujo de cambios más reciente token de reanudación. Solo aplica a los procesadores de flujo con una fuente de flujo de cambios.

stats.latency

Documento

Estadísticas de latencia para el procesador de flujo en su conjunto. Atlas Stream Processing devuelve este campo solo si se pasa la opción verbose.

stats.latency.p50

entero

La latencia estimada del percentil 50de todos los documentos procesados en los últimos 30 segundos. Si tu pipeline incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana.

Por ejemplo, si la etapa de $tumblingWindow tiene un intervalo de 5 minutos, las mediciones de latencia incluirán esos 5 minutos.

stats.latency.p99

entero

La latencia estimada del percentil 99de todos los documentos procesados en los últimos 30 segundos. Si tu pipeline incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana.

Por ejemplo, si la etapa de $tumblingWindow tiene un intervalo de 5 minutos, las mediciones de latencia incluirán esos 5 minutos.

stats.latency.start

datetime

Tiempo de pared en el que comenzó la ventana de medición de 30 segundos más reciente.

stats.latency.end

datetime

Momento en que finalizó la ventana de medición más reciente de 30 segundos.

stats.latency.unit

string

Unidad de tiempo en la que se cuenta la latencia. Este valor es siempre microseconds.

stats.latency.count

entero

Número de documentos que el procesador de flujos ha procesado en la ventana de medición más reciente de 30 segundos.

stats.latency.sum

entero

Suma de todas las mediciones individuales de latencia, en microsegundos, obtenidas en la ventana de medición más reciente de 30 segundos.

stats.stateSize

entero

La cantidad de bytes que utiliza Windows para almacenar el estado del procesador.

stats.watermark

entero

La marca de tiempo de la marca de agua actual.

stats.operatorStats

arreglo

Las estadísticas para cada operador en el pipeline del procesador. Atlas Stream Processing devuelve este campo sólo si se pasa la opción verbose.

stats.operatorStats proporciona versiones por operador de muchos campos principales de stats:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.latency

  • stats.operatorStats.stateSize

stats.operatorStats incluye los siguientes campos únicos:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTimeMillis

stats.operatorStats también incluye los siguientes campos dado que se haya pasado la opción verbose y su procesador incluya una etapa de ventana:

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats también puede incluir el siguiente campo para ciertos operadores de fuente y sumidero:

  • stats.operatorStats.targetStats

stats.operatorStats.maxMemoryUsage

entero

El uso máximo de memoria del operador en bytes o kilobytes.

stats.operatorStats.executionTimeSecs

entero

El tiempo total de ejecución del operador en segundos.

stats.minOpenWindowStartTime

fecha

La hora de inicio de la ventana mínima abierta. Este valor es opcional.

stats.maxOpenWindowStartTime

fecha

La hora de inicio de la ventana máxima de apertura. Este valor es opcional.

stats.operatorStats.targetStats

arreglo

Las estadísticas por objetivo para ciertos operadores fuente y sumideros.

Cada elemento de este arreglo es un documento que representa un único target de entrada o salida, como un input o output colección o un Apache Kafka tema. Según el operador, cada documento contiene un subconjunto de los siguientes campos:

Para operadores fuente, como Apache Kafka $source o fuentes de flujo de cambios:

Ya sea db y coll para destinos MongoDB, o topic para Apache Kafka destinos.

  • inputMessageCount

  • inputMessageSize

Para operadores de sumidero, como MongoDB $merge o $emit sumideros:

Ya sea db y coll para destinos MongoDB, o topic para Apache Kafka destinos.

  • outputMessageCount

  • outputMessageSize

Atlas Stream Processing recopila estadísticas por objetivo sólo para algunas etapas de agregación de flujos, como Apache Kafka $source y MongoDB $merge.

Registra estadísticas por objetivo para un máximo de 100 objetivos diferentes; después de eso, el procesador de flujo deja de añadir nuevas entradas a targetStats pero sigue actualizando las estadísticas agregadas por operador.

stats.kafkaPartitions

arreglo

Información sobre el offset de las particiones de un Apache Kafka broker (nodo de servidor). kafkaPartitions se aplica solo a las conexiones que utilizan una fuente de Apache Kafka.

stats.kafkaPartitions.partition

entero

El número de partición de temas en Apache Kafka.

stats.kafkaPartitions.currentOffset

entero

El desplazamiento en el que se encuentra el procesador de flujo para la partición especificada. Este valor es igual al desplazamiento anterior que el procesador de flujo procesó, más 1.

stats.kafkaPartitions.checkpointOffset

entero

El desplazamiento que el procesador de flujos comprometió por última vez con el Apache Kafka broker y el punto de control para la partición especificada. Todos los mensajes a través de este desplazamiento se registran en el último punto de control.

stats.kafkaPartitions.isIdle

booleano

La bandera que indica si la partición está inactiva. Este valor por defecto es false.

Por ejemplo, lo siguiente muestra el estado de un procesador de Stream Processing llamado proc01 en un espacio de trabajo de Stream Processing llamado inst01 y los tamaños de los elementos se muestran en KB:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

El ejemplo muestra las estadísticas generales del procesador de streams.

Para ver cómo se comportan los operadores individuales o cuánta tráfico gestiona cada objetivo, llama a sp.<streamprocessor>.stats() con la opción verbose y examina stats.operatorStats y, para algunos operadores, stats.operatorStats.targetStats.

Por ejemplo, para un operador fuente, stats.operatorStats.targetStats recopila los campos inputMessageCount y inputMessageSize para cada db/coll único o para cada tema único:

{
"name" : "KafkaConsumerOperator",
"inputMessageCount" : NumberLong(100),
"inputMessageSize" : 100352,
"targetStats" : [
{
"topic" : "outputTopic1",
"inputMessageCount" : NumberLong(100),
"inputMessageSize" : 100352
}
],
...
}

Y para un operador Sink , stats.operatorStats.targetStats recopila los campos outputMessageCount y outputMessageSize para cada db/coll único o cada tema único:

{
"name" : "MergeOperator",
"inputMessageCount" : NumberLong(10),
"inputMessageSize" : 1744,
"targetStats" : [
{
"db" : "cust1",
"coll" : "outColl1",
"outputMessageCount" : NumberLong(3),
"outputMessageSize" : 1748
},
{
"db" : "cust2",
"coll" : "outColl2",
"outputMessageCount" : NumberLong(4),
"outputMessageSize" : 2241
}
],
...
}