Docs Menu
Docs Home
/ /

Desarrollar procesadores de flujo

Un procesador de flujo Atlas Stream Processing aplica la lógica de un procesador con nombre único. Canal de agregación de flujos a sus datos de streaming. Atlas Stream Processing guarda cada definición de procesador de flujo en almacenamiento persistente para que pueda reutilizarse. Solo puede usar un procesador de flujo determinado en el espacio de trabajo de procesamiento de flujos donde se almacena su definición.

Para crear y administrar un procesador de flujo, debe tener:

Muchos comandos de procesadores de flujo requieren que se especifique el nombre del procesador correspondiente al invocar el método. La sintaxis descrita en las siguientes secciones asume nombres estrictamente alfanuméricos. Si el nombre de su procesador de flujo incluye caracteres no alfanuméricos, como guiones (-) o puntos (.), debe escribirlo entre corchetes ([]) y comillas dobles ("") al invocar el método, como en sp.["special-name-stream"].stats().

Puede crear un procesador de flujo de forma interactiva con el sp.process() método en. Los procesadores de flujo creados de forma interactiva presentan el siguiente mongosh comportamiento:

  • Escribe documentos de salida y de cola de mensajes no entregados en el shell

  • Comienza a ejecutarse inmediatamente después de su creación.

  • Ejecutar durante 10 minutos o hasta que el usuario lo detenga

  • No persista después de parar

Los procesadores de flujo que se crean interactivamente están diseñados para la creación de prototipos. Para crear un procesador de flujo persistente, consulte Crear un procesador de flujo.

sp.process() tiene la siguiente sintaxis:

sp.process(<pipeline>)
Campo
Tipo
Necesidad
Descripción

pipeline

arreglo

Requerido

Canal de agregación de transmisiones que desea aplicar a sus datos de transmisión.

Para crear un procesador de flujo de forma interactiva:

1

Utilice la cadena de conexión asociada con su espacio de trabajo de procesamiento de flujo para conectarse mongosh usando.

Ejemplo

El siguiente comando se conecta a un espacio de trabajo de procesamiento de flujos como un usuario llamado streamOwner usando la autenticación x.059:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Proporcione su contraseña de usuario cuando se le solicite.

2

En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.

El siguiente ejemplo utiliza el tema stuff en la conexión myKafka en el registro de conexiones como el $source, coincide con los registros donde el campo temperature tiene un valor de 46 y emite los mensajes procesados al tema output de la conexión mySink en el registro de conexiones:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

El siguiente comando crea un procesador de flujo que aplica la lógica definida en pipeline.

sp.process(pipeline)

Para crear un procesador de flujo que persista hasta que lo elimines:

La API de administración de Atlas proporciona un punto final para crear un procesador de flujo.

Crear un procesador de flujo

Para crear un procesador de flujo en la interfaz de usuario de Atlas, vaya a Stream Processing página para su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo.

Puede elegir entre utilizar el Visual Builder o el EditorJSON para configurar su procesador de flujo:

1

Si hay procesadores de flujo existentes en su espacio de trabajo de procesamiento de flujo, haga clic en el botón + Create stream processor y luego seleccione Visual Builder de las opciones desplegables.

El Visual Builder se abre con un formulario donde puede configurar su procesador de flujo.

2
3

En el campo Source, seleccione una conexión de la lista desplegable Connection para usar como fuente para su procesador de flujo.

Esto abre un cuadro de texto JSON donde puede configurar la source etapa para su procesador de flujo. Para obtener más información sobre la source sintaxis de la $source etapa, consulte.

Ejemplo

La siguiente etapa source opera con datos en tiempo real de la conexión sample_stream_solar preconfigurada:

{
"$source": {
"connectionName": "sample_stream_solar"
}
}
4

En el Start building your pipeline panel, haga clic en el botón de la etapa de agregación que desee añadir a su pipeline. Se abrirá un cuadro de texto donde podrá configurar la etapa de agregación seleccionada en formato JSON.

Si su etapa de agregación no aparece en la lista, haga + Custom stage clic en para definir una etapa de agregación compatible en formato JSON. Para obtener más información sobre las etapas de agregación del procesamiento de flujos y su sintaxis, consulte Etapas de la canalización de agregación.

Ejemplo

La siguiente etapa coincide con todos los documentos en $match la sample_stream_solar secuencia preconfigurada donde el obs.watts campo es mayor 300 que:

{
"$match": {
"obs.watts": { "$gt": 300 }
}
}
5

Para añadir etapas de agregación adicionales a su pipeline, haga clic + Add stage below en el botón debajo de la última etapa y seleccione la etapa que desea añadir o haga clic en Custom stage para definir otra etapa de agregación compatible. Se abrirá un cuadro de texto donde podrá configurar la nueva etapa en formato JSON.

6

En el campo Sink, seleccione una conexión de destino de la lista desplegable Connection.

En el campo Sink, seleccione una conexión de la lista desplegable Connection para escribir los datos procesados.

Esto abre un cuadro de texto JSON donde puede configurar la merge etapa para su procesador de flujo. Para obtener más información sobre la merge sintaxis de la $merge etapa, consulte.

Ejemplo

La siguiente etapa sink escribe datos procesados ​​en la colección demoDb.demoColl en una conexión denominada demoConnection:

{
"$merge": {
"into": {
"connectionName": "demoConnection",
"db": "demoDb",
"coll": "demoColl"
}
}
}
7

El procesador de flujo se crea y aparece en la pestaña Stream Processors de la página Stream Processing.

1

Si hay procesadores de flujo existentes en su espacio de trabajo de procesamiento de flujo, haga clic en el botón + Create stream processor y luego seleccione Visual Builder de las opciones desplegables.

El editor JSON se abre con un cuadro de texto donde puedes configurar tu procesador de flujo en formato JSON.

2

Especifique la definición JSON de su procesador de flujo en el cuadro de texto del editor JSON. Esta definición debe incluir un nombre para su procesador de flujo y una canalización de agregación que comience con la etapa$sourcey termine con la etapa$merge. Puede incluir cualquier número de etapas de agregación adicionales entre las etapas $source y $merge.

Para obtener más información sobre las etapas de agregación del procesamiento de flujo y su sintaxis, consulte Etapas de la canalización de agregación.

Ejemplo

La siguiente definición JSON crea un procesador de flujo llamado que utiliza solarDemo una etapa con una $tumblingWindow etapa anidada para agregar datos en tiempo real desde la $group sample_stream_solar conexión preconfigurada en 10intervalos de segundos y escribe los datos procesados ​​en una colección en una conexión mongodb1 llamada.

{
"name": "solarDemo",
"pipeline": [
{
"$source": {
"connectionName": "sample_stream_solar"
}
},
{
"$tumblingWindow": {
"interval": {
"size": 10,
"unit": "second"
},
"pipeline": [
{
"$group": {
"_id": "$group_id",
"max_watts": { "$max": "$obs.watts" },
"min_watts": { "$min": "$obs.watts" }
}
}
]
}
},
{
"$merge": {
"into": {
"connectionName": "mongodb1",
"db": "solarDb",
"coll": "solarColl"
}
}
}
]
}

Para crear un nuevo procesador de flujo con,mongosh utilice el método. Tiene la siguiente sp.createStreamProcessor() sintaxis:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
Tipo
Necesidad
Descripción

name

string

Requerido

Nombre lógico del procesador de flujo. Debe ser único dentro del espacio de trabajo de procesamiento de flujo. Debe contener únicamente caracteres alfanuméricos.

pipeline

arreglo

Requerido

Canal de agregación de transmisiones que desea aplicar a sus datos de transmisión.

options

Objeto

Opcional

Objeto que define varias configuraciones opcionales para su procesador de flujo.

options.dlq

Objeto

Condicional

Objeto que asigna una cola de mensajes no entregados a su espacio de trabajo de procesamiento de flujos. Este campo es necesario si define el options campo.

options.dlq.connectionName

string

Condicional

Etiqueta legible que identifica una conexión en su registro de conexiones. Esta conexión debe hacer referencia a un clúster Atlas. Este campo es necesario si define el campo options.dlq.

options.dlq.db

string

Condicional

Nombre de una base de datos Atlas en el clúster especificado en options.dlq.connectionName. Este campo es necesario si se define el campo options.dlq.

options.dlq.coll

string

Condicional

Nombre de una colección en la base de datos especificada en options.dlq.db. Este campo es necesario si se define el campo options.dlq.

options.tier

string

Opcional

El nivel del pod al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador a un pod del nivel por defecto del área de trabajo de stream processing. Para obtener más información, consulta Niveles.

1

Utilice la cadena de conexión asociada con su espacio de trabajo de procesamiento de flujo para conectarse mongosh usando.

  1. En el panel de su espacio de trabajo de procesamiento de flujo, haga clic en Connect.

  2. En el cuadro de diálogo Connect to your workspace, seleccione la pestaña Shell.

  3. Copie la cadena de conexión que se muestra en el cuadro de diálogo. Tiene el siguiente formato:<atlas-stream-processing-url> es la URL de su espacio de trabajo de procesamiento de flujos y <username> es el nombre de usuario de la base de datos con el atlasAdmin rol.

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>
    --password <password>
  4. Pegue la cadena de conexión en su terminal y reemplace el marcador <password> con las credenciales del usuario. Presione Enter para ejecutarlo y conectarse a su espacio de trabajo de procesamiento de flujos.

Ejemplo

El siguiente comando se conecta a un espacio de trabajo de procesamiento de flujo como un usuario llamado streamOwner utilizando la autenticación x..059

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Proporcione su contraseña de usuario cuando se le solicite.

2

En el prompt mongosh, asigna un arreglo que contenga las etapas de agregación que deseas aplicar a una variable llamada pipeline.

El siguiente ejemplo de canalización utiliza el stuff tema en la myKafka conexión en el registro de conexión como, coincide con los registros donde $source el temperature campo tiene un valor de 46 y emite los mensajes procesados ​​al output tema de la mySink conexión en el registro de conexión:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

En el mensaje, asigne un objeto que contenga las siguientes propiedades de su mongosh DLQ:

  • Nombre de la conexión

  • Nombre de la base de datos

  • Nombre de colección

El siguiente ejemplo define un DLQ a través de la conexión cluster01, en la colección de bases de datos metadata.dlq.

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

El siguiente comando crea un procesador de flujo llamado proc01 que aplica la lógica definida en pipeline. Los documentos que generan errores durante el procesamiento se escriben en la cola de mensajes de error definida en deadLetter.

sp.createStreamProcessor("proc01", pipeline, deadLetter)

Nota

Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Al iniciar un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.

Para iniciar un procesador de flujo:

La API de administración de Atlas proporciona los puntos finales Iniciar un procesador de flujo y Iniciar un procesador de flujo con opciones para iniciar un procesador de flujo.

  • Para iniciar un procesador de flujo sin opciones, utilice Iniciar un procesador de flujo

  • Para iniciar un procesador de flujo y modificar las propiedades de la $source etapa, utilice Iniciar un procesador de flujo con opciones. Este punto final admite la modificación de las startAfter startAtOperationTime propiedades o.

Para iniciar un procesador de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo para ver la lista de procesadores de flujo definidos para él.

Luego, haga clic en el ícono Start para su procesador de flujo.

Para iniciar un procesador de flujo con,mongosh sp.processor.start() utilice el método. Tiene la siguiente sintaxis:

sp.processor.start(<options>)

Donde <options> puede ser uno de los siguientes:

Opción
Descripción

startAtOperationTime

tier

El nivel del pod al que Atlas Stream Processing asigna el procesador. Si no declaras esta opción, Atlas Stream Processing asigna el procesador a un pod del nivel por defecto del área de trabajo de stream processing. Para obtener más información, consulta Niveles.

Por ejemplo, para iniciar un procesador de flujo llamado proc01, ejecute el siguiente comando:

sp.proc01.start()
{ "ok" : 1 }

Este método devuelve { "ok": 1 } si el procesador de flujo existe y no se está ejecutando. Si se invoca sp.processor.start() para un procesador de flujo que no STOPPED es, devolverá unmongosh error.

Nota

Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Al iniciar un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.

Para detener un procesador de flujo:

La API de administración de Atlas proporciona un punto final para detener un procesador de flujo.

Detener un procesador de flujo

Para pausar un procesador de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo para ver la lista de procesadores de flujo definidos para él.

Luego, haga clic en el ícono Pause para su procesador de flujo.

Para detener un procesador de flujo existente con,mongosh sp.processor.stop() utilice el método.

Por ejemplo, para detener un procesador de flujo llamado proc01, ejecute el siguiente comando:

sp.proc01.stop()
{ "ok" : 1 }

Este método devuelve { "ok": 1 } si el procesador de flujo existe y se está ejecutando. Si se invoca sp.processor.stop() para un procesador de flujo distinto running de, devolverá unmongosh error.

Puede modificar los siguientes elementos de un procesador de flujo existente:

Para modificar un procesador de flujo, haga lo siguiente:

  1. Detener el procesador de flujo.

  2. Modificar el procesador de flujo.

  3. Reinicie el procesador de flujo.

De forma predeterminada, los procesadores modificados se restauran desde el último punto de control. Como alternativa, puede configurar resumeFromCheckpoint=false, en cuyo caso el procesador solo conserva las estadísticas resumidas. Al modificar un procesador con ventanas abiertas, estas se recalculan completamente en la canalización actualizada.

Nota

Si cambia el nombre de un procesador de flujo para el que había configurado Operator iscontainsla alerta "El estado del procesador de flujo ha fallado" mediante un (que contiene expresiones de comparación como, y otras), Atlas no activará alertas para el procesador de flujo renombrado si la expresión de comparación no coincide con el nuevo nombre. Para supervisar el procesador de flujo renombrado, reconfigure la alerta.

Cuando la configuración predeterminada resumeFromCheckpoint=true está habilitada, se aplican las siguientes limitaciones:

  • No puedes modificar la etapa $source.

  • No puedes modificar el intervalo de tu ventana.

  • No puedes eliminar una ventana.

  • Solo se puede modificar una tubería con una ventana si esa ventana tiene una etapa $group o $sort en su tubería interna.

  • No puedes cambiar un tipo de ventana existente. Por ejemplo, no puedes cambiar de $tumblingWindow a $hoppingWindow o viceversa.

  • Los procesadores con ventanas pueden reprocesar algunos datos como producto del recálculo de las ventanas.

La API de administración de Atlas proporciona un endpoint para modificar un procesador de flujo.

Modificar un procesador de flujo

Requiere mongosh v2.3.4+.

Utilice el comando sp.<streamprocessor>.modify() para modificar un procesador de flujo existente. <streamprocessor> debe ser el nombre de un procesador de flujo detenido definido para el espacio de trabajo de procesamiento de flujo actual.

Por ejemplo, para modificar un procesador de flujo llamado proc01, ejecute el siguiente comando:

sp.proc1.modify(<pipeline>, {
resumeFromCheckpoint: bool, // optional
name: string, // optional
dlq: string, // optional
}})
sp.createStreamProcessor("foo", [
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
])
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
]);
sp.foo.start();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test",
config: {
startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000)
}
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.stop();
sp.foo.modify({dlq: {}})
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$replaceRoot: {newRoot: "$fullDocument"}},
{$match: {cost: {$gt: 500}}},
{$tumblingWindow: {
interval: {unit: "day", size: 1},
pipeline: [
{$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
]
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.start();

Para eliminar un procesador de flujo:

La API de administración de Atlas proporciona un punto final para eliminar un procesador de flujo.

Eliminar un procesador de flujo

Para eliminar un procesador de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo para ver la lista de procesadores de flujo definidos para él.

Luego, haga clic Delete en el icono ( ) de su procesador de flujo. En el cuadro de diálogo de confirmación que aparece, escriba el nombre del procesador de flujosolarDemo () para confirmar que desea eliminarlo y haga clic Delete en.

Para eliminar un procesador de flujo existente con,mongosh utilice el sp.processor.drop() método.

Por ejemplo, para eliminar un procesador de flujo llamado proc01, ejecute el siguiente comando:

sp.proc01.drop()

Este método devuelve:

  • true Si existe el procesador de flujo.

  • false si el procesador de flujos no existe.

Cuando se elimina un procesador de flujo, se destruyen todos los recursos que Atlas Stream Processing aprovisionó para él, junto con todo el estado guardado.

Para enumerar todos los procesadores de flujo disponibles:

La API de administración de Atlas proporciona un punto final para enumerar todos los procesadores de flujo disponibles.

Lista de procesadores de flujo

Para ver la lista de procesadores de flujo definidos para su espacio de trabajo de procesamiento de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo.

Se muestra la lista de procesadores de flujo y sus estados.

Para listar todos los procesadores de flujo disponibles en el espacio de trabajo de procesamiento de flujo actual con,mongosh sp.listStreamProcessors() utilice el método. Esto devuelve una lista de documentos que contiene el nombre, la hora de inicio, el estado actual y la canalización asociada a cada procesador de flujo. Tiene la siguiente sintaxis:

sp.listStreamProcessors(<filter>)

<filter> es un documento que especifica por qué campo(s) filtrar la lista.

Ejemplo

El siguiente ejemplo muestra un valor de retorno para una solicitud sin filtrar:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

Si ejecuta el comando nuevamente en el mismo espacio de trabajo de procesamiento de flujo, filtrando por "state" de "running", verá el siguiente resultado:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

Para devolver una matriz de resultados muestreados de un procesador de flujo existente a STDOUT mongoshcon, utilice el sp.processor.sample() método. Por ejemplo, el siguiente comando muestrea de un procesador de flujo proc01 llamado.

sp.proc01.sample()

Este comando se ejecuta de forma continua hasta que se cancele usando CTRL-C, o hasta que las muestras devueltas acumulen un tamaño de 40 MB. El procesador de flujos reporta documentos inválidos en la muestra en un documento _dlqMessage del siguiente formato:

{
_dlqMessage: {
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
workspaceName: '<workspaceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

Puede utilizar estos mensajes para diagnosticar problemas de higiene de datos sin definir una recopilación de cola de mensajes inactivos.

Nota

Atlas Stream Processing descarta el estado interno de los procesadores de flujo que han estado en durante stopped 45 días o más. Al iniciar un procesador de este tipo, funciona y reporta estadísticas de forma idéntica a su ejecución inicial.

Para ver las estadísticas de un procesador de flujo:

La API de administración de Atlas proporciona un punto final para ver las estadísticas de un procesador de flujo.

Obtenga un procesador de flujo

Para ver la supervisión de tu procesador de flujos, ve a la página Stream Processing de tu Proyecto Atlas y abre la pestaña Monitoring. Luego, selecciona tu procesador de flujos en la lista desplegable Stream processor en la parte superior izquierda de la página.

Para devolver un documento que resuma el estado actual de un procesador de flujo existente con,mongosh utilice el método. Tiene la siguiente sp.processor.stats() sintaxis:

sp.<streamprocessor>.stats({options: {<options>}})

Donde options es un documento opcional con los siguientes campos:

Campo
Tipo
Descripción

scale

entero

Unidad que se utiliza para el tamaño de los elementos en la salida. De forma predeterminada, Atlas Stream Processing muestra el tamaño de los elementos en bytes. Para mostrarlo en KB, especifique un valor de scale o 1024.

verbose

booleano

Indicador que especifica el nivel de detalle del documento de salida. Si se establece en true, el documento de salida contiene un subdocumento que informa las estadísticas de cada operador individual en su canalización. El valor predeterminado es false.

El documento de salida tiene los siguientes campos:

Campo
Tipo
Descripción

ns

string

El espacio de nombres en el que se define el procesador de flujo.

stats

Objeto

Un documento que describe el estado operativo del procesador de flujo.

stats.name

string

El nombre del procesador de flujo.

stats.status

string

El estado del procesador de flujo. Este campo puede tener los siguientes valores:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor

entero

La escala en la que se muestra el campo de tamaño. Si se establece en 1, los tamaños se muestran en bytes. Si se establece en 1024, los tamaños se muestran en kilobytes.

stats.inputMessageCount

entero

Número de documentos publicados en el flujo. Un documento se considera "publicado" en el flujo una vez que pasa por la $source etapa, no cuando pasa por todo el pipeline.

stats.inputMessageSize

entero

La cantidad de bytes o kilobytes publicados en la secuencia. Los bytes se consideran "publicados" en la secuencia una vez que pasan por la $source etapa, no cuando pasan por todo el pipeline.

stats.outputMessageCount

entero

El número de documentos procesados ​​por el flujo. Un documento se considera procesado por el flujo una vez que pasa por todo el pipeline.

stats.outputMessageSize

entero

El número de bytes o kilobytes procesados ​​por el flujo. Los bytes se consideran "procesados" por el flujo una vez que pasan por toda la canalización.

stats.dlqMessageCount

entero

El número de documentos enviados a la cola de cartas muertas.

stats.dlqMessageSize

entero

La cantidad de bytes o kilobytes enviados a la cola de mensajes no entregados.

stats.changeStreamTimeDifferenceSecs

entero

La diferencia, en segundos, entre el tiempo del evento representado por el token de reanudación del flujo de cambio más reciente y el último evento en el registro de operaciones.

stats.changeStreamState

token

El token de reanudación del flujo de cambios más reciente. Solo se aplica a procesadores de flujo con un origen de flujo de cambios.

stats.latency

Documento

Estadísticas de latencia del procesador de flujo en su conjunto. Atlas Stream Processing devuelve este campo solo si se pasa la opción verbose.

stats.latency.p50

entero

La latencia estimada del percentil 50de todos los documentos procesados ​​en los últimos 30 segundos. Si su canalización incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana.

Por ejemplo, si su etapa $tumblingWindow tiene un intervalo de 5 minutos, las mediciones de latencia incluirán esos 5 minutos.

stats.latency.p99

entero

La latencia estimada del percentil 99de todos los documentos procesados ​​en los últimos 30 segundos. Si su canalización incluye una etapa de ventana, las mediciones de latencia incluyen el intervalo de la ventana.

Por ejemplo, si su etapa $tumblingWindow tiene un intervalo de 5 minutos, las mediciones de latencia incluirán esos 5 minutos.

stats.latency.start

datetime

Hora de la pared en la que comenzó la ventana de medición de 30 segundos más reciente.

stats.latency.end

datetime

Hora de la pared en la que finalizó la ventana de medición de 30 segundos más reciente.

stats.latency.unit

string

Unidad de tiempo en la que se contabiliza la latencia. Este valor siempre es microseconds.

stats.latency.count

entero

Número de documentos que el procesador de flujos ha procesado en la ventana de medición más reciente de 30 segundos.

stats.latency.sum

entero

Suma de todas las mediciones de latencia individuales, en microsegundos, tomadas en la ventana de medición de 30 segundos más reciente.

stats.stateSize

entero

La cantidad de bytes que utiliza Windows para almacenar el estado del procesador.

stats.watermark

entero

La marca de tiempo de la marca de agua actual.

stats.operatorStats

arreglo

Las estadísticas para cada operador en el pipeline del procesador. Atlas Stream Processing devuelve este campo sólo si se pasa la opción verbose.

stats.operatorStats proporciona versiones por operador de muchos campos principales stats:

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.latency

  • stats.operatorStats.stateSize

stats.operatorStats incluye los siguientes campos únicos:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTimeMillis

stats.operatorStats También incluye los siguientes campos siempre que haya pasado la opción verbose y su procesador incluya una etapa de ventana:

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats.maxMemoryUsage

entero

El uso máximo de memoria del operador en bytes o kilobytes.

stats.operatorStats.executionTimeSecs

entero

El tiempo total de ejecución del operador en segundos.

stats.minOpenWindowStartTime

fecha

La hora de inicio de la ventana mínima abierta. Este valor es opcional.

stats.maxOpenWindowStartTime

fecha

Hora de inicio de la ventana de máxima apertura. Este valor es opcional.

stats.kafkaPartitions

arreglo

stats.kafkaPartitions.partition

entero

El número de partición de temas en Apache Kafka.

stats.kafkaPartitions.currentOffset

entero

El desplazamiento en el que se encuentra el procesador de flujo para la partición especificada. Este valor es igual al desplazamiento anterior que procesó el procesador de flujo más 1.

stats.kafkaPartitions.checkpointOffset

entero

El último desplazamiento que el procesador de flujo envió al broker de Apache Kafka y el punto de control de la partición especificada. Todos los mensajes a través de este desplazamiento se registran en el último punto de control.

stats.kafkaPartitions.isIdle

booleano

La bandera que indica si la partición está inactiva. Este valor por defecto es false.

Por ejemplo, a continuación se muestra el estado de un procesador de flujo llamado proc01 en un espacio de trabajo de procesamiento de flujo llamado inst01 con tamaños de elementos mostrados en KB:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

Volver

Administrar conexiones de VPC

En esta página