Get Started with Atlas Stream Processing: Creating Your First Stream Processor
If you're not already familiar, Atlas Stream Processing enables processing high-velocity streams of complex data using the same aggregation pipeline syntax that's used in MongoDB Atlas databases. Streaming data is increasingly critical to building responsive, event-driven experiences for your customers. Stream processing is a fundamental building block powering these applications: it helps tame the firehose of data coming from many sources, find important events in a stream, and combine data in motion with data at rest.
This is what you'll need to follow along:
- An Atlas user with atlasAdmin permission. For the purposes of this tutorial, we'll use the user "tutorialuser".
Under the Services tab in the Atlas Project, click "Stream Processing". Then click the "Create Instance" button.
This will launch the Create Instance dialog.
Enter a name for your instance (we'll use "Tutorial"), select your desired cloud provider and region, and then click "Create". You will receive a confirmation dialog upon successful creation.
The connection registry stores connection information to the external data sources you wish to use within a stream processor. In this example, we will use a sample data generator that is available without any extra configuration, but typically you would connect to either Kafka or an Atlas database as a source.
To manage the connection registry, click on "Configure" to navigate to the configuration screen.
Once on the configuration screen, click on the "Connection Registry" tab.
Next, click on the "Add Connection" button. This will launch the Add Connection dialog.
From here, you can add connections to Kafka, other Atlas clusters within the project, or a sample stream. In this tutorial, we will use the Sample Stream connection. Click on "Sample Stream" and select "sample_stream_solar" from the list of available sample streams. Then, click "Add Connection".
The new "sample_stream_solar" will show up in the list of connections.
To obtain the connection string for the Stream Processing Instance (SPI), return to the main Stream Processing page by clicking on the "Stream Processing" menu under the Services tab.
Next, locate the "Tutorial" SPI we just created and click on the "Connect" button. This will present a connection dialog similar to what is found when connecting to MongoDB Atlas clusters.
For connecting, we'll need to add a connection IP address and create a database user, if we haven't already.
Then we'll choose our connection method. If you do not already have mongosh installed, install it using the instructions provided in the dialog.
Once mongosh is installed, copy the connection string from the "I have the MongoDB Shell installed" view and run it in your terminal.
To confirm that sample_stream_solar is added as a connection, issue `sp.listConnections()`. Our connection to sample_stream_solar is shown as expected.
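In mongosh, that looks like the following (the output shown is illustrative; the exact format may vary by version):

```javascript
// List all connections registered for this Stream Processing Instance.
sp.listConnections();
// Expected to include an entry for the sample connection, e.g.:
// [ { name: "sample_stream_solar", ... } ]
```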
If you are reading through this post as a prerequisite to another tutorial, you can return to that tutorial now to continue.
In this section, we will wrap up by creating a simple stream processor to process the sample_stream_solar source that we have used throughout this tutorial. This sample_stream_solar source represents the observed energy production of different devices (unique solar panels). Stream processing can help measure characteristics such as panel efficiency, or detect when a device has stopped producing energy entirely and needs replacement.
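For example, the average observed wattage per device over one-minute windows could be computed with a windowed pipeline like the sketch below. This is illustrative only: the processor name is arbitrary, and the field names (`device_id`, `obs.watts`) are assumptions about the sample solar data. Run it in mongosh while connected to the SPI.

```javascript
// Sketch: a stream processor averaging watts per solar device
// over one-minute tumbling windows.
sp.createStreamProcessor("solarAvgWatts", [
  // Read from the sample connection configured in the connection registry.
  { $source: { connectionName: "sample_stream_solar" } },
  // Group readings per device inside a one-minute tumbling window.
  {
    $tumblingWindow: {
      interval: { size: NumberInt(60), unit: "second" },
      pipeline: [
        { $group: { _id: "$device_id", avgWatts: { $avg: "$obs.watts" } } }
      ]
    }
  }
]);

// Start the processor once it is defined.
sp.solarAvgWatts.start();
```

Windowing is what turns an unbounded stream into bounded groups that stages like `$group` can aggregate over.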
`.process` lets us sample our source data and quickly test the stages of a stream processor to ensure that it is set up as intended. A sample of this data is as follows:
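A quick way to sample the source is to run `sp.process()` with just a `$source` stage (run in mongosh while connected to the SPI; the document shown is an illustrative shape for the solar sample stream, not verbatim output):

```javascript
// Sample the stream: emits documents to the shell until interrupted.
sp.process([{ $source: { connectionName: "sample_stream_solar" } }]);

// An illustrative document shape from the solar sample stream:
// {
//   device_id: "device_1",
//   group_id: 2,
//   timestamp: "2024-01-09T12:34:56.789+00:00",
//   max_watts: 250,
//   obs: { watts: 168, temp: 15 }
// }
```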
In this tutorial, we started by introducing Atlas Stream Processing and why stream processing is a building block for powering modern applications. We then walked through the basics of creating a stream processor – we created a Stream Processing Instance, configured a source in our connection registry using sample solar data (included in Atlas Stream Processing), connected to a Stream Processing Instance, and finally tested our first stream processor using .process. You are now ready to explore Atlas Stream Processing and create your own stream processors, adding advanced functionality like windowing and validation.