Docs Menu
Docs Home
/ /

Comenzar con Atlas Stream Processing

Este tutorial te guía a través de los pasos para configurar Atlas Stream Processing y ejecutar tu primer procesador de streams.

Para completar este tutorial necesitas:

  • Un proyecto de Atlas con un clúster vacío. Este clúster sirve como sumidero de datos para tu procesador de flujo.

  • Un usuario de base de datos con la atlasAdmin Rol para crear y ejecutar procesadores de flujo

  • Un usuario de Atlas con el Project Owner o el rol para administrar un espacio de trabajo de procesamiento de flujo y un registro de Project Stream Processing Owner conexión

    Nota

    El Project Owner rol le permite crear implementaciones de bases de datos, administrar el acceso al proyecto y la configuración del proyecto, administrar entradas de la lista de acceso IP y más.

    El Project Stream Processing Owner rol permite realizar acciones de Atlas Stream Processing tales como ver, crear, borrar y editar espacios de trabajo de stream processing, así como ver, añadir, modificar y borrar conexiones en el registro de conexiones.

    Consulta Roles del proyecto para obtener más información sobre las diferencias entre los dos roles.

Este tutorial te guía para crear un espacio de trabajo de procesamiento de flujos, conectarlo a un clúster existente de Atlas, y configurar un procesador de flujos para ingerir datos de ejemplo desde dispositivos solares de transmisión y guardar los datos en tu clúster conectado.

1
  1. En Atlas, vaya a la Stream Processing Página para su proyecto.

    1. Si aún no aparece, se debe seleccionar la organización que contiene el proyecto en el menú Organizations de la barra de navegación.

    2. Si aún no se muestra, seleccione su proyecto en el menú Projects de la barra de navegación.

    3. En la barra lateral, haz clic en Stream Processing en la sección Streaming Data.

      Se muestra la página Procesamiento de transmisión.

  2. Haga clic en Create a workspace.

  3. En la página Create a stream processing workspace, configure su espacio de trabajo de la siguiente manera:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Workspace Name: tutorialWorkspace

  4. Haga clic en Create.

2

Agregue una conexión a un clúster Atlas vacío existente a su registro de conexiones. Su procesador de flujo usará esta conexión como receptor de datos de transmisión.

  1. En el panel de su espacio de trabajo de procesamiento de flujo, haga clic en Manage.

  2. En la pestaña Connection Registry, haga clic en + Add Connection en la parte superior derecha.

  3. En la lista desplegable Connection Type, haga clic en Atlas Database.

  4. En el campo Connection Name, introduce mongodb1.

  5. En la lista desplegable Atlas Cluster, seleccione un clúster Atlas sin datos almacenados en él.

  6. En la lista desplegable Execute as, seleccione Read and write to any database.

  7. Haga clic en Add connection.

3

Su espacio de trabajo de procesamiento de flujos viene preconfigurado con una conexión a una fuente de datos de muestra llamada sample_stream_solar. Esta fuente genera un flujo de informes de varios dispositivos solares. Cada informe describe la potencia y la temperatura observadas de un dispositivo solar en un momento específico, así como su potencia máxima.

El siguiente documento representa un informe de esta fuente de datos:

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
}
}

Para verificar que esta fuente emite mensajes, cree un procesador de flujo de forma interactiva mongosh utilizando:

  1. Conéctese a su espacio de trabajo de procesamiento de flujo.

    Utilice la cadena de conexión asociada con su espacio de trabajo de procesamiento de flujo para conectarse mongosh usando.

    1. En el panel de su espacio de trabajo de procesamiento de flujo, haga clic en Connect.

    2. En el diálogo de conexión del espacio de trabajo, haz clic en Choose a connection method y luego selecciona la pestaña Shell.

    3. Copie la cadena de conexión que se muestra en el cuadro de diálogo. Tiene el siguiente formato:<atlas-stream-processing-url> es la URL de su espacio de trabajo de procesamiento de flujos y <username> es el nombre de usuario de la base de datos con el atlasAdmin rol.

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. Pegue la cadena de conexión en su terminal y reemplace el marcador <password> con las credenciales del usuario.

      Presione Enter para ejecutarlo y conectarse a su espacio de trabajo de procesamiento de transmisión.

  2. En el indicador,mongosh sp.process() utilice el método para crear el procesador de flujo de forma interactiva.

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    Verifique que los datos de la conexión sample_stream_solar se muestren en la consola y finalice el proceso.

    Los procesadores de flujo que cree con sp.process() no persisten después de finalizarlos.

4

Un procesador de flujo persistente ingiere, procesa y escribe continuamente datos de flujo en un receptor de datos específico hasta que se desactiva. El siguiente procesador de flujo es una canalización de agregación que obtiene la temperatura máxima y los vatajes promedio, máximo y mínimo de cada dispositivo solar en 10intervalos de segundos y luego escribe los resultados en el clúster vacío conectado.

Seleccione una de las siguientes pestañas para crear un procesador de flujo mediante la interfaz de usuario de Atlas mongosh o:

Para crear un procesador de flujo en la interfaz de usuario de Atlas, vaya a la página Stream Processing de su proyecto Atlas y haga clic en Manage en el panel de su espacio de trabajo de procesamiento de flujo. A continuación, elija entre usar el constructor visual o el editor JSON para configurar un procesador de flujo llamado solarDemo:

  1. Haga clic en Create with visual builder.

    El Visual Builder se abre con un formulario donde puede configurar su procesador de flujo.

  2. En el campo Stream processor name, introduce solarDemo.

  3. En el campo Source, seleccione sample_stream_solar de la lista desplegable Connection.

    Esto agrega la siguiente etapa a su canal de $source agregación:

    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    }
  4. Configurar una $tumblingWindow etapa.

    En el panel Start building your pipeline, haga clic en + Custom stage y copie y pegue lo siguiente JSON en el cuadro de texto que aparece. Esto define una etapa $tumblingWindow $group con una etapa anidada que deriva la temperatura máxima y las potencias máxima, mínima y promedio de cada dispositivo 10solar en intervalos de segundos.

    Esto significa, por ejemplo, que cuando la etapa $group calcula un valor para max_watts, extrae el valor máximo de los valores obs.watts para todos los documentos con un group_id determinado ingerido en los 10 segundos anteriores.

    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [ {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }]
    }
    }
  5. En el campo Sink, seleccione mongodb1 de la lista desplegable Connection.

    En el cuadro de texto que aparece, copie y pegue el siguiente JSON. Esto configura una etapa que escribe los datos de streaming procesados ​​en una colección $merge llamada solarColl en la solarDb base de datos de su clúster Atlas conectado:

    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
  6. Haga clic en Create stream processor.

    El procesador de flujo se crea y aparece en la pestaña Stream Processors de la página Stream Processing.

  1. Haga clic en Use JSON editor.

    El editor JSON se abre con un cuadro de texto donde puedes configurar tu procesador de flujo en formato JSON.

  2. Defina el procesador de flujo.

    Copie y pegue la siguiente definición JSON en el cuadro de texto del editor JSON para definir un procesador de flujo llamado solarDemo. Este procesador de flujo utiliza una etapa$tumblingWindowcon una etapa$groupanidada para obtener la temperatura máxima y los vatajes máximo, mínimo y promedio de cada dispositivo solar en intervalos de 10segundos. Luego, escribe los resultados en una colección llamada solarColl en la base de datos Cluster0 de su clúster Atlas conectado.

    Esto significa, por ejemplo, que cuando la etapa $group calcula un valor para max_watts, extrae el valor máximo de los valores obs.watts para todos los documentos con un group_id determinado ingerido en los 10 segundos anteriores.

    {
    "name": "solarDemo",
    "pipeline": [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "Cluster0",
    "coll": "solarColl"
    },
    "parallelism":16,
    }
    }
    ]
    }

Ejecute los siguientes comandos en para crear un procesador de flujo mongosh persistente solarDemo llamado:

  1. Conéctese a su espacio de trabajo de procesamiento de flujo.

    Utilice la cadena de conexión asociada con su espacio de trabajo de procesamiento de flujo para conectarse mongosh usando.

    1. En el panel de su espacio de trabajo de procesamiento de flujo, haga clic en Connect.

    2. En el cuadro de diálogo Connect to your workspace, seleccione la pestaña Shell.

    3. Copie la cadena de conexión que se muestra en el cuadro de diálogo. Tiene el siguiente formato:<atlas-stream-processing-url> es la URL de su espacio de trabajo de procesamiento de flujos y <username> es el nombre de usuario de la base de datos con el atlasAdmin rol.

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. Pegue la cadena de conexión en su terminal y reemplace el marcador <password> con las credenciales del usuario.

      Presione Enter para ejecutarlo y conectarse a su espacio de trabajo de procesamiento de transmisión.

  2. Configurar una $source etapa.

    Define una variable para una etapa $source que ingiera datos de la fuente sample_stream_solar.

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  3. Configurar una $group etapa.

    Defina una variable para una etapa $group que derive la temperatura máxima y los vatajes promedio, máximo y mínimo de cada dispositivo solar de acuerdo con su group_id.

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $max: "$obs.temp"
    },
    avg_watts: {
    $avg: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  4. Configurar una $tumblingWindow etapa.

    Para realizar acumulaciones como $group en datos de streaming, Atlas Stream Processing utiliza ventanas para delimitar el conjunto de datos. Defina una variable para una $tumblingWindow etapa que divida el flujo en 10intervalos consecutivos de segundos.

    Esto significa, por ejemplo, que cuando la etapa $group calcula un valor para max_watts, extrae el valor máximo de los valores obs.watts para todos los documentos con un group_id determinado ingerido en los 10 segundos anteriores.

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  5. Configurar una etapa $merge.

    Define una variable para una etapa de $merge que guarda los datos procesados por transmisión a una colección denominada solarColl en la base de datos solarDb de tu clúster conectado de Atlas.

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  6. Crear el procesador de flujo.

    Utilice el método sp.createStreamProcessor() para asignar un nombre a su nuevo procesador de flujo y declarar su pipeline de agregación. La etapa $group pertenece a la pipeline anidada del $tumblingWindow, y no debe incluirse en la definición de pipeline del procesador.

    sp.createStreamProcessor("solarDemo", [s, t, m])

    Esto crea un procesador de flujo llamado solarDemo que aplica la consulta definida previamente y escribe los datos procesados ​​en la colección solarColl de la base de datos solarDb del clúster al que se conectó. Devuelve diversas mediciones derivadas de intervalos de 10segundos de observaciones de sus dispositivos solares.

    Para obtener más información sobre cómo Atlas Stream Processing escribe en bases de datos en reposo, consulte $merge (Procesamiento de flujo).

5

En la lista de procesadores de flujo para tu espacio de trabajo de stream processing, haz clic en el icono Start para tu procesador de flujo.

Utilice el sp.processor.start() método mongosh en:

sp.solarDemo.start()
6

Para verificar que el procesador de flujo esté escribiendo datos en su clúster Atlas:

  1. En Atlas, se debe ir a la página Clusters del proyecto.

    1. Si aún no se muestra, seleccione la organización que contiene su proyecto deseado en el menú Organizations de la barra de navegación.

    2. Si aún no aparece, selecciona el proyecto deseado en el menú Projects de la barra de navegación.

    3. En la barra lateral, haz clic en Clusters en la sección Database.

      La página de clústeres se muestra.

  2. En Atlas, se debe ir a la página Data Explorer del proyecto.

    1. Si aún no aparece, se debe seleccionar la organización que contiene el proyecto en el menú Organizations de la barra de navegación.

    2. Si aún no se muestra, seleccione su proyecto en el menú Projects de la barra de navegación.

    3. En la barra lateral, haz clic en Data Explorer en la sección Database.

      El Data Explorer se muestra.

    Nota

    También puede ir a la página Clusters y hacer clic en Data Explorer debajo del encabezado Shortcuts.

  3. Ver la colección MySolar.

Para verificar que el procesador esté activo, utilice el sp.processor.stats() método mongosh en:

sp.solarDemo.stats()

Este método informa las estadísticas operativas del procesador de flujo solarDemo.

También puede utilizar el sp.processor.sample() método en para devolver una muestra de documentos procesados ​​en la mongosh terminal.

sp.solarDemo.sample()
{
_id: 10,
max_temp: 16,
avg_watts: 232,
max_watts: 414,
min_watts: 73
}

Nota

El resultado anterior es un ejemplo representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.

7

En la lista de procesadores de flujo de su espacio de trabajo de procesamiento de flujo, haga clic en el Delete ícono ( ) para su procesador de flujo.

En el cuadro de diálogo de confirmación que aparece, escriba el nombre del procesador de flujo (solarDemo) para confirmar que desea eliminarlo y luego haga clic en Delete.

Utilice el método en sp.processor.drop() para mongosh solarDemoeliminar:

sp.solarDemo.drop()

Para confirmar que ha solarDemo descartado, utilice el método para enumerar todos los procesadores de flujo sp.listStreamProcessors() disponibles:

sp.listStreamProcessors()

Aprenda a:

Volver

Atlas Stream Processing

En esta página