
Get Started with Atlas Stream Processing: Creating Your First Stream Processor

Robert Walters4 min read • Published Sep 22, 2023 • Updated Sep 22, 2023
Following this tutorial requires private preview access to Atlas Stream Processing. If you do not already have access, please request it here or learn more about Atlas Stream Processing in our announcement blog.
If you're not already familiar, Atlas Stream Processing enables processing high-velocity streams of complex data using the same data model and Query API that's used in MongoDB Atlas databases. Streaming data is increasingly critical to building responsive, event-driven experiences for your customers. Stream processing is a fundamental building block powering these applications, helping to tame the firehose of data coming from many sources, find important events in a stream, and combine data in motion with data at rest.
In this tutorial, we will create a stream processor that uses sample data included in Atlas Stream Processing. By the end of the tutorial, you will have an operational Stream Processing Instance (SPI) configured with a stream processor. This environment can be used for further experimentation and Atlas Stream Processing tutorials in the future.
Tutorial Prerequisites
This is what you'll need to follow along:
  • An Atlas user with atlasAdmin permission. For the purposes of this tutorial, we'll have the user "tutorialuser".
  • MongoDB Shell (mongosh) version 2.0+

Create the Stream Processing Instance

Let's first create a Stream Processing Instance (SPI). Think of an SPI as a logical grouping of one or more stream processors. When created, the SPI has a connection string similar to a typical MongoDB Atlas cluster.
Under the Services tab in the Atlas project, click "Stream Processing." Then click the "Create Instance" button.
Stream Processing Instance button
This will launch the Create Instance dialog.
Create Stream Processing Instance dialog in the Atlas UI
Enter a name for your instance (we'll use "Tutorial"), select your desired cloud provider and region, and then click "Create". You will receive a confirmation dialog upon successful creation.

Configure the connection registry

The connection registry stores connection information to the external data sources you wish to use within a stream processor. In this example, we will use a sample data generator that is available without any extra configuration, but typically you would connect to either Kafka or an Atlas database as a source.
To manage the connection registry, click on "Configure" to navigate to the configuration screen.
The Stream Processing page in Atlas will be empty until the first stream processor is created
Once on the configuration screen, click on the "Connection Registry" tab.
Configuration screen within the Stream Processing page in Atlas
Next, click on the "Add Connection" button. This will launch the Add Connection dialog.
Add Connection screen within the Stream Processing page in Atlas
From here, you can add connections to Kafka, other Atlas clusters within the project, or a sample stream. In this tutorial, we will use the Sample Stream connection. Click on "Sample Stream" and select "sample_stream_solar" from the list of available sample streams. Then, click "Add Connection".
The new "sample_stream_solar" will show up in the list of connections. A list of your connection names and types is visible on the Stream Processing page in Atlas

Connect to the Stream Processing Instance (SPI)

Now that we have both created the SPI and configured the connection in the connection registry, we can create a stream processor. First, we need to connect to the SPI that we created previously. This can be done using the MongoDB Shell (mongosh).
To obtain the connection string to the SPI, return to the main Stream Processing page by clicking on the "Stream Processing" menu under the Services tab.
Navigate to Stream Processing within the Services pane in Atlas
Next, locate the "Tutorial" SPI we just created and click on the "Connect" button. This will present a connection dialog similar to what is found when connecting to MongoDB Atlas clusters.
In the Atlas UI, each Stream Processing Instance is visible for users to connect and configure
For connecting, we'll need to add a connection IP address and create a database user, if we haven't already.
The connection dialog for a Stream Processing Instance steps through setting up connection security, choosing a connection method, and finally, connecting to the SPI
Then we'll choose our connection method. If you do not already have mongosh installed, install it using the instructions provided in the dialog.
Stream processor connection screen in Atlas
Once mongosh is installed, copy the connection string from the "I have the MongoDB Shell installed" view and run it in your terminal.
To confirm your sample_stream_solar is added as a connection, issue sp.listConnections(). Our connection to sample_stream_solar is shown as expected.

Create a stream processor

If you are reading through this post as a prerequisite to another tutorial, you can return to that tutorial now to continue.
In this section, we will wrap up by creating a simple stream processor to process the sample_stream_solar source that we have used throughout this tutorial. This sample_stream_solar source represents the observed energy production of different devices (unique solar panels). Stream processing could be helpful in measuring characteristics such as panel efficiency, or in detecting when a device needs replacement because it is no longer producing energy at all.
First, let's define a $source stage to describe where Atlas Stream Processing will read the stream data from.
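The original pipeline definition isn't reproduced here, but a `$source` stage for the sample connection can be sketched as follows. The connection name matches the registry entry created earlier; the optional `timeField` mapping, which promotes the document's timestamp string to the stream time, is an assumption about the sample schema:

```javascript
// Define the $source stage: read from the sample_stream_solar
// connection configured in the connection registry.
var solarstream = {
  $source: {
    connectionName: "sample_stream_solar",
    // Optional: parse the document's timestamp string as the stream time
    // (assumes the sample documents carry a "timestamp" field).
    timeField: { $dateFromString: { dateString: "$timestamp" } }
  }
};
```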
Now we will issue .process to view the contents of the stream in the console:
sp.process([solarstream])
.process lets us sample our source data and quickly test the stages of a stream processor to ensure that it is set up as intended. A sample of this data is as follows:
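A representative document from sample_stream_solar might look like the following (field names and values are illustrative of the solar telemetry schema, not captured output):

```json
{
  "device_id": "device_1",
  "group_id": 2,
  "timestamp": "2023-08-27T13:51:53.375+00:00",
  "max_watts": 250,
  "event_type": 0,
  "obs": {
    "watts": 168,
    "temp": 15
  }
}
```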

Wrapping up

In this tutorial, we started by introducing Atlas Stream Processing and why stream processing is a building block for powering modern applications. We then walked through the basics of creating a stream processor – we created a Stream Processing Instance, configured a source in our connection registry using sample solar data (included in Atlas Stream Processing), connected to a Stream Processing Instance, and finally tested our first stream processor using .process. You are now ready to explore Atlas Stream Processing and create your own stream processors, adding advanced functionality like windowing and validation.
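As a small sketch of the windowing mentioned above, a processor that averages observed watts per device over a 10-second tumbling window might look like this. The stage names follow the Atlas Stream Processing `$tumblingWindow` syntax; the grouping fields are assumptions about the sample schema:

```javascript
// $source: read from the registered sample connection.
var solarstream = { $source: { connectionName: "sample_stream_solar" } };

// $tumblingWindow: group events into fixed 10-second windows and
// average the observed watts per device within each window.
var solarwindow = {
  $tumblingWindow: {
    interval: { size: 10, unit: "second" },
    pipeline: [
      { $group: { _id: "$device_id", avg_watts: { $avg: "$obs.watts" } } }
    ]
  }
};

// In mongosh, sample the pipeline's output with:
// sp.process([solarstream, solarwindow]);
```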
If you enjoyed this tutorial and would like to learn more check out the MongoDB Atlas Stream Processing announcement blog post. For more on stream processors in Atlas Stream Processing, visit our documentation.
Request access to participate in the private preview and check out more Atlas Stream Processing tutorials and content on the MongoDB Developer Center!

