Get Started with Atlas Stream Processing

This tutorial takes you through the steps of setting up Atlas Stream Processing and running your first stream processor.

To complete this tutorial you need:

  • An Atlas project with an empty cluster. This cluster serves as the data sink for your stream processor.

  • A database user with the atlasAdmin role to create and run stream processors

  • mongosh version 2.0 or higher

  • An Atlas user with the Project Owner or the Project Stream Processing Owner role to manage a Stream Processing Instance and Connection Registry

    Note

    The Project Owner role allows you to create database deployments, manage project access and project settings, manage IP Access List entries, and more.

    The Project Stream Processing Owner role enables Atlas Stream Processing actions such as viewing, creating, deleting, and editing stream processing instances, and viewing, adding, modifying, and deleting connections in the connection registry.

    See Project Roles to learn more about the differences between the two roles.

This tutorial guides you through creating a stream processing instance, connecting it to an existing Atlas cluster, and setting up a stream processor to ingest sample data from streaming solar devices and write it to your connected cluster.

1. Create a stream processing instance.
  1. In Atlas, go to the Stream Processing page for your project.

    WARNING: Navigation Improvements In Progress

    We're currently rolling out a new and improved navigation experience. If the following steps don't match your view in the Atlas UI, see the preview documentation.

    1. If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.

    2. If it's not already displayed, select your project from the Projects menu in the navigation bar.

    3. In the sidebar, click Stream Processing under the Services heading.

      The Stream Processing page displays.

  2. Click Create a workspace.

  3. On the Create a stream processing instance page, configure your instance as follows:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. Click Create.

2. Add a sink connection to the connection registry.

In your connection registry, add a connection to the existing empty Atlas cluster. Your stream processor uses this connection as its streaming data sink.

  1. In the pane for your stream processing instance, click Configure.

  2. In the Connection Registry tab, click + Add Connection in the upper right.

  3. From the Connection Type drop-down list, select Atlas Database.

  4. In the Connection Name field, enter mongodb1.

  5. From the Atlas Cluster drop-down list, select an Atlas cluster without any data stored on it.

  6. From the Execute as drop-down list, select Read and write to any database.

  7. Click Add connection.

3. Verify that your streaming data source emits messages.

Your stream processing instance comes preconfigured with a connection to a sample data source called sample_stream_solar. This source generates a stream of reports from various solar power devices. Each report describes the observed wattage and temperature of a single solar device at a specific point in time, as well as that device's maximum wattage.

The following document represents a report from this data source:

{
  device_id: 'device_8',
  group_id: 7,
  timestamp: '2024-08-12T21:41:01.788+00:00',
  max_watts: 450,
  event_type: 0,
  obs: {
    watts: 252,
    temp: 17
  }
}

To verify that this source emits messages, create a stream processor interactively using mongosh:

  1. Connect to your stream processing instance.

    Use the connection string associated with your stream processing instance to connect using mongosh.

    1. In the pane for your stream processing instance, click Connect.

    2. In the Connect to your instance dialog, select the Shell tab.

    3. Copy the connection string displayed in the dialog. It has the following format, where <atlas-stream-processing-url> is the URL of your stream processing instance and <username> is the username of a database user with the atlasAdmin role:

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. Paste the connection string into your terminal and replace the <password> placeholder with the credentials for the user.

      Press Enter to run it and connect to your stream processing instance.

  2. In the mongosh prompt, use the sp.process() method to create the stream processor interactively.

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    Verify that data from the sample_stream_solar connection appears in the console, then terminate the processor.

    Stream processors you create with sp.process() don't persist after you terminate them.
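
    Optionally, you can pass additional stages to sp.process() to shape the interactive output. The following is a minimal sketch that filters the stream to a single device; the device_id value is illustrative, since the device IDs in your stream vary:

    sp.process([
      // Ingest from the preconfigured sample solar data source.
      { $source: { connectionName: "sample_stream_solar" } },
      // Keep only reports from one device (illustrative ID).
      { $match: { device_id: "device_1" } }
    ])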

4. Create a persistent stream processor.

A persistent stream processor continuously ingests, processes, and writes streaming data to a specified data sink until you drop the processor. The following stream processor is an aggregation pipeline that derives the maximum temperature and the average, maximum, and minimum wattages of each solar device across 10-second intervals, then writes the results to your connected empty cluster.
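
For example, if a single 10-second window ingests three reports for group_id 7 with obs.watts values of 100, 200, and 300 and obs.temp values of 15, 17, and 16, that window emits one document similar to the following (the values here are purely illustrative):

{
  _id: 7,
  max_temp: 17,
  max_watts: 300,
  min_watts: 100,
  avg_watts: 200
}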

Select one of the following tabs to create a stream processor using the Atlas UI or mongosh:

To create a stream processor in the Atlas UI, go to the Stream Processing page for your Atlas project and click Configure in the pane for your stream processing instance. Then use either the Visual Builder or the JSON editor to configure a stream processor named solarDemo:

  1. Click Create with visual builder.

    The Visual Builder opens with a form where you can configure your stream processor.

  2. In the Stream processor name field, enter solarDemo.

  3. In the Source field, select sample_stream_solar from the Connection drop-down list.

    This adds the following $source stage to your aggregation pipeline:

    {
      "$source": {
        "connectionName": "sample_stream_solar"
      }
    }
  4. Configure a $tumblingWindow stage.

    In the Start building your pipeline pane, click + Custom stage and copy and paste the following JSON into the text box that appears. This defines a $tumblingWindow stage with a nested $group stage that derives the maximum temperature and the maximum, minimum, and average wattages of each solar device over 10-second intervals.

    This means, for example, that when the $group stage computes a value for max_watts, it extracts the maximum value from the obs.watts values for all documents with a given group_id ingested in the previous 10 seconds.

    {
      "$tumblingWindow": {
        "interval": {
          "size": 10,
          "unit": "second"
        },
        "pipeline": [
          {
            "$group": {
              "_id": "$group_id",
              "max_temp": {
                "$max": "$obs.temp"
              },
              "max_watts": {
                "$max": "$obs.watts"
              },
              "min_watts": {
                "$min": "$obs.watts"
              },
              "avg_watts": {
                "$avg": "$obs.watts"
              }
            }
          }
        ]
      }
    }
  5. In the Sink field, select mongodb1 from the Connection drop-down list.

    In the text box that appears, copy and paste the following JSON. This configures a $merge stage that writes the processed streaming data to a collection named solarColl in the solarDb database of your connected Atlas cluster:

    {
      "$merge": {
        "into": {
          "connectionName": "mongodb1",
          "db": "solarDb",
          "coll": "solarColl"
        }
      }
    }
  6. Click Create stream processor.

    The stream processor is created and listed on the Stream Processors tab of the Stream Processing page.

  1. Click Use JSON editor.

    The JSON editor opens with a text box where you can configure your stream processor in JSON format.

  2. Define the stream processor.

    Copy and paste the following JSON definition into the JSON editor text box to define a stream processor named solarDemo. This stream processor uses a $tumblingWindow stage with a nested $group stage to derive the maximum temperature and the maximum, minimum, and average wattages of each solar device over 10-second intervals, then writes the results to a collection named solarColl in the solarDb database of your connected Atlas cluster.

    This means, for example, that when the $group stage computes a value for max_watts, it extracts the maximum value from the obs.watts values for all documents with a given group_id ingested in the previous 10 seconds.

    {
      "name": "solarDemo",
      "pipeline": [
        {
          "$source": {
            "connectionName": "sample_stream_solar"
          }
        },
        {
          "$tumblingWindow": {
            "interval": {
              "size": 10,
              "unit": "second"
            },
            "pipeline": [
              {
                "$group": {
                  "_id": "$group_id",
                  "max_temp": {
                    "$max": "$obs.temp"
                  },
                  "max_watts": {
                    "$max": "$obs.watts"
                  },
                  "min_watts": {
                    "$min": "$obs.watts"
                  },
                  "avg_watts": {
                    "$avg": "$obs.watts"
                  }
                }
              }
            ]
          }
        },
        {
          "$merge": {
            "into": {
              "connectionName": "mongodb1",
              "db": "solarDb",
              "coll": "solarColl"
            }
          }
        }
      ]
    }

Run the following commands in mongosh to create a persistent stream processor named solarDemo:

  1. Connect to your stream processing instance.

    Use the connection string associated with your stream processing instance to connect using mongosh.

    1. In the pane for your stream processing instance, click Connect.

    2. In the Connect to your instance dialog, select the Shell tab.

    3. Copy the connection string displayed in the dialog. It has the following format, where <atlas-stream-processing-url> is the URL of your stream processing instance and <username> is the username of a database user with the atlasAdmin role:

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. Paste the connection string into your terminal and replace the <password> placeholder with the credentials for the user.

      Press Enter to run it and connect to your stream processing instance.

  2. Configure a $source stage.

    Define a variable for a $source stage that ingests data from the sample_stream_solar source.

    let s = {
      $source: {
        connectionName: "sample_stream_solar"
      }
    }
  3. Configure a $group stage.

    Define a variable for a $group stage that derives the maximum temperature and the average, maximum, and minimum wattages of each solar device according to its group_id.

    let g = {
      $group: {
        _id: "$group_id",
        max_temp: {
          $max: "$obs.temp"
        },
        avg_watts: {
          $avg: "$obs.watts"
        },
        max_watts: {
          $max: "$obs.watts"
        },
        min_watts: {
          $min: "$obs.watts"
        }
      }
    }
  4. Configure a $tumblingWindow stage.

    In order to perform accumulations such as $group on streaming data, Atlas Stream Processing uses windows to bound the data set. Define a variable for a $tumblingWindow stage that separates the stream into consecutive 10-second intervals.

    This means, for example, that when the $group stage computes a value for max_watts, it extracts the maximum value from the obs.watts values for all documents with a given group_id ingested in the previous 10 seconds.

    let t = {
      $tumblingWindow: {
        interval: {
          size: NumberInt(10),
          unit: "second"
        },
        pipeline: [g]
      }
    }
  5. Configure a $merge stage.

    Define a variable for a $merge stage that writes the processed streaming data to a collection named solarColl in the solarDb database of your connected Atlas cluster.

    let m = {
      $merge: {
        into: {
          connectionName: "mongodb1",
          db: "solarDb",
          coll: "solarColl"
        }
      }
    }
  6. Create the stream processor.

    Use the sp.createStreamProcessor() method to assign a name to your new stream processor and declare its aggregation pipeline. Because the $group stage belongs to the nested pipeline of the $tumblingWindow stage, don't include it separately in the processor pipeline definition.

    sp.createStreamProcessor("solarDemo", [s, t, m])

    This creates a stream processor named solarDemo that applies the previously defined aggregation pipeline and writes the processed data to the solarColl collection of the solarDb database on the cluster you connected to. It returns various measurements derived from 10-second intervals of observations from your solar devices.

    To learn more about how Atlas Stream Processing writes to at-rest databases, see $merge (Stream Processing).
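
    Optionally, confirm that the new processor is registered on your instance before you start it. The sp.listStreamProcessors() method, which this tutorial also uses in the final step, lists every stream processor defined on the instance; solarDemo should appear in its output:

    sp.listStreamProcessors()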

5. Start the stream processor.

In the list of stream processors for your stream processing instance, click the Start icon for your stream processor.

Use the sp.processor.start() method in mongosh:

sp.solarDemo.start()
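
If you later want to pause the processor without deleting it, you can use the corresponding sp.processor.stop() method and then start it again:

sp.solarDemo.stop()   // stop the running processor
sp.solarDemo.start()  // resume it later
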
6. Verify the output of the stream processor.

To verify that the stream processor is writing data to your Atlas cluster:

  1. In Atlas, go to the Clusters page for your project.

    WARNING: Navigation Improvements In Progress

    We're currently rolling out a new and improved navigation experience. If the following steps don't match your view in the Atlas UI, see the preview documentation.

    1. If it's not already displayed, select the organization that contains your desired project from the Organizations menu in the navigation bar.

    2. If it's not already displayed, select your desired project from the Projects menu in the navigation bar.

    3. If it's not already displayed, click Clusters in the sidebar.

      The Clusters page displays.

  2. Click the Browse Collections button for your cluster.

    The Data Explorer displays.

  3. View the solarColl collection in the solarDb database.

To verify that the processor is active, use the sp.processor.stats() method in mongosh:

sp.solarDemo.stats()

This method reports operational statistics of the solarDemo stream processor.

You can also use the sp.processor.sample() method in mongosh to return a sampling of processed documents in the terminal.

sp.solarDemo.sample()
{
  _id: 10,
  max_temp: 16,
  avg_watts: 232,
  max_watts: 414,
  min_watts: 73
}

Note

The preceding output is a representative example. Streaming data is not static; each user sees distinct documents.
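
You can also inspect the sink collection directly. Connect mongosh to your Atlas cluster (not your stream processing instance) and query the collection that the $merge stage writes to; a minimal sketch, assuming the database and collection names used in this tutorial:

use solarDb
db.solarColl.find().limit(5)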

7. Drop the stream processor.

In the list of stream processors for your stream processing instance, click the Delete icon for your stream processor.

In the confirmation dialog that appears, type the name of the stream processor (solarDemo) to confirm that you want to delete it, and then click Delete.

Use the sp.processor.drop() method in mongosh to drop solarDemo:

sp.solarDemo.drop()

To confirm that you have dropped solarDemo, use the sp.listStreamProcessors() method to list all your available stream processors:

sp.listStreamProcessors()
