Este tutorial te guía a través de los pasos para configurar Atlas Stream Processing y ejecutar tu primer procesador de streams.
Requisitos previos
Para completar este tutorial necesitas:
Un proyecto de Atlas con un clúster vacío. Este clúster sirve como sumidero de datos para tu procesador de flujo.
Un usuario de base de datos con el
atlasAdminoreadWriteAnyDatabaserol, o un usuario o rol al que asigne privilegios de procesamiento de flujo para crear y ejecutar procesadores de flujoUn Atlas user con el rol de
Organization Owner,Project OwneroProject Stream Processing Ownerpara gestionar un Espacio de trabajo de Stream Processing Workspace y el Registro de conexionesNota
El rol
Project Ownerpermite crear implementaciones de la base de datos, gestionar el acceso al Proyecto y los ajustes del Proyecto, gestionar las entradas de la lista de acceso IP y mucho más.El
Project Stream Processing Ownerrol permite realizar acciones de Atlas Stream Processing tales como ver, crear, borrar y editar espacios de trabajo de stream processing, así como ver, añadir, modificar y borrar conexiones en el registro de conexiones.Consulta Roles del Proyecto para aprender más sobre las diferencias entre ambos roles.
Procedimiento
Este tutorial te guía para crear un espacio de trabajo de procesamiento de flujos, conectarlo a un clúster existente de Atlas, y configurar un procesador de flujos para ingerir datos de ejemplo desde dispositivos solares de transmisión y guardar los datos en tu clúster conectado.
Cree un espacio de trabajo de Stream Processing.
En Atlas, ve a Stream Processing página para tu proyecto.
Si aún no aparece, se debe seleccionar la organización que contiene el proyecto en el menú Organizations de la barra de navegación.
Si aún no se muestra, seleccione su proyecto en el menú Projects de la barra de navegación.
En la barra lateral, haz clic en Stream Processing en la sección Streaming Data.
Se muestra la página Stream Processing.
Haga clic en Create a workspace.
En la página Create a stream processing workspace, configura tu espacio de trabajo de la siguiente manera:
Tier:
SP30Provider:
AWSRegion:
us-east-1Workspace Name:
tutorialWorkspace
Haga clic en Create.
Agregar una conexión de sumidero al registro de conexiones.
Agrega una conexión a un clúster Atlas vacío existente a tu registro de conexiones. Tu procesador de flujo usará esta conexión como un receptor de datos en transmisión.
En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Manage.
En la pestaña Connection Registry, haz clic en + Add Connection en la parte superior derecha.
En la lista desplegable Connection Type, haz clic en Atlas Database.
En el campo Connection Name, introduce
mongodb1.Desde la lista desplegable Atlas Cluster, selecciona un clúster de Atlas sin ningún dato almacenado en él.
En la lista desplegable Execute as, selecciona Read and write to any database.
Haga clic en Add connection.
Verifica que tu fuente de datos de transmisión emita mensajes.
Tu espacio de trabajo de Stream Processing viene preconfigurado con una conexión a una fuente de datos de muestra llamada sample_stream_solar. Esta fuente genera un flujo de informes de varios dispositivos de energía solar. Cada informe describe la potencia observada y la temperatura de un único dispositivo solar en un punto específico en el tiempo, así como la potencia máxima de ese dispositivo.
El siguiente documento representa un informe de esta fuente de datos:
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 } }
Para verificar que esta fuente emite mensajes, crea un procesador de flujos interactivamente usando mongosh:
Conéctate a tu espacio de trabajo de stream processing.
Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando
mongosh.En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Connect.
En el diálogo de conexión del espacio de trabajo, haz clic en Choose a connection method y luego selecciona la pestaña Shell.
Copie la cadena de conexión que aparece en el cuadro de diálogo. Tiene el siguiente formato, donde
<atlas-stream-processing-url>es la URL de tu espacio de trabajo de Stream Processing y<username>es el nombre de usuario de un usuario de base de datos con el rolatlasAdmin:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Pega la cadena de conexión en tu terminal y reemplaza el marcador
<password>por las credenciales del usuario.Presiona Enter para ejecutarlo y conecta a tu espacio de trabajo de Stream Processing.
En el mensaje
mongosh, utiliza el métodosp.process()para crear el procesador de flujo de forma interactiva.sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) Verifica que los datos de la conexión
sample_stream_solarse muestren en la consola y termina el proceso.Los procesadores de flujo que creas con
sp.process()no persisten después de que los terminas.
Crea un procesador de flujo persistente.
Un procesador de flujo persistente ingiere, procesa y escribe continuamente datos en transmisión en un destino de datos especificado hasta que descartes el procesador. El siguiente procesador de flujos es un Pipeline de Agregación que determina la temperatura máxima, así como el promedio, el máximo y el mínimo de la potencia de cada dispositivo solar en intervalos de 10segundos, y luego guarda los resultados en el cluster vacío conectado.
Seleccione una de las siguientes pestañas para crear un procesador de flujos usando la interfaz de usuario de Atlas o mongosh:
Para crear un procesador de Stream Processing en la Interfaz de Usuario de Atlas, vaya a la página Stream Processing de su Proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de Stream Processing. Luego elige entre usar el editor visual o el editor JSON para configurar un procesador de flujos llamado solarDemo:
Haga clic en Create with visual builder.
El Constructor Visual se abre con un formulario donde puedes configurar tu procesador de flujo.
En el campo Stream processor name, introduce
solarDemo.En el campo Source, selecciona
sample_stream_solarde la lista desplegable Connection.Esto añade la siguiente
$sourceetapa a tu pipeline de agregación:{ "$source": { "connectionName": "sample_stream_solar" } } Configura una etapa
$tumblingWindow.En el panel Start building your pipeline, haz clic en + Custom stage y copia y pega lo siguiente JSON en el cuadro de texto que aparece. Esto define una etapa
$tumblingWindowcon una etapa$groupanidada que obtiene la temperatura máxima y la potencia máxima, mínima y media de cada dispositivo solar en intervalos de 10 segundos.Esto significa, por ejemplo, que si la etapa
$groupcalcula un valor paramax_watts, extrae el valor máximo de los valoresobs.wattsde todos los documentos con ungroup_iddado ingeridos en los 10 segundos anteriores.{ "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } }] } } En el campo Sink, selecciona
mongodb1de la lista desplegable Connection.En el cuadro de texto que aparece, copia y pega el siguiente JSON. Esto configura una etapa
$mergeque escribe los datos de transmisión procesados en una colección llamadasolarCollen la base de datossolarDbde tu clúster Atlas conectado:{ "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } Haga clic en Create stream processor.
El procesador de flujo se crea y se muestra en la pestaña Stream Processors de la página Stream Processing.
Haga clic en Use JSON editor.
El editor JSON se abre con un cuadro de texto donde puede configurar su procesador de flujo en formato JSON.
Define el procesador de flujo.
Copie y pegue la siguiente definición JSON en el cuadro de texto del editor JSON para definir un procesador de flujo llamado
solarDemo. Este procesador de flujo utiliza una etapa$tumblingWindowcon una etapa anidada$grouppara derivar la temperatura máxima y la potencia máxima, mínima y promedio de cada dispositivo solar en intervalos de 10segundos, y luego guardar los resultados en una colección llamadasolarCollen la base de datosCluster0del clúster Atlas conectado.Esto significa, por ejemplo, que si la etapa
$groupcalcula un valor paramax_watts, extrae el valor máximo de los valoresobs.wattsde todos los documentos con ungroup_iddado ingeridos en los 10 segundos anteriores.{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "Cluster0", "coll": "solarColl" }, "parallelism":16, } } ] }
Ejecuta los siguientes comandos en mongosh para crear un procesador de flujo persistente llamado solarDemo:
Conéctate a tu espacio de trabajo de stream processing.
Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando
mongosh.En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Connect.
En el cuadro de diálogo Connect to your workspace, selecciona la pestaña Shell.
Copie la cadena de conexión que aparece en el cuadro de diálogo. Tiene el siguiente formato, donde
<atlas-stream-processing-url>es la URL de tu espacio de trabajo de Stream Processing y<username>es el nombre de usuario de un usuario de base de datos con el rolatlasAdmin:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Pega la cadena de conexión en tu terminal y reemplaza el marcador
<password>por las credenciales del usuario.Presiona Enter para ejecutarlo y conecta a tu espacio de trabajo de Stream Processing.
Configura una etapa
$source.Defina una variable para una etapa
$sourceque ingiera datos de la fuentesample_stream_solar.let s = { source: { connectionName: "sample_stream_solar" } } Configure una etapa
$group.Defina una variable para una etapa
$groupque derive la temperatura máxima y los valores promedio, máximo y mínimo de potencia de cada dispositivo solar según sugroup_id.let g = { group: { _id: "$group_id", max_temp: { $max: "$obs.temp" }, avg_watts: { $avg: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } Configura una etapa
$tumblingWindow.Para realizar acumulaciones como
$groupen datos de transmisión, Atlas Stream Processing utiliza ventanas para delimitar el conjunto de datos. Define una variable para una etapa$tumblingWindowque separe la transmisión en intervalos consecutivos de 10 segundos.Esto significa, por ejemplo, que si la etapa
$groupcalcula un valor paramax_watts, extrae el valor máximo de los valoresobs.wattsde todos los documentos con ungroup_iddado ingeridos en los 10 segundos anteriores.let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } Configure un escenario de $merge.
Define una variable para una etapa de
$mergeque guarda los datos procesados por transmisión a una colección denominadasolarCollen la base de datossolarDbde tu clúster conectado de Atlas.let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } Crea el procesador de flujos.
Utilice el método
sp.createStreamProcessor()para asignar un nombre a su nuevo procesador de flujo y declarar su pipeline de agregación. La etapa$grouppertenece a la pipeline anidada del$tumblingWindow, y no debe incluirse en la definición de pipeline del procesador.sp.createStreamProcessor("solarDemo", [s, t, m]) Esto crea un procesador de flujo llamado
solarDemoque aplica la previamente definida query y guarda los datos procesados en la colecciónsolarCollde la base de datossolarDben el clúster al que se conectó. Devuelve diversas mediciones derivadas de intervalos de observación de 10segundos de tus dispositivos solares.Para obtener más información sobre cómo Atlas Stream Processing escribe en bases de datos en reposo, consulta
$merge(Stream Processing).
Inicia el procesador de flujos.
En la lista de procesadores de flujo para tu espacio de trabajo de stream processing, haz clic en el icono Start para tu procesador de flujo.
Usa el método sp.processor.start() en mongosh:
sp.solarDemo.start()
Verifique la salida del procesador de flujo.
Para verificar que el procesador de flujo esté desplegando datos en su clúster de Atlas:
En Atlas, se debe ir a la página Clusters del proyecto.
Si aún no se muestra, seleccione la organización que contiene su proyecto deseado en el menú Organizations de la barra de navegación.
Si aún no aparece, selecciona el proyecto deseado en el menú Projects de la barra de navegación.
En la barra lateral, haz clic en Clusters en la sección Database.
La página de clústeres se muestra.
En Atlas, se debe ir a la página Data Explorer del proyecto.
Si aún no aparece, se debe seleccionar la organización que contiene el proyecto en el menú Organizations de la barra de navegación.
Si aún no se muestra, seleccione su proyecto en el menú Projects de la barra de navegación.
En la barra lateral, haz clic en Data Explorer en la sección Database.
El Data Explorer se muestra.
Nota
También puedes hacer clic en el nombre de un clúster para abrir la barra lateral Cluster, y luego hacer clic en Data Explorer bajo el encabezado Shortcuts.
Mira la colección
MySolar.
Para verificar que el procesador esté activo, utiliza el método sp.processor.stats() en mongosh:
sp.solarDemo.stats()
Este método informa estadísticas operativas del procesador de flujo solarDemo.
También puedes utilizar el método sp.processor.sample() en mongosh para devolver una muestra de documentos procesados en el terminal.
sp.solarDemo.sample()
{ _id: 10, max_temp: 16, avg_watts: 232, max_watts: 414, min_watts: 73 }
Nota
El resultado anterior es un ejemplo representativo. Los datos en transmisión no son estáticos y cada usuario ve documentos distintos.
Descarta el procesador de flujos.
En la lista de procesadores de Stream Processing para tu espacio de trabajo de Stream Processing, haz clic en el icono Delete () de tu procesador de Stream Processing.
En el diálogo de confirmación que aparece, escribe el nombre del procesador de flujo (solarDemo) para confirmar que deseas borrarlo y luego haz clic en Delete.
Usa el método sp.processor.drop() en mongosh para descartar solarDemo:
sp.solarDemo.drop()
Para confirmar que haya descartado solarDemo, utilice el sp.listStreamProcessors() método para enumerar todos sus procesadores de flujo disponibles:
sp.listStreamProcessors()
Próximos pasos
Aprende cómo: