Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Desarrollar procesadores de flujo

Un procesador de flujo de Atlas Stream Processing aplica la lógica de un nombre único pipeline de agregación de stream a los datos de transmisión. Atlas Stream Processing guarda cada definición de procesador de flujo en almacenamiento persistente para que pueda ser reutilizada. Solo se puede utilizar un procesador de flujo dado en el espacio de trabajo de procesamiento de flujos en el que se almacena su definición.

Para crear y gestionar un procesador de flujo, debes tener:

Muchos de los comandos del procesador de flujos requieren que se especifique el nombre del procesador de flujos relevante en la invocación del método. La sintaxis descrita en las siguientes secciones supone nombres estrictamente alfanuméricos. Si el nombre de su procesador de flujo incluye caracteres no alfanuméricos, como guiones (-) o puntos (puntos) (.), debe encerrar el nombre entre corchetes ([]) y comillas double ("") en la invocación del método, como en sp.["special-name-stream"].stats().

Puede crear un procesador de flujo de forma interactiva con el sp.process() método en. Los procesadores de flujo creados de forma interactiva presentan el siguiente mongosh comportamiento:

  • Escribe documentos de salida y de cola de mensajes no entregados en el shell

  • Empieza a ejecutarse inmediatamente al crearse

  • Ejecutar durante 10 minutos o hasta que el usuario lo detenga

  • No persista después de detenerse

Los procesadores de flujo que se crean interactivamente están diseñados para la creación de prototipos. Para crear un procesador de flujo persistente, consulte Crear un procesador de flujo.

sp.process() tiene la siguiente sintaxis:

sp.process(<pipeline>)
Campo
Tipo
Necesidad
Descripción

pipeline

arreglo

Requerido

Pipeline de agregación de streams que deseas aplicar a tus datos de transmisión.

Para crear un procesador de flujo de forma interactiva:

1

Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando mongosh.

Ejemplo

El siguiente comando se conecta a un espacio de trabajo de procesamiento de flujos como un usuario llamado streamOwner usando la autenticación x.059:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Proporciona tu contraseña de usuario cuando se solicite.

2

En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.

El siguiente ejemplo utiliza el tema stuff en la conexión myKafka en el registro de conexiones como el $source, coincide con los registros donde el campo temperature tiene un valor de 46 y emite los mensajes procesados al tema output de la conexión mySink en el registro de conexiones:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

El siguiente comando crea un procesador de flujo que aplica la lógica definida en pipeline.

sp.process(pipeline)

Para crear un procesador de flujo que persista hasta que se elimine:

La API de administración de Atlas proporciona un punto final para crear un procesador de flujo.

Crea un Procesador de Flujo

Para crear un procesador de flujos en la interfaz de usuario de Atlas, ve a Stream Processing página para su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo.

Puede elegir entre utilizar el Visual Builder o el JSON editor para configurar tu procesador de streams:

1

Si existen procesadores de stream processing en tu espacio de trabajo de stream processing, haz clic en el botón + Create stream processor y luego selecciona Visual Builder de las opciones desplegables.

El Constructor Visual se abre con un formulario donde puedes configurar tu procesador de flujo.

2
3

En el campo Source, selecciona una conexión del menú desplegable Connection para usar como fuente de tu procesador de flujos.

Esto abre un JSON donde puedes configurar la source etapa de tu procesador de transmisión. Para obtener más información sobre la sintaxis de la etapa source, consulta $source.

Ejemplo

La siguiente etapa source opera en datos en tiempo real de la conexión sample_stream_solar preconfigurada:

{
"$source": {
"connectionName": "sample_stream_solar"
}
}
4

En el panel Start building your pipeline, haga clic en el botón para la etapa de agregación que desea agregar a su pipeline. Esto abre un cuadro de texto donde se puede configurar la etapa de agregación seleccionada en formato JSON.

Si tu etapa de agregación no está en la lista, haz clic en + Custom stage para definir una etapa de agregación compatible en formato JSON. Para obtener más información sobre las etapas de agregación en el Stream Processing y su sintaxis, consulta Etapas del Pipeline de Agregación.

Ejemplo

La siguiente etapa $match coincide con todos los documentos en el flujo preconfigurado sample_stream_solar donde el campo obs.watts es mayor que 300:

{
"$match": {
"obs.watts": { "$gt": 300 }
}
}
5

Para agregar etapas de agregación adicionales a tu pipeline, haz clic en el botón + Add stage below debajo de la última etapa de tu pipeline y selecciona la etapa de agregación que deseas agregar o haz clic en Custom stage para definir una etapa de agregación compatible diferente. Esto abre un cuadro de texto donde puedes configurar la nueva etapa en formato JSON.

6

En el campo Sink, selecciona una conexión de destino de la lista desplegable Connection.

En el campo Sink, selecciona una conexión de la lista desplegable Connection a la que se guarden tus datos procesados.

Esto abre un cuadro de texto JSON donde puede configurar la merge etapa para su procesador de flujo. Para obtener más información sobre la merge sintaxis de la $merge etapa, consulte.

Ejemplo

La siguiente etapa sink de guardar datos procesados en la colección demoDb.demoColl en una conexión denominada demoConnection conexión:

{
"$merge": {
"into": {
"connectionName": "demoConnection",
"db": "demoDb",
"coll": "demoColl"
}
}
}
7

El procesador de flujo se crea y se muestra en la pestaña Stream Processors de la página Stream Processing.

1

Si existen procesadores de stream processing en tu espacio de trabajo de stream processing, haz clic en el botón + Create stream processor y luego selecciona Visual Builder de las opciones desplegables.

El editor JSON se abre con un cuadro de texto donde puede configurar su procesador de flujo en formato JSON.

2

Especifica la definición de en el cuadro de texto del editor JSON. JSON Esta definición debe incluir un nombre para tu procesador de flujos y un pipeline de agregación que comience con una etapa $source y finalice con la etapa $merge. Puedes incluir cualquier número de etapas adicionales de agregación entre las etapas $source y $merge.

Para obtener más información sobre las etapas de agregación del procesamiento de flujo y su sintaxis, consulte Etapas de la canalización de agregación.

Ejemplo

La siguiente definición JSON crea un procesador de flujo denominado solarDemo que utiliza una etapa $tumblingWindow con una etapa anidada $group para agregar datos en tiempo real de la conexión preconfigurada sample_stream_solar durante intervalos de 10segundos, y guarda los datos procesados en una colección en una conexión denominada mongodb1.

{
"name": "solarDemo",
"pipeline": [
{
"$source": {
"connectionName": "sample_stream_solar"
}
},
{
"$tumblingWindow": {
"interval": {
"size": 10,
"unit": "second"
},
"pipeline": [
{
"$group": {
"_id": "$group_id",
"max_watts": { "$max": "$obs.watts" },
"min_watts": { "$min": "$obs.watts" }
}
}
]
}
},
{
"$merge": {
"into": {
"connectionName": "mongodb1",
"db": "solarDb",
"coll": "solarColl"
}
}
}
]
}

Para crear un nuevo procesador de streaming con mongosh, se puede utilizar el método sp.createStreamProcessor(). Tiene la siguiente sintaxis:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
Tipo
Necesidad
Descripción

name

string

Requerido

Nombre lógico del procesador de flujo. Debe ser único dentro del espacio de trabajo de procesamiento de flujo. Debe contener únicamente caracteres alfanuméricos.

pipeline

arreglo

Requerido

Pipeline de agregación de streams que deseas aplicar a tus datos de transmisión.

options

Objeto

Opcional

Objeto que define varias configuraciones opcionales para tu procesador de flujo.

options.dlq

Objeto

Condicional

Objeto que asigna una cola de mensajes no entregados a su espacio de trabajo de procesamiento de flujos. Este campo es necesario si define el options campo.

options.dlq.connectionName

string

Condicional

Etiqueta legible que identifica una conexión en su registro de conexiones. Esta conexión debe hacer referencia a un clúster Atlas. Este campo es necesario si define el campo options.dlq.

options.dlq.db

string

Condicional

Nombre de una base de datos Atlas en el clúster especificado en options.dlq.connectionName. Este campo es necesario si se define el campo options.dlq.

options.dlq.coll

string

Condicional

Nombre de una colección en la base de datos especificada en options.dlq.db. Este campo es necesario si defines el campo options.dlq.

options.tier

string

Opcional

El nivel del pod al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador a un pod del nivel por defecto del área de trabajo de stream processing. Para obtener más información, consulta Niveles.

1

Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando mongosh.

  1. En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Connect.

  2. En el cuadro de diálogo Connect to your workspace, seleccione la pestaña Shell.

  3. Copie la cadena de conexión que aparece en el cuadro de diálogo. Tiene el siguiente formato, donde <atlas-stream-processing-url> es la URL de tu espacio de trabajo de Stream Processing y <username> es el nombre de usuario de un usuario de base de datos con el rol atlasAdmin:

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>
    --password <password>
  4. Pega la cadena de conexión en tu terminal y reemplaza el marcador de posición <password> con las credenciales del usuario. Presiona Enter para ejecutarlo y conectarte a tu espacio de trabajo de stream processing.

Ejemplo

El siguiente comando conecta a un espacio de trabajo de procesamiento de flujos como un usuario llamado streamOwner utilizando x.059 autenticación.

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Proporciona tu contraseña de usuario cuando se solicite.

2

En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.

El siguiente ejemplo de canalización utiliza el stuff tema en la myKafka conexión en el registro de conexión como, coincide con los registros donde $source el temperature campo tiene un valor de 46 y emite los mensajes procesados ​​al output tema de la mySink conexión en el registro de conexión:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

En el mensaje de mongosh, asigna un objeto que contenga las siguientes propiedades de tu DLQ:

  • Nombre de la conexión

  • Nombre de la base de datos

  • Nombre de colección

El siguiente ejemplo define una DLQ sobre la conexión cluster01, en la colección de base de datos metadata.dlq.

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

El siguiente comando crea un procesador de flujos llamado proc01 que aplica la lógica definida en pipeline. Los documentos que generan errores durante el procesamiento se escriben en la DLQ definida en deadLetter.

sp.createStreamProcessor("proc01", pipeline, deadLetter)

Nota

Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Al iniciar un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.

Para iniciar un procesador de flujo:

La API de administración Atlas proporciona los puntos finales Iniciar un procesador de flujo y Iniciar un procesador de flujo con opciones para iniciar un procesador de flujo.

Para iniciar un procesador de streaming en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su Proyecto en Atlas y haga clic en Manage en el panel de su área de trabajo de Stream Processing para ver la lista de procesadores de streaming definidos para ella.

Luego, haz clic en el icono Start de tu procesador de flujo.

Para iniciar un procesador de flujos con mongosh, utiliza el método sp.processor.start(). Tiene la siguiente sintaxis:

sp.processor.start(<options>)

Donde <options> es un documento opcional genérico cuyos campos se pasan al comando de inicio subyacente. Estos campos pueden ser:

Campo
Tipo
Necesidad
Descripción

startAfter

token

Condicional

startAtOperationTime

fecha y hora

Condicional

tier

string

Opcional

El nivel al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador al nivel del espacio de trabajo de Stream Processing. Debe ser uno de los siguientes:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

Para obtener más información, consulta Niveles.

Por ejemplo, para iniciar un procesador de flujo llamado proc01, ejecuta el siguiente comando:

sp.proc01.start()
{ "ok" : 1 }

Este método devuelve { "ok": 1 } si el procesador de flujo existe y no se está ejecutando actualmente. Si invoca sp.processor.start() para un procesador de flujo que no es STOPPED, mongosh devolverá un error.

Nota

Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Al iniciar un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.

Para detener un procesador de flujo:

La API de administración de Atlas proporciona un punto final para detener un procesador de flujo.

Detén un procesador de flujo

Para detener un procesador de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto de Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo para ver la lista de procesadores de flujo definidos para él.

Luego, haz clic en el icono Stop de tu procesador de flujo.

Para detener un procesador de flujo existente con mongosh, utiliza el método sp.processor.stop(). Tiene la siguiente sintaxis:

sp.processor.stop(<options>)

Donde <options> es un documento genérico y opcional cuyos campos se pasan al comando subyacente de detención.

Por ejemplo, para detener un procesador de streams denominado proc01, ejecute el siguiente comando:

sp.proc01.stop()
{ "ok" : 1 }

Este método devuelve { "ok": 1 } si el procesador de flujo existe y se está ejecutando actualmente. Si invoca sp.processor.stop() para un procesador de flujo que no es running, mongosh devolverá un error.

Puede modificar los siguientes elementos de un procesador de flujo existente:

Para modificar un procesador de flujo, sigue estos pasos:

  1. Detener el procesador de flujo.

  2. Modificar el procesador de flujo.

  3. Reinicia el procesador de flujos.

Por defecto, los procesadores modificados se restauran desde el último punto de control. Alternativamente, se puede configurar resumeFromCheckpoint=false, en cuyo caso el procesador solo mantiene estadísticas resumidas. Cuando se modifica un procesador con ventanas abiertas, las ventanas se recomputan completamente en el pipeline actualizado.

Nota

Si cambia el nombre de un procesador de flujo para el que había configurado Operator iscontainsla alerta "El estado del procesador de flujo ha fallado" mediante un (que contiene expresiones de comparación como, y otras), Atlas no activará alertas para el procesador de flujo renombrado si la expresión de comparación no coincide con el nuevo nombre. Para supervisar el procesador de flujo renombrado, reconfigure la alerta.

Cuando la configuración predeterminada resumeFromCheckpoint=true está habilitada, se aplican las siguientes limitaciones:

  • No puedes modificar la etapa $source.

  • No se puede modificar el intervalo de la ventana.

  • No puedes remover una ventana.

  • Solo puedes modificar un pipeline con una ventana si esa ventana tiene una etapa $group o $sort en su pipeline interno.

  • No puedes cambiar un tipo de ventana existente. Por ejemplo, no puedes cambiar de $tumblingWindow a $hoppingWindow o viceversa.

  • Los procesadores con ventanas pueden reprocesar algunos datos como resultado de recalcular las ventanas.

  • Las estadísticas por operador no se conservan después de una operación de modificación.

La API de administración de Atlas proporciona un endpoint para modificar un procesador de flujo.

Modifica un Procesador de Flujo

Requiere mongosh v2.3.4+.

Use el comando sp.<streamprocessor>.modify() para modificar un procesador de flujo existente. <streamprocessor> debe ser el nombre de un procesador de flujo detenido definido para el espacio de trabajo de Stream Processing actual.

Por ejemplo, para modificar un procesador de flujo llamado proc01, ejecute el siguiente comando:

sp.proc1.modify(<pipeline>, {
resumeFromCheckpoint: bool, // optional
name: string, // optional
dlq: string, // optional
})
sp.createStreamProcessor("foo", [
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
])
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
]);
sp.foo.start();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test",
config: {
startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000)
}
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.stop();
sp.foo.modify({dlq: {}})
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$replaceRoot: {newRoot: "$fullDocument"}},
{$match: {cost: {$gt: 500}}},
{$tumblingWindow: {
interval: {unit: "day", size: 1},
pipeline: [
{$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
]
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.start();

Para descartar un procesador de flujo:

La API de administración de Atlas proporciona un endpoint para borrar un procesador de flujo.

Eliminar un procesador de flujo

Para borrar un procesador de Stream Processing en la Interfaz de Usuario de Atlas, Go a la página Stream Processing de tu Proyecto Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing para ver la lista de procesadores de Stream Processing definidos para el mismo.

Luego, haz clic en el ícono Delete () para tu procesador de flujo. En el cuadro de diálogo de confirmación que aparece, escriba el nombre del procesador de transmisión (solarDemo) para confirmar que desea borrarlo y luego haga clic en Delete.

Para eliminar un procesador de flujo existente con,mongosh utilice el método. Tiene la siguiente sp.processor.drop() sintaxis:

sp.processor.drop(<options>)

Donde <options> es un documento genérico y opcional cuyos campos se transfieren al comando descartar subyacente.

Por ejemplo, para descartar un procesador de flujo llamado proc01, ejecuta el siguiente comando:

sp.proc01.drop()

Este método devuelve:

  • true si el procesador de flujos existe.

  • false si el procesador de flujos no existe.

Cuando se elimina un procesador de flujo, se destruyen todos los recursos que Atlas Stream Processing aprovisionó para él, junto con todo el estado guardado.

Para enumerar todos los procesadores de flujo disponibles:

La API de administración de Atlas proporciona un punto final para enumerar todos los procesadores de flujo disponibles.

Listado de procesadores de flujo

Para ver la lista de procesadores de flujos definidos para tu espacio de trabajo de Stream Processing en la Interfaz de Usuario de Atlas, Go a la página Stream Processing de tu Proyecto de Atlas y haz clic en Manage en el panel de tu espacio de trabajo de Stream Processing.

Se muestra la lista de procesadores de flujos y sus estados.

Para listar todos los procesadores de flujo disponibles en el espacio de trabajo de procesamiento de flujo actual con,mongosh sp.listStreamProcessors() utilice el método. Esto devuelve una lista de documentos que contiene el nombre, la hora de inicio, el estado actual y la canalización asociada a cada procesador de flujo. Tiene la siguiente sintaxis:

sp.listStreamProcessors(<filter>)

<filter> es un documento que especifica por qué campo(s) filtrar la lista.

Ejemplo

El siguiente ejemplo muestra un valor de retorno para una solicitud no filtrada:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

Si ejecutas el comando nuevamente en el mismo espacio de trabajo de procesamiento de flujo, filtrando por un "state" de "running", verás la siguiente salida:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

Para ver información sobre el tamaño de los niveles del espacio de trabajo de procesamiento de flujo:

El endpoint List Stream Workspace Details enumera los detalles de un espacio de trabajo de Stream Processing, incluyendo el defaultTierSize y maxTierSize.

Para ver los defaultTierSize y maxTierSize de un espacio de trabajo de Stream Processing en la Interfaz de Usuario de Atlas, ve a la página Stream Processing de tu Proyecto Atlas.

El defaultTierSize y maxTierSize están listados en la parte inferior de la tarjeta del área de Stream Processing.

Para ver el defaultTierSize y el maxTierSize de un espacio de trabajo de Stream Processing mediante mongosh, utiliza el método sp.listWorkspaceDefaults().

sp.listWorkspaceDefaults()

Este método devuelve:

  • defaultTierSize

  • maxTierSize

Para devolver un arreglo de resultados muestreados desde un procesador de flujo existente a STDOUT con mongosh, use el método sp.processor.sample(). Por ejemplo, el siguiente comando toma muestras de un procesador de flujo llamado proc01.

sp.proc01.sample()

Este comando se ejecuta de forma continua hasta que se cancele usando CTRL-C, o hasta que las muestras devueltas acumulen un tamaño de 40 MB. El procesador de flujos reporta documentos inválidos en la muestra en un documento _dlqMessage del siguiente formato:

{
_dlqMessage: {
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
workspaceName: '<workspaceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

Puede utilizar estos mensajes para diagnosticar problemas de higiene de datos sin definir una recopilación de cola de mensajes inactivos.

Nota

Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Al iniciar un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.

Para visualizar las estadísticas de un procesador de secuencias:

La API de administración de Atlas proporciona un punto final para ver las estadísticas de un procesador de flujo.

Obtenga un procesador de flujo

Para ver la supervisión de tu procesador de flujos, ve a la página Stream Processing de tu Proyecto Atlas y abre la pestaña Monitoring. Luego, selecciona tu procesador de flujos en la lista desplegable Stream processor en la parte superior izquierda de la página.

Para devolver un documento que resuma el estado actual de un procesador de streams existente con mongosh, utiliza el método sp.processor.stats(). Tiene la siguiente sintaxis:

sp.<streamprocessor>.stats({options: {<options>}})

Donde options es un documento opcional con los siguientes campos:

Campo
Tipo
Descripción

scale

entero

Unidad a utilizar para el tamaño de los elementos en la salida. Por defecto, Atlas Stream Processing muestra el tamaño de los elementos en bytes. Para mostrar en KB, especifica un scale de 1024.

verbose

booleano

Indicador que especifica el nivel de verbosidad del documento de salida. Si se establece en true, el documento de salida contiene un subdocumento que informa las estadísticas de cada operador individual en tu pipeline. Por defecto, es false.

El documento de salida tiene los siguientes campos:

Campo
Tipo
Descripción

ns

string

El namespace en el que se define el procesador de flujos.

stats

Objeto

Un documento que describe el estado operativo del procesador de flujo.

stats.name

string

El nombre del procesador de flujo.

stats.status

string

El estado del procesador de flujo. Este campo puede tener los siguientes valores:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor

entero

La escala en la que se muestra el campo de tamaño. Si se establece en 1, los tamaños se muestran en bytes. Si se establece en 1024, los tamaños se muestran en kilobytes.

stats.inputMessageCount

entero

La cantidad de documentos publicados en el stream. Un documento se considera 'publicado' en el flujo sólo una vez que pasa por la etapa $source, no cuando pasa por toda la pipeline.

stats.inputMessageSize

entero

La cantidad de bytes o kilobytes publicados en el flujo. Los bytes se consideran 'publicados' en el flujo una vez que pasan por la etapa $source, no cuando pasan por toda la pipeline.

stats.outputMessageCount

entero

El número de documentos procesados por el flujo. Se considera que un documento ha sido 'procesado' por el flujo una vez que pasa por toda la pipeline.

stats.outputMessageSize

entero

El número de bytes o kilobytes procesados ​​por el flujo. Los bytes se consideran "procesados" por el flujo una vez que pasan por toda la canalización.

stats.dlqMessageCount

entero

El número de documentos enviados a la fila de letra muerta.

stats.dlqMessageSize

entero

El número de bytes o kilobytes enviados a la fila de letra muerta.

stats.changeStreamTimeDifferenceSecs

entero

La diferencia, en segundos, entre la hora del evento representada por el flujo de cambios más reciente token de reanudación y el evento más reciente en el oplog.

stats.changeStreamState

token

La flujo de cambios más reciente token de reanudación. Solo aplica a los procesadores de flujo con una fuente de flujo de cambios.

stats.latency

Documento

Estadísticas de latencia para el procesador de flujo en su conjunto. Atlas Stream Processing devuelve este campo solo si se pasa la opción verbose.

stats.latency.p50

entero

La latencia estimada del percentil 50de todos los documentos procesados en los últimos 30 segundos. Si tu pipeline incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana.

Por ejemplo, si la etapa de $tumblingWindow tiene un intervalo de 5 minutos, las mediciones de latencia incluirán esos 5 minutos.

stats.latency.p99

entero

La latencia estimada del percentil 99de todos los documentos procesados en los últimos 30 segundos. Si tu pipeline incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana.

Por ejemplo, si la etapa de $tumblingWindow tiene un intervalo de 5 minutos, las mediciones de latencia incluirán esos 5 minutos.

stats.latency.start

datetime

Tiempo de pared en el que comenzó la ventana de medición de 30 segundos más reciente.

stats.latency.end

datetime

Momento en que finalizó la ventana de medición más reciente de 30 segundos.

stats.latency.unit

string

Unidad de tiempo en la que se cuenta la latencia. Este valor es siempre microseconds.

stats.latency.count

entero

Número de documentos que el procesador de flujos ha procesado en la ventana de medición más reciente de 30 segundos.

stats.latency.sum

entero

Suma de todas las mediciones de latencia individuales, en microsegundos, tomadas en la ventana de medición de 30 segundos más reciente.

stats.stateSize

entero

La cantidad de bytes que utiliza Windows para almacenar el estado del procesador.

stats.watermark

entero

La marca de tiempo de la marca de agua actual.

stats.operatorStats

arreglo

Las estadísticas para cada operador en el pipeline del procesador. Atlas Stream Processing devuelve este campo sólo si se pasa la opción verbose.

stats.operatorStats proporciona versiones por operador de muchos campos principales de stats:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.latency

  • stats.operatorStats.stateSize

stats.operatorStats incluye los siguientes campos únicos:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTimeMillis

stats.operatorStats también incluye los siguientes campos dado que se haya pasado la opción verbose y su procesador incluya una etapa de ventana:

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats también puede incluir el siguiente campo para ciertos operadores de fuente y sumidero:

  • stats.operatorStats.targetStats

stats.operatorStats.maxMemoryUsage

entero

El uso máximo de memoria del operador en bytes o kilobytes.

stats.operatorStats.executionTimeSecs

entero

El tiempo total de ejecución del operador en segundos.

stats.minOpenWindowStartTime

fecha

La hora de inicio de la ventana mínima abierta. Este valor es opcional.

stats.maxOpenWindowStartTime

fecha

La hora de inicio de la ventana máxima de apertura. Este valor es opcional.

stats.operatorStats.targetStats

arreglo

Las estadísticas por objetivo para ciertos operadores de origen y destino.

Cada elemento de este arreglo es un documento que representa un único objetivo de entrada o salida, como una colección de entrada o salida o un Apache Kafka Tema. Dependiendo del operador, cada documento contiene un subconjunto de los siguientes campos:

Para operadores fuente, como Apache Kafka $source o fuentes de flujo de cambios:

Ya sea db y coll para destinos MongoDB, o topic para Apache Kafka destinos.

  • inputMessageCount

  • inputMessageSize

Para operadores de sumidero, como MongoDB $merge o $emit sumideros:

Ya sea db y coll para destinos MongoDB, o topic para Apache Kafka destinos.

  • outputMessageCount

  • outputMessageSize

Atlas Stream Processing recopila estadísticas por objetivo solo para algunas etapas de agregación de flujos, como Apache Kafka $source y MongoDB.$merge

Registra estadísticas por objetivo para un máximo de 100 objetivos diferentes; después de eso, el procesador de flujo deja de añadir nuevas entradas a targetStats pero sigue actualizando las estadísticas agregadas por operador.

stats.kafkaPartitions

arreglo

Información sobre el offset de las particiones de un Apache Kafka broker (nodo de servidor). kafkaPartitions se aplica solo a las conexiones que utilizan una fuente de Apache Kafka.

stats.kafkaPartitions.partition

entero

El número de partición de temas en Apache Kafka.

stats.kafkaPartitions.currentOffset

entero

El desplazamiento en el que se encuentra el procesador de flujo para la partición especificada. Este valor es igual al desplazamiento anterior que procesó el procesador de flujo más 1.

stats.kafkaPartitions.checkpointOffset

entero

El desplazamiento que el procesador de flujos comprometió por última vez con el Apache Kafka broker y el punto de control para la partición especificada. Todos los mensajes a través de este desplazamiento se registran en el último punto de control.

stats.kafkaPartitions.isIdle

booleano

La bandera que indica si la partición está inactiva. Este valor por defecto es false.

Por ejemplo, a continuación se muestra el estado de un procesador de flujo llamado proc01 en un espacio de trabajo de procesamiento de flujo llamado inst01 con tamaños de elementos mostrados en KB:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

El ejemplo muestra las estadísticas generales del procesador de streams.

Para ver cómo se comportan los operadores individuales o cuánta tráfico gestiona cada objetivo, llama a sp.<streamprocessor>.stats() con la opción verbose y examina stats.operatorStats y, para algunos operadores, stats.operatorStats.targetStats.

Por ejemplo, para un operador fuente, stats.operatorStats.targetStats recopila los campos inputMessageCount y inputMessageSize para cada db/coll único o para cada tema único:

{
"name" : "KafkaConsumerOperator",
"inputMessageCount" : NumberLong(100),
"inputMessageSize" : 100352,
"targetStats" : [
{
"topic" : "outputTopic1",
"inputMessageCount" : NumberLong(100),
"inputMessageSize" : 100352
}
],
...
}

Y para un operador de sumidero, stats.operatorStats.targetStats recopila los campos outputMessageCount y outputMessageSize para cada db/coll único o cada tema único:

{
"name" : "MergeOperator",
"inputMessageCount" : NumberLong(10),
"inputMessageSize" : 1744,
"targetStats" : [
{
"db" : "cust1",
"coll" : "outColl1",
"outputMessageCount" : NumberLong(3),
"outputMessageSize" : 1748
},
{
"db" : "cust2",
"coll" : "outColl2",
"outputMessageCount" : NumberLong(4),
"outputMessageSize" : 2241
}
],
...
}

Volver

Administrar conexiones VPC