Este tutorial te guía a través de los pasos para configurar Atlas Stream Processing y ejecutar tu primer procesador de streams.
Requisitos previos
Para completar este tutorial necesitas:
Un proyecto de Atlas con un clúster vacío. Este clúster sirve como sumidero de datos para tu procesador de flujo.
Un usuario de base de datos con la rol
atlasAdminoreadWriteAnyDatabase, o un usuario o rol al que usted asigne Privilegios de procesamiento de flujo para crear y ejecutar procesadores de flujo.Un usuario de Atlas con el rol,
Organization OwnerProject Ownero para administrar un espacio de trabajo de procesamiento de flujos y un registroProject Stream Processing Ownerde conexiones.Nota
El rol
Project Ownerpermite crear implementaciones de la base de datos, gestionar el acceso al Proyecto y los ajustes del Proyecto, gestionar las entradas de la lista de acceso IP y mucho más.El
Project Stream Processing Ownerrol permite realizar acciones de Atlas Stream Processing tales como ver, crear, borrar y editar espacios de trabajo de stream processing, así como ver, añadir, modificar y borrar conexiones en el registro de conexiones.Consulta Roles del proyecto para obtener más información sobre las diferencias entre los dos roles.
Procedimiento
Este tutorial te guía para crear un espacio de trabajo de procesamiento de flujos, conectarlo a un clúster existente de Atlas, y configurar un procesador de flujos para ingerir datos de ejemplo desde dispositivos solares de transmisión y guardar los datos en tu clúster conectado.
Cree un espacio de trabajo de Stream Processing.
En Atlas, ve a Stream Processing página para tu proyecto.
Si aún no aparece, se debe seleccionar la organización que contiene el proyecto en el menú Organizations de la barra de navegación.
Si aún no se muestra, seleccione su proyecto en el menú Projects de la barra de navegación.
En la barra lateral, haz clic en Stream Processing en la sección Streaming Data.
Se muestra la página Stream Processing.
Haga clic en Create a workspace.
En la página Create a stream processing workspace, configura tu espacio de trabajo de la siguiente manera:
Tier:
SP30Provider:
AWSRegion:
us-east-1Workspace Name:
tutorialWorkspace
Haga clic en Create.
Agregar una conexión de sumidero al registro de conexiones.
Agrega una conexión a un clúster Atlas vacío existente a tu registro de conexiones. Tu procesador de flujo usará esta conexión como un receptor de datos en transmisión.
En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Manage.
En la pestaña Connection Registry, haga clic en + Add Connection en la parte superior derecha.
En la lista desplegable Connection Type, haga clic en Atlas Database.
En el campo Connection Name, introduce
mongodb1.Desde la lista desplegable Atlas Cluster, selecciona un clúster de Atlas sin ningún dato almacenado en él.
En la lista desplegable Execute as, selecciona Read and write to any database.
Haga clic en Add connection.
Verifica que tu fuente de datos de transmisión emita mensajes.
Su espacio de trabajo de procesamiento de flujos viene preconfigurado con una conexión a una fuente de datos de muestra llamada sample_stream_solar. Esta fuente genera un flujo de informes de varios dispositivos solares. Cada informe describe la potencia y la temperatura observadas de un dispositivo solar en un momento específico, así como su potencia máxima.
El siguiente documento representa un informe de esta fuente de datos:
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 } }
Para verificar que esta fuente emite mensajes, cree un procesador de flujo de forma interactiva mongosh utilizando:
Conéctate a tu espacio de trabajo de stream processing.
Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando
mongosh.En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Connect.
En el diálogo de conexión del espacio de trabajo, haz clic en Choose a connection method y luego selecciona la pestaña Shell.
Copie la cadena de conexión que aparece en el cuadro de diálogo. Tiene el siguiente formato, donde
<atlas-stream-processing-url>es la URL de tu espacio de trabajo de Stream Processing y<username>es el nombre de usuario de un usuario de base de datos con el rolatlasAdmin:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Pegue la cadena de conexión en su terminal y reemplace el marcador
<password>con las credenciales del usuario.Presiona Enter para ejecutarlo y conecta a tu espacio de trabajo de Stream Processing.
En el mensaje
mongosh, utiliza el métodosp.process()para crear el procesador de flujo de forma interactiva.sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) Verifica que los datos de la conexión
sample_stream_solarse muestren en la consola y termina el proceso.Los procesadores de flujo que creas con
sp.process()no persisten después de que los terminas.
Crear un procesador de flujo persistente.
Un procesador de flujo persistente ingiere, procesa y escribe continuamente datos en transmisión en un destino de datos especificado hasta que descartes el procesador. El siguiente procesador de flujos es un Pipeline de Agregación que determina la temperatura máxima, así como el promedio, el máximo y el mínimo de la potencia de cada dispositivo solar en intervalos de 10segundos, y luego guarda los resultados en el cluster vacío conectado.
Seleccione una de las siguientes pestañas para crear un procesador de flujo mediante la interfaz de usuario de Atlas mongosh o:
Para crear un procesador de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo. A continuación, elija entre usar el constructor visual o el editor JSON para configurar un procesador de flujo llamado solarDemo:
Haga clic en Create with visual builder.
El Constructor Visual se abre con un formulario donde puedes configurar tu procesador de flujo.
En el campo Stream processor name, introduce
solarDemo.En el campo Source, seleccione
sample_stream_solarde la lista desplegable Connection.Esto añade la siguiente
$sourceetapa a tu pipeline de agregación:{ "$source": { "connectionName": "sample_stream_solar" } } Configura una etapa
$tumblingWindow.En el panel Start building your pipeline, haga clic en + Custom stage y copie y pegue lo siguiente JSON en el cuadro de texto que aparece. Esto define una etapa
$tumblingWindow$groupcon una etapa anidada que deriva la temperatura máxima y las potencias máxima, mínima y promedio de cada dispositivo 10solar en intervalos de segundos.Esto significa, por ejemplo, que si la etapa
$groupcalcula un valor paramax_watts, extrae el valor máximo de los valoresobs.wattsde todos los documentos con ungroup_iddado ingeridos en los 10 segundos anteriores.{ "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } }] } } En el campo Sink, seleccione
mongodb1de la lista desplegable Connection.En el cuadro de texto que aparece, copia y pega el siguiente JSON. Esto configura una etapa
$mergeque escribe los datos de transmisión procesados en una colección llamadasolarCollen la base de datossolarDbde tu clúster Atlas conectado:{ "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } Haga clic en Create stream processor.
El procesador de flujo se crea y se muestra en la pestaña Stream Processors de la página Stream Processing.
Haga clic en Use JSON editor.
El editor JSON se abre con un cuadro de texto donde puede configurar su procesador de flujo en formato JSON.
Define el procesador de flujo.
Copie y pegue la siguiente definición JSON en el cuadro de texto del editor JSON para definir un procesador de flujo llamado
solarDemo. Este procesador de flujo utiliza una etapa$tumblingWindowcon una etapa anidada$grouppara derivar la temperatura máxima y la potencia máxima, mínima y promedio de cada dispositivo solar en intervalos de 10segundos, y luego guardar los resultados en una colección llamadasolarCollen la base de datosCluster0del clúster Atlas conectado.Esto significa, por ejemplo, que si la etapa
$groupcalcula un valor paramax_watts, extrae el valor máximo de los valoresobs.wattsde todos los documentos con ungroup_iddado ingeridos en los 10 segundos anteriores.{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "Cluster0", "coll": "solarColl" }, "parallelism":16, } } ] }
Ejecuta los siguientes comandos en mongosh para crear un procesador de flujo persistente llamado solarDemo:
Conéctate a tu espacio de trabajo de stream processing.
Utiliza la cadena de conexión asociada a tu espacio de trabajo de Stream Processing para conectarte usando
mongosh.En el panel de tu espacio de trabajo de procesamiento de flujos, haz clic en Connect.
En el cuadro de diálogo Connect to your workspace, seleccione la pestaña Shell.
Copie la cadena de conexión que aparece en el cuadro de diálogo. Tiene el siguiente formato, donde
<atlas-stream-processing-url>es la URL de tu espacio de trabajo de Stream Processing y<username>es el nombre de usuario de un usuario de base de datos con el rolatlasAdmin:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Pegue la cadena de conexión en su terminal y reemplace el marcador
<password>con las credenciales del usuario.Presiona Enter para ejecutarlo y conecta a tu espacio de trabajo de Stream Processing.
Configura una etapa
$source.Defina una variable para una etapa
$sourceque ingiera datos de la fuentesample_stream_solar.let s = { source: { connectionName: "sample_stream_solar" } } Configure una etapa
$group.Defina una variable para una etapa
$groupque derive la temperatura máxima y los valores promedio, máximo y mínimo de potencia de cada dispositivo solar según sugroup_id.let g = { group: { _id: "$group_id", max_temp: { $max: "$obs.temp" }, avg_watts: { $avg: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } Configura una etapa
$tumblingWindow.Para realizar acumulaciones como
$groupen datos de streaming, Atlas Stream Processing utiliza ventanas para delimitar el conjunto de datos. Defina una variable para una$tumblingWindowetapa que divida el flujo en 10intervalos consecutivos de segundos.Esto significa, por ejemplo, que si la etapa
$groupcalcula un valor paramax_watts, extrae el valor máximo de los valoresobs.wattsde todos los documentos con ungroup_iddado ingeridos en los 10 segundos anteriores.let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } Configure un escenario de $merge.
Define una variable para una etapa de
$mergeque guarda los datos procesados por transmisión a una colección denominadasolarCollen la base de datossolarDbde tu clúster conectado de Atlas.let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } Crear el procesador de flujo.
Utilice el método
sp.createStreamProcessor()para asignar un nombre a su nuevo procesador de flujo y declarar su pipeline de agregación. La etapa$grouppertenece a la pipeline anidada del$tumblingWindow, y no debe incluirse en la definición de pipeline del procesador.sp.createStreamProcessor("solarDemo", [s, t, m]) Esto crea un procesador de flujo llamado
solarDemoque aplica la previamente definida query y guarda los datos procesados en la colecciónsolarCollde la base de datossolarDben el clúster al que se conectó. Devuelve diversas mediciones derivadas de intervalos de observación de 10segundos de tus dispositivos solares.Para obtener más información sobre cómo Atlas Stream Processing escribe en bases de datos en reposo, consulta
$merge(Stream Processing).
Inicie el procesador de flujo.
En la lista de procesadores de flujo para tu espacio de trabajo de stream processing, haz clic en el icono Start para tu procesador de flujo.
Utilice el sp.processor.start() método mongosh en:
sp.solarDemo.start()
Verifique la salida del procesador de flujo.
Para verificar que el procesador de flujo esté desplegando datos en su clúster de Atlas:
En Atlas, se debe ir a la página Clusters del proyecto.
Si aún no se muestra, seleccione la organización que contiene su proyecto deseado en el menú Organizations de la barra de navegación.
Si aún no aparece, selecciona el proyecto deseado en el menú Projects de la barra de navegación.
En la barra lateral, haz clic en Clusters en la sección Database.
La página de clústeres se muestra.
En Atlas, se debe ir a la página Data Explorer del proyecto.
Si aún no aparece, se debe seleccionar la organización que contiene el proyecto en el menú Organizations de la barra de navegación.
Si aún no se muestra, seleccione su proyecto en el menú Projects de la barra de navegación.
En la barra lateral, haz clic en Data Explorer en la sección Database.
El Data Explorer se muestra.
Nota
También puede hacer clic en el nombre de un grupo para abrir la barra lateral Cluster y luego hacer clic en Data Explorer debajo del encabezado Shortcuts.
Ver la colección
MySolar.
Para verificar que el procesador esté activo, utiliza el método sp.processor.stats() en mongosh:
sp.solarDemo.stats()
Este método informa las estadísticas operativas del procesador de flujo solarDemo.
También puedes utilizar el método sp.processor.sample() en mongosh para devolver una muestra de documentos procesados en el terminal.
sp.solarDemo.sample()
{ _id: 10, max_temp: 16, avg_watts: 232, max_watts: 414, min_watts: 73 }
Nota
El resultado anterior es un ejemplo representativo. Los datos en transmisión no son estáticos y cada usuario ve documentos distintos.
Descarta el procesador de flujos.
En la lista de procesadores de Stream Processing para tu espacio de trabajo de Stream Processing, haz clic en el icono Delete () de tu procesador de Stream Processing.
En el cuadro de diálogo de confirmación que aparece, escriba el nombre del procesador de flujo (solarDemo) para confirmar que desea eliminarlo y luego haga clic en Delete.
Utilice el método en sp.processor.drop() para mongosh solarDemoeliminar:
sp.solarDemo.drop()
Para confirmar que haya descartado solarDemo, utilice el sp.listStreamProcessors() método para enumerar todos sus procesadores de flujo disponibles:
sp.listStreamProcessors()
Próximos pasos
Aprenda a: