Using the Confluent Cloud with Atlas Stream Processing

Robert Walters • 5 min read • Published May 02, 2024 • Updated May 02, 2024
Stream Processing • Atlas
Apache Kafka is a massively popular streaming platform. It is available as open-source software and as a commercial, self-managed distribution (e.g., Confluent Platform). Plus, you can get a hosted Kafka (or Kafka-compatible) service from a number of providers, including AWS Managed Streaming for Apache Kafka (MSK), RedPanda Cloud, and Confluent Cloud, to name a few.
In this tutorial, we will configure network connectivity between MongoDB Atlas Stream Processing instances and a topic within the Confluent Cloud. By the end of this tutorial, you will be able to process stream events from Confluent Cloud topics and emit the results back into a Confluent Cloud topic.
Confluent Cloud supports connectivity through secure public internet endpoints on its Basic and Standard clusters. Private network connectivity options such as PrivateLink connections, VPC/VNet peering, and AWS Transit Gateway are available on the Enterprise and Dedicated cluster tiers.
Note: At the time of this writing, Atlas Stream Processing only supports internet-facing Basic and Standard Confluent Cloud clusters. This post will be updated to cover Enterprise and Dedicated clusters once private networking is supported.
The easiest way to get started with connectivity between Confluent Cloud and MongoDB Atlas is by using public internet endpoints. Public internet connectivity is the only option for Basic and Standard Confluent clusters. Rest assured that Confluent Cloud clusters with internet endpoints are protected by a proxy layer that mitigates DoS, DDoS, SYN flooding, and other network-level attacks. We will also use API keys with the SASL_SSL authentication method for secure credential exchange.
In this tutorial, we will set up and configure Confluent Cloud and MongoDB Atlas for network connectivity and then work through a simple example that uses a sample data generator to stream data between MongoDB Atlas and Confluent Cloud.

Tutorial prerequisites

This is what you’ll need to follow along:
  • An Atlas project (free or paid tier)
  • An Atlas database user with atlasAdmin permission
    • For the purposes of this tutorial, we’ll have the user “tutorialuser.”
  • MongoDB shell (Mongosh) version 2.0+
  • Confluent Cloud cluster (any configuration)

Configure Confluent Cloud

For this tutorial, you need a Confluent Cloud cluster with a topic named "solardata" and an API access key. If you already have these, you may skip ahead to the next section, where we configure Atlas Stream Processing.
To create a Confluent Cloud cluster, log into the Confluent Cloud portal, select or create an environment for your cluster, and then click the “Add Cluster” button.
In this tutorial, we can use a Basic cluster type.
Confluent Cloud create cluster
Once your cluster is created, create an API key by clicking on the “API Keys” menu under the Cluster Overview on the left side of the page.
Confluent Cloud API keys dialog
Click on “Create Key” and provide a description for your key pair as shown below.
Confluent Cloud Create key dialog
Make a note of the API key and secret before you download and continue. You will need these when creating the connection in Atlas Stream Processing. Note that Confluent OAuth and Confluent single sign-on are not supported as authentication methods in Atlas Stream Processing.
Next, create a topic by clicking on the “Topics” menu item and then the “Add topic” button. Accept the default settings and give the topic a name: “solardata.” We are now ready to configure MongoDB Atlas Stream Processing.

Configure Atlas Stream Processing Connection Registry

In MongoDB Atlas, click on "Stream Processing" from the Services menu. Next, click on the "Create Instance" button. Provide a name, cloud provider, and region. Note: For a lower network cost, choose the cloud provider and region that match your Confluent Cloud cluster. In this tutorial, we will use AWS us-east-1 for both Confluent Cloud and MongoDB Atlas.
The Atlas Stream Processing Stream Processor Instance dashboard
Once the Stream Processing Instance (SPI) is created, we can create our connection to the Confluent Cloud using the Connection Registry. Click on “Configure,” and then click on the “Connection Registry” tab as shown below.
Configure Atlas Stream Processor Instance dashboard
To create the connection to the Confluent Cloud, click on “Add Connection.”
Select "Kafka" and enter "confluentcloud" for the connection name. Fill out the remaining fields using the details from your Confluent Cloud cluster.
  • Bootstrap server: Provided in Confluent Cloud under Cluster Settings/Endpoints
  • Security Protocol: SASL_SSL
  • SASL Mechanism: PLAIN
  • Username: Paste in the API KEY
  • Password: Paste in the API SECRET
An example of the add connection dialog is shown below.
Add connection registry connection dialog
Click on “Add Connection” and your new connection to the Confluent Cloud will show up in the list.
Connection Registry connection dashboard
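For reference, the dialog fields above map onto a connection definition that looks roughly like the following. This is an illustrative sketch only; the placeholder values and field names are assumptions based on the dialog fields, not an exact Atlas API schema.
// Illustrative sketch of what the "confluentcloud" Kafka connection captures.
// Placeholders and field names are assumptions, not an exact API schema.
{
  name: "confluentcloud",
  type: "Kafka",
  bootstrapServers: "<your-cluster-endpoint>.confluent.cloud:9092",
  security: { protocol: "SASL_SSL" },
  authentication: { mechanism: "PLAIN", username: "<API_KEY>", password: "<API_SECRET>" }
}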
Next, create another connection by clicking on the “Add Connection” button. This time, we will select “Sample Stream” and “sample_stream_solar” in the drop-down as shown below.
Add sample stream connection
This will make a sample data generator called “sample_stream_solar” available in our SPI.
Next, let’s test the connectivity to Confluent and run our first Atlas Stream Processor with data from Confluent Cloud.

Create the stream processor in Atlas

Note: To connect to the SPI, you will need a database user with Atlas Admin permissions, or you will need to be a member of the Project Owner role. If you do not already have this, create it now before continuing this tutorial.
Connection information can be found by clicking on the “Connect” button on your SPI. The connect dialog is similar to the connect dialog when connecting to an Atlas cluster. To connect to the SPI, you will need to use the mongosh command line tool.
Connect dialog for the stream processor instance
To connect to the SPI, use the connection string provided in the connect dialog.
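For example, with mongosh installed, the command looks roughly like this. The connection string placeholder stands in for the value shown in your Connect dialog (which also lists any additional flags), and "tutorialuser" is the database user from the prerequisites.
# Replace the placeholder with the connection string from your SPI's Connect dialog
mongosh "<your-SPI-connection-string>" --username tutorialuser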
Once connected, you can enumerate the available connections using the sp.listConnections() command.
AtlasStreamProcessing> sp.listConnections()
[
  { name: 'sample_stream_solar', type: 'inmemory' },
  { name: 'confluentcloud', type: 'kafka' }
]
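Optionally, before building the aggregation, you can run a quick pass-through processor against the sample source to watch the raw solar documents flow through (stop it with Ctrl+C):
AtlasStreamProcessing> sp.process([{ $source: { connectionName: "sample_stream_solar" } }])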
Now that we have confirmed both our 'sample_stream_solar' sample data and our 'confluentcloud' Kafka connection are available, let's use the solar sample source to create a streaming query that calculates the maximum and average power output per device and writes the results to the Kafka topic "solardata".
s_solar = { $source: { connectionName: "sample_stream_solar" } }

Twindow = { $tumblingWindow: { interval: { size: NumberInt(30), unit: "second" }, pipeline: [ { $group: { _id: "$device_id", max: { $max: "$obs.watts" }, avg: { $avg: "$obs.watts" } } } ] } }

write_kafka = { $emit: { "connectionName": "confluentcloud", "topic": "solardata" } }
Now that we have our pipeline variables defined, let's use the .process() command to run this stream processor in the foreground.
AtlasStreamProcessing> sp.process([s_solar, Twindow, write_kafka])
To read the topic data, open another terminal window and connect to the SPI. Define a variable for the Kafka topic as shown below.
s_kafka = { $source: { connectionName: "confluentcloud", "topic": "solardata" } }
Next, use the .process() command to read the data from the ‘solardata’ topic.
AtlasStreamProcessing> sp.process([s_kafka])
After about 30 seconds, you will see data output from the ‘solardata’ topic.
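Running sp.process() keeps the pipeline in the foreground of your shell session. If you want the aggregation to keep running after you disconnect, you can register it as a named stream processor instead. Here is a minimal sketch using the pipeline variables defined above; the processor name "solarToKafka" is just an example:
// Register the pipeline as a named stream processor ("solarToKafka" is an example name)
sp.createStreamProcessor("solarToKafka", [s_solar, Twindow, write_kafka])

// Start it; it continues running on the SPI even after you close mongosh
sp.solarToKafka.start()

// Peek at its output, then stop it when you are done
sp.solarToKafka.sample()
sp.solarToKafka.stop()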

Wrapping up

In this tutorial, we used Atlas Stream Processing to create a stream processor with sample data and wrote the aggregation results to a Kafka topic in Confluent Cloud. We also streamed data from Confluent Cloud into Atlas Stream Processing and confirmed that the transformed data was written to the topic. This tutorial was done without any extra network configuration.
You might recall that, by default, no network connections are allowed into Atlas. Users need to either open their cluster to the world by adding 0.0.0.0/0 to the IP access list or specify specific IP ranges. What is important to note is that connections from Atlas Stream Processing originate within Atlas and connect outbound to Confluent Cloud. Thus, there is no inbound network access rule that needs to be opened or IP allowlisted.
In Confluent Cloud, there is no concept of IP filtering or IP allowlisting, so there is nothing extra to configure on the Confluent Cloud side with respect to networking. At the time of this writing, private networking options available in Confluent Cloud, such as PrivateLink, are not supported in Atlas Stream Processing. This tutorial will be updated when these private networking options are supported.

Learn more about MongoDB Atlas Stream Processing

For more on managing stream processors in Atlas Stream Processing, visit our documentation.
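For example, from mongosh you can list the stream processors defined on your SPI and check statistics for a running one. The "solarToKafka" name below refers to the example processor created earlier:
// List all stream processors on this SPI, along with their current state
sp.listStreamProcessors()

// View runtime statistics for a named processor
sp.solarToKafka.stats()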
Atlas Stream Processing is now available to all developers in Atlas. Log in today to get started and give it a try!
