
The Atlas Stream Processing Set-up Guide for Kafka Connector Users

Robert Walters • 15 min read • Published Aug 23, 2024 • Updated Aug 23, 2024
MongoDB offers two primary interfaces, the MongoDB Connector for Apache Kafka (Kafka Connector) and Atlas Stream Processing, for integrating your operational data on MongoDB with streaming data from Apache Kafka. This makes for a powerful combination when building modern, event-driven applications.
This post will cover:
  • The basics of setting up Atlas Stream Processing
    • How to connect to a Kafka topic and stream data using Atlas Stream Processing
    • How to transform data with Atlas Stream Processing
    • How to stream data to MongoDB Atlas
  • A deep-dive into the Kafka ecosystem
    • Core components
    • Equivalent Kafka Connector configuration parameters that are available to you in Atlas Stream Processing
    • Current limitations of Atlas Stream Processing
Prerequisites:
  • MongoDB Atlas Account
  • An Apache Kafka Source (In this example, we’ll use Redpanda)
  • An Atlas Database
Note: This article will be updated frequently as we continue development of Atlas Stream Processing.

Comparing the Kafka Connector and Atlas Stream Processing

Apache Kafka Connect is an optional server component in an Apache Kafka deployment. This component makes streaming data between Apache Kafka and other data systems easier by providing data source-specific connectors, such as the MongoDB Connector for Apache Kafka. These connectors can connect to their respective data sources and read/write data to an Apache Kafka topic via the Kafka Connect framework.
The deployment of Kafka Connect can be complicated and costly, as it involves multiple servers supporting a distributed mode that enables the scalability and availability of connector workers. Cloud providers make hosting this infrastructure easier by providing Kafka Connect as a service. This simplifies connector management, but it also means a high per-hour cost to run the infrastructure.
In many scenarios, MongoDB Atlas Stream Processing removes the need for Kafka Connect when moving data between Apache Kafka and MongoDB. Atlas Stream Processing is a feature within MongoDB Atlas that lets you continuously process, validate, and merge streaming data from sources such as Apache Kafka and MongoDB Change Streams. It also makes reading and writing data between Apache Kafka and MongoDB simple, without Kafka Connect or the MongoDB Connector for Apache Kafka.
In this article, we will compare and contrast the methods for moving data between Kafka and MongoDB using an example data flow. We will start with source data in Apache Kafka and process it using a window function. Then, we’ll land the data in MongoDB.
First, let’s use the MongoDB Connector for Apache Kafka. Then, we’ll set things up in Atlas Stream Processing without relying on Kafka Connect or any extra infrastructure, saving us infrastructure costs and management time.
A visual representation of a data flow in Atlas Stream Processing.

Streaming data with MongoDB Connector for Apache Kafka

You’ll need access to an Apache Kafka topic with Apache Kafka Connect and the MongoDB Connector for Apache Kafka installed. Note: If you do not have access to an environment, you can use the Docker compose script available in the MongoDB Kafka Connector source GitHub repository. Since the data resides in the Kafka topic, we define the MongoDB Connector for Apache Kafka as a sink with the following definition:
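As a rough sketch, a sink definition for this scenario might look like the object below, serialized to JSON and submitted to the Kafka Connect REST API. The connector name, topic, connection string, database, and collection are placeholders assumed for illustration; the property keys are standard Kafka Connect and MongoDB sink connector settings.

```javascript
// Hypothetical sink connector definition; every value here is a placeholder.
const sinkConnector = {
  name: "mongo-stock-sink",
  config: {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "Stocks",
    "connection.uri": "mongodb+srv://<user>:<password>@<cluster-host>/",
    "database": "Stocks",
    "collection": "StockData",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
};

// Kafka Connect accepts this document as the JSON body of POST /connectors.
const requestBody = JSON.stringify(sinkConnector, null, 2);
console.log(requestBody);
```

Note that this definition only lands the raw events in MongoDB; it contains no windowing logic.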
To transform your data, you’ll need to use another service such as Apache Flink to stream the output of the window function to another topic. Setting this up and configuring the necessary infrastructure is outside the scope of this article. However, you can imagine there are many moving parts to get this working.
Next, let’s take a look at MongoDB Atlas Stream Processing.

Streaming data with Atlas Stream Processing

This setup is intentionally high-level. For a step-by-step tutorial that includes additional details like security configuration, see the Get Started with Atlas Stream Processing tutorial in our documentation.

Step 1: Create the Atlas Stream Processing instance

Using the MongoDB Atlas UI, log in to your project, click “Stream Processing” from the services menu, and click the “Create Instance” button. Provide a name, “KafkaTestSPI,” and click the “Create” button.
Note: For this walkthrough, you can choose any available Stream Processing Instance (SPI) tier. Billing begins once you have running stream processor jobs, not when creating SPIs. See the documentation for more information.

Step 2: Create a connection to a hosted Kafka service

Once you have created an SPI, click “Configure” and then “Connection Registry.” Here, you will define the connections that Atlas Stream Processing will use, such as connections to Kafka brokers or other Atlas clusters within your Atlas project.
The Stream Processing Instance view in Atlas.
Next, click “Create Connection” and select “Kafka.”
Connecting to an Apache Kafka data source from the Stream Processing interface in Atlas.
Atlas Stream Processing uses the Kafka Wire Protocol and can connect to any Kafka-compliant service, such as Confluent Cloud, Redpanda Cloud, AWS MSK, or Azure Event Hubs. We support SASL_PLAINTEXT and SASL_SSL security configurations.

Step 3: Create a connection to a MongoDB Atlas cluster

Once we have added the connection to Kafka, create another connection and select “Atlas Database.” Give it a connection name and select any Atlas cluster you have available in your current project.
Connecting to an Atlas database from the Stream Processing interface in Atlas.
Now that we have defined a connection to our Kafka deployment and a connection to our Atlas cluster, we can connect to the Stream Processing instance and create stream processors.
Unlike the MongoDB Connector for Apache Kafka, Atlas Stream Processing has no configuration files that designate a connection as a source or a sink. The same connection registry entries serve both roles, and the behavior depends on the pipeline stage that references the connection. When a connection is used in $source, Atlas Stream Processing reads data from it into the pipeline. When Atlas Stream Processing writes data out, it uses $emit for a Kafka connection and $merge for a MongoDB connection. To demonstrate this, let’s continue the example by creating a stream processor that reads data ($source) from our hosted Kafka topic and writes data ($merge) into our Atlas cluster.

Step 4: Create the stream processor

Use the Connect button to connect to the “KafkaTestSPI” SPI that we created earlier. This will launch a familiar connection dialog, as shown below.
The connection dialog for connecting to an SPI.
Using a database user account, use the MongoDB Shell (mongosh) to connect to the SPI. Once connected, we can build the stream processor by defining variables for each stage in the pipeline, ultimately combining them when we’re ready to create the stream processor. Our Kafka topic will be populated with fictitious stock data in the form of the following value:
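For illustration, assume each message value is a JSON document shaped like the hypothetical event below. The field names and values are assumptions made for this sketch, not data from a real feed.

```javascript
// Hypothetical stock event; field names and values are illustrative only.
const sampleEvent = {
  exchange: "NASDAQ",
  company_symbol: "ABC",
  company_name: "EXAMPLE COMPANY",
  price: 8.41,
  tx_time: "2024-04-03T14:51:40Z"
};

// The Kafka message value would carry this document as JSON text.
const messageValue = JSON.stringify(sampleEvent);
console.log(messageValue);
```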
Let’s create the $source variable for our HostedKafka connection.
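A minimal sketch of that stage follows. The connection name HostedKafka comes from the connection registry entry mentioned above; the topic name "Stocks" is an assumption, so substitute your own.

```javascript
// $source stage reading from the Kafka connection in the connection registry.
// "Stocks" is a placeholder topic name.
const source = {
  $source: {
    connectionName: "HostedKafka",
    topic: "Stocks"
  }
};

console.log(JSON.stringify(source));
```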
Next, let’s create a tumbling window over every five seconds of data.
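One way to express this is a $tumblingWindow stage with a five-second interval and an inner pipeline that aggregates each window. The grouping shown here (maximum and average price per symbol) and the field names company_symbol and price are assumptions for illustration.

```javascript
// Tumbling window: non-overlapping five-second buckets.
// The $group inside runs once per window over the events it contains.
const tumblingWindow = {
  $tumblingWindow: {
    interval: { size: 5, unit: "second" },
    pipeline: [
      {
        $group: {
          _id: "$company_symbol",      // assumed field name
          max: { $max: "$price" },     // highest price seen in the window
          avg: { $avg: "$price" }      // average price in the window
        }
      }
    ]
  }
};

console.log(JSON.stringify(tumblingWindow, null, 2));
```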
Note: To get an idea of what the stream of data would look like, we can use the .process command, passing each stage of the pipeline and displaying the results to the console.
Next, let’s write this stream of data into the Atlas cluster we defined in the connection registry.
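A sketch of the corresponding $merge stage is shown below. The connection name "AtlasCluster" stands in for whatever name you gave the Atlas Database connection in Step 3, and the database and collection names are likewise placeholders.

```javascript
// $merge stage writing window results into an Atlas collection.
// connectionName, db, and coll are placeholders for your own values.
const merge = {
  $merge: {
    into: {
      connectionName: "AtlasCluster",
      db: "Stocks",
      coll: "StockReport"
    }
  }
};

console.log(JSON.stringify(merge));
```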
Finally, let’s create the stream processor using the createStreamProcessor command.
Then we will start the stream processor by issuing the .start command.
You can view statistics on your stream processor by using the .stats command.
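Put together, the create, start, and stats steps might look like the sketch below. The processor name stockWindowProcessor, the topic, the Atlas connection name, and the database and collection names are all assumptions. The sp object exists only inside a mongosh session connected to the SPI, so the calls are guarded; run elsewhere, the script just prints the pipeline.

```javascript
// Full pipeline: Kafka source -> five-second tumbling window -> Atlas merge.
const pipeline = [
  { $source: { connectionName: "HostedKafka", topic: "Stocks" } },
  {
    $tumblingWindow: {
      interval: { size: 5, unit: "second" },
      pipeline: [
        { $group: { _id: "$company_symbol", max: { $max: "$price" }, avg: { $avg: "$price" } } }
      ]
    }
  },
  { $merge: { into: { connectionName: "AtlasCluster", db: "Stocks", coll: "StockReport" } } }
];

if (typeof sp !== "undefined") {
  // Inside mongosh connected to the Stream Processing Instance.
  sp.createStreamProcessor("stockWindowProcessor", pipeline);
  sp.stockWindowProcessor.start();
  printjson(sp.stockWindowProcessor.stats());
} else {
  // Outside mongosh there is no `sp`; show the pipeline definition instead.
  console.log(JSON.stringify(pipeline, null, 2));
}
```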
Here is an example of the return result:
That’s it for moving data from Kafka to MongoDB! Moving data from MongoDB to Kafka is just as simple: define a $source for your Atlas cluster and use $emit for your Kafka connection. You can stream data quickly, at lower cost, and without a complex Kafka Connect deployment or configuration.
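As a sketch of the reverse direction, the pipeline below reads from an Atlas collection and emits to a Kafka topic. The connection name "AtlasCluster", the database and collection, and the topic "StockUpdates" are placeholders. With an Atlas $source, the documents flowing through the pipeline are change stream events, so you may want a transformation stage before $emit.

```javascript
// Atlas collection as the source of the stream.
const atlasSource = {
  $source: { connectionName: "AtlasCluster", db: "Stocks", coll: "StockData" }
};

// Kafka topic as the destination.
const emitToKafka = {
  $emit: { connectionName: "HostedKafka", topic: "StockUpdates" }
};

const mongoToKafkaPipeline = [atlasSource, emitToKafka];
console.log(JSON.stringify(mongoToKafkaPipeline, null, 2));
```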
Next, we’ll cover in-depth considerations and options available to you in more advanced Kafka configurations. Let’s start with some context about the Apache Kafka ecosystem.

Components of the Apache Kafka ecosystem

This section highlights some of the key Kafka components to keep in mind when setting up an integration between Kafka and MongoDB, but it is not intended to provide a comprehensive list of components. We have included components most relevant to stream processing use cases, but going through a full list is beyond the scope of this article. Refer to Kafka and/or Kafka Connect documentation for anything we haven’t covered.

Schema Registry

The Schema Registry is an optional component of Apache Kafka that provides a centralized repository for managing and validating schemas used in data processing and serialization. Since everything in Apache Kafka is binary, developers use the schema registry to ensure conformity of data structure, data validation, compatibility checking, versioning, and evolution, all of which are useful for downstream applications. Atlas Stream Processing does not interact with the Schema Registry at the time of this writing. Please let us know via UserVoice if you have a strong need for this.

Kafka Connect

Kafka Connect offers a method for integrating data between Kafka and external systems such as MongoDB. The MongoDB Connector for Apache Kafka is built on this method of integration. With Atlas Stream Processing, there is no need to use Kafka Connect to move data between Kafka and MongoDB.

Key and value converters

At the time of this writing, Atlas Stream Processing only supports the JSON data format. When additional formats are available, this article will be updated accordingly. If you have an urgent need, please let us know via UserVoice.

Error tolerance

The Kafka Connector has settings that determine how to deal with errors. Configuration parameters such as mongo.errors.tolerance and errors.tolerance can be used alongside their corresponding Dead Letter Queue (DLQ) configuration parameters, errors.deadletterqueue.topic.name and errors.deadletterqueue.topic.replication.factor. This gives developers the ability to stop processing when an error occurs, tolerate errors without stopping the processing of messages, or write errors to a DLQ (either a Kafka topic or an Atlas collection, depending on whether it’s a source or sink connector). Similarly, Atlas Stream Processing supports writing errors to a DLQ backed by a MongoDB Atlas collection. To do this, use the $validate stage to send events that do not conform to a specific schema to the DLQ.
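A hedged sketch of this pattern follows: a $validate stage that routes non-conforming events to the DLQ, plus the options document that names the DLQ collection when the processor is created. The schema, field names, connection name, and collection names are assumptions for illustration.

```javascript
// Events missing these fields, or carrying the wrong types, go to the DLQ.
const validate = {
  $validate: {
    validator: {
      $jsonSchema: {
        required: ["company_symbol", "price"],
        properties: {
          company_symbol: { bsonType: "string" },
          price: { bsonType: ["int", "long", "double"] }
        }
      }
    },
    validationAction: "dlq"
  }
};

// Passed as the third argument to sp.createStreamProcessor(name, pipeline, options).
const processorOptions = {
  dlq: { connectionName: "AtlasCluster", db: "Stocks", coll: "dlq" }
};

console.log(JSON.stringify({ validate, processorOptions }, null, 2));
```

The $validate stage would typically sit directly after $source so that malformed events are diverted before any windowing or aggregation.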

Single Message Transforms (SMTs)

SMTs are part of Kafka Connect and transform inbound messages after a source connector has produced them and before they are written to Kafka. SMTs are also used at the sink, transforming outbound messages before sending them to the Kafka Connector. Rather than use SMTs, Atlas Stream Processing uses the MongoDB Query API and aggregation framework to transform documents. When setting up a stream processor, note that Atlas Stream Processing does not yet support custom functions to transform messages. The table below describes the equivalent aggregation pipeline operator or procedure to use for each of the built-in SMTs.
| Transform | Description | Atlas Stream Processing Support |
| --- | --- | --- |
| Cast | Cast fields or the entire key or value to a specific type. | Support coming soon via $convert. |
| Drop | Drop either a key or a value from a record and set it to null. | Use the $set operator: { $set: { "_stream_meta.source.key": null } } |
| DropHeaders | Drop one or more headers from each record. | Use the $pull operator: { $pull: { "_stream_meta.source.headers": { k: 'header2' } } } |
| EventRouter | Route Debezium outbox events using a regex configuration option. | Not supported. |
| ExtractField | Extract the specified field from a struct when schema present, or a map in the case of schemaless data. Any null values are passed through unmodified. | Atlas Stream Processing can extract just the value from a field using $project, but it will still be in JSON format with a field name. |
| ExtractTopic | Replace the record topic with a new topic derived from its key or value. | Limited support. You can assign a topic when writing back to Kafka via the $emit pipeline operator. $emit allows for an explicit topic name, a reference to a value in the document, or an expression. |
| Filter (Apache Kafka) | Drop all records. Designed to be used in conjunction with a predicate. | Use $match and $project to filter values from the stream. |
| Flatten | Flatten a nested data structure. This generates names for each field by concatenating the field names at each level with a configurable delimiter character. | Use $addFields with references to the nested field you want to flatten. To use a delimiter like a period, concatenate using $concat as follows: { $addFields: { fullName: { $concat: ["$name", ".", "$lastname"] } } } |
| GzipDecompress | Gzip-decompress the entire byteArray key or value input. | Not supported. |
| HeaderFrom | Moves or copies fields in a record key or value into the record’s header. | Use $emit.config.headers to specify the Kafka header. Note: You can reference record keys or values by using the $ symbol, which denotes field paths and expressions within aggregation pipeline stages and queries. Example: $emit.config.headers = "$foo" |
| HoistField | Wrap data using the specified field name in a struct when schema present, or a map in the case of schemaless data. | Currently, event data must be in JSON format. |
| InsertField | Insert field using attributes from the record metadata or a configured static value. | Use $addFields or $project to create a new field. |
| InsertHeader | Insert a literal value as a record header. | Use $emit.config.headers. Value must resolve to a document or array structured like [{'k': …, 'v': …}, {'k': …, 'v': …}]. |
| MaskField | Mask specified fields with a valid null value for the field type. | Use $set to provide a null or any arbitrary value to a field. |
| MessageTimeStampRouter | Update the record’s topic field as a function of the original topic value and the record’s timestamp field. | Not supported. |
| RegexRouter | Update the record topic using the configured regular expression and replacement string. | You can modify the topic names by specifying the topic name by string or regex in the $emit operator. |
| ReplaceField | Filter or rename fields. | To drop a field, use $unset or $project. To rename a field, use $project. |
| SetSchemaMetadata | Set the schema name, version, or both on the record’s key or value schema. | Not supported. |
| TimestampConverter | Convert timestamps between different formats such as Unix epoch, strings, and Connect Date and Timestamp types. | Convert date and time using aggregation operators $dateFromString and Timestamp. |
| TimestampRouter | Update the record’s topic field as a function of the original topic value and the record timestamp. | Not supported. |
| TopicRegexRouter | Update the record topic using the configured regular expression and replacement string. | You can modify the topic names by specifying the topic name by string or regex in the $emit operator. |
| ValueToKey | Replace the record key with a new key formed from a subset of fields in the record value. | Specify the key by assigning it in $emit. For example: $emit.config.key: "$foo" |
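To make a few of these equivalents concrete, the sketch below chains a filter ($match), an inserted field ($addFields), and an $emit stage that sets the record key and a header, loosely mirroring the Filter, InsertField, ValueToKey, and InsertHeader transforms. The field names, header contents, topic, and the keyFormat setting are assumptions; check the $emit documentation for the options your deployment supports.

```javascript
// Filter: keep only events with a positive price (assumed field name).
const keepValidPrices = { $match: { price: { $gt: 0 } } };

// InsertField: add a static field to every document.
const addOrigin = { $addFields: { origin: "stock-feed" } };

// ValueToKey + InsertHeader: derive the Kafka key from a value field
// and attach a literal header as a k/v pair.
const emitWithKeyAndHeaders = {
  $emit: {
    connectionName: "HostedKafka",
    topic: "StockUpdates",
    config: {
      key: "$company_symbol",
      keyFormat: "string",
      headers: [{ k: "source", v: "atlas-stream-processing" }]
    }
  }
};

const transformStages = [keepValidPrices, addOrigin, emitWithKeyAndHeaders];
console.log(JSON.stringify(transformStages, null, 2));
```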
Next, let’s briefly summarize source and sink properties, post-processors, and write model strategies.

Source properties

When configuring the Kafka Connector as a source, it reads data from a MongoDB cluster and writes it into an Apache Kafka topic. Similar to the Kafka Connector, when using Atlas Stream Processing, developers first define a connection registry connection to a MongoDB Atlas cluster and another connection registry connection to their Kafka topic. Next, they use $source to read the data from the MongoDB cluster, and perform any optional transformations or aggregations on the data, before using $emit to write the data into their Kafka topic.

Sink properties

When using the Kafka Connector as a sink, developers explicitly define the connection as a sink using the connector class com.mongodb.kafka.connect.MongoSinkConnector. With Atlas Stream Processing, you define a connection in the connection registry that can be used as either a source or a sink. When writing to a Kafka topic “sink,” you use $emit, and when writing to a MongoDB “sink,” you use $merge.

Post-processors

Post-processors are used in the Kafka Connector to modify the document before writing it out to a MongoDB cluster sink. In Atlas Stream Processing, you can modify the document using MongoDB aggregation pipeline stages before writing it to a MongoDB Atlas cluster. There is no need to write your own Java code to perform business-specific transformations.
For example, assume the following event is on the Kafka topic:
When you use the allow list projector within the Kafka Connector, the post-processor only outputs data from the fields that you specify.
In this example, the output would be the following:
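As a hedged illustration of the allow-list case, the sketch below pairs a hypothetical event with sink connector properties that enable the allow-list value projector, then emulates the projection locally to show which fields survive. The event and the chosen fields are assumptions, and the local emulation is a simplification rather than the connector's actual code path.

```javascript
// Hypothetical event on the Kafka topic.
const incomingEvent = {
  exchange: "NASDAQ",
  company_symbol: "ABC",
  company_name: "EXAMPLE COMPANY",
  price: 8.41
};

// Sink connector properties enabling the allow-list value projector.
const allowListSettings = {
  "post.processor.chain":
    "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder," +
    "com.mongodb.kafka.connect.sink.processor.AllowListValueProjector",
  "value.projection.type": "AllowList",
  "value.projection.list": "company_symbol,price"
};

// Local emulation: keep only the top-level fields named in the allow list.
const allowedFields = allowListSettings["value.projection.list"].split(",");
const allowListOutput = Object.fromEntries(
  Object.entries(incomingEvent).filter(([field]) => allowedFields.includes(field))
);

console.log(allowListOutput); // → { company_symbol: 'ABC', price: 8.41 }
```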
Using Atlas Stream Processing, simply add a $project stage in your query to include/exclude fields.
The output would be the following:
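A minimal sketch of such a stage, assuming a hypothetical event with company_symbol and price among its fields, keeps only those two fields:

```javascript
// Inclusion-style $project: only the listed fields pass through.
const projectAllowList = {
  $project: {
    company_symbol: 1,
    price: 1
  }
};

// Fields that would remain on each document after this stage.
const keptFields = Object.keys(projectAllowList.$project);
console.log(keptFields); // → [ 'company_symbol', 'price' ]
```

To exclude fields instead, list them with a value of 0 in the $project stage.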
The Kafka Connector contains built-in post-processors and processors that determine how to handle the _id field in the destination MongoDB collection. The _id is determined by the DocumentIdAdder post-processor.

DocumentIdAdder

com.mongodb.kafka.connect.sink.processor.DocumentIdAdder
This post processor inserts an _id field determined by the configured strategy. The default strategy is BsonOidStrategy. By default, an _id will be added to the document if one does not already exist.
DocumentIdAdder strategies
DocumentId strategies are defined via document.id.strategy. These are the possible configurations and the Atlas Stream Processing equivalent. Note that with Atlas Stream Processing, to add or change the _id field, simply use the $addFields pipeline operator to add a new “_id” field with the value specified in the table below.
| Strategy Name | Description | Atlas Stream Processing Equivalent |
| --- | --- | --- |
| BsonOidStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy) | Default. | A specific _id field does not need to exist in the document as a new _id with an ObjectId will be included by default. |
| KafkaMetaDataStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy) | Builds a string composed of the concatenation of Kafka topic, partition, and offset. | { $addFields: { "_id": "$_stream_meta.source.topic"}} |
| FullKeyStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy) | Uses the complete key structure to generate the value for the _id field. | { $addFields: { "_id": "$_stream_meta.source.key"}} |
| ProvidedInKeyStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy) | Uses the _id field specified in the key structure. | { $addFields: { "_id": "$_stream_meta.source.key._id"}} |
| ProvidedInValueStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy) | Uses the _id field specified in the value structure of the sink document. | Not applicable as when Kafka data is sourced into Atlas Stream Processing, the value is the document itself. Thus, an _id within the document would be used. |
| PartialKeyStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy) | Uses a block list or allow list projection of the sink document key structure. | To use a different field name for the _id referencing one that originally came in as the key, use $addFields as follows: { $addFields: { "_id": "$_stream_meta.source.key.$your_desired_id_field"}} |
| PartialValueStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy) | Uses a block list or allow list projection of the sink document value structure. | To use a different field name for the _id, use $addFields as follows: { $addFields: { "_id": "$your_desired_id_field"}} |
| UuidProvidedInKeyStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInKeyStrategy) | Converts the _id key field to a UUID. The value must be either a string or binary type and must conform to the UUID format. | Not supported. |
| UuidProvidedInValueStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.UuidProvidedInValueStrategy) | Converts the _id value field to a UUID. The value must be either a string or binary type and must conform to the UUID format. | Not supported. |
| UuidStrategy (com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy) | Uses a randomly generated UUID in string format. | Not supported. |
Custom user-defined id strategies are not supported.
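As a sketch of how these equivalents might be written out, the stages below set _id in two ways: from Kafka metadata (topic, partition, and offset joined with "#", approximating KafkaMetaDataStrategy) and from a field in the value (approximating PartialValueStrategy). The separator and the field name company_symbol are assumptions. Because $concat accepts only strings, the numeric partition and offset are wrapped in $toString.

```javascript
// _id built from Kafka metadata: "<topic>#<partition>#<offset>".
const idFromKafkaMetadata = {
  $addFields: {
    _id: {
      $concat: [
        "$_stream_meta.source.topic",
        "#",
        { $toString: "$_stream_meta.source.partition" },
        "#",
        { $toString: "$_stream_meta.source.offset" }
      ]
    }
  }
};

// _id taken from a field in the event value (assumed field name).
const idFromValueField = {
  $addFields: { _id: "$company_symbol" }
};

console.log(JSON.stringify([idFromKafkaMetadata, idFromValueField], null, 2));
```

Either stage would be placed before $merge so that the chosen _id is what the merge matches on.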

Built-in post-processors

The following is a list of built-in Kafka Connector post-processors and their Atlas Stream Processing equivalents:
| Post Processor Name | Description | Atlas Stream Processing Equivalent |
| --- | --- | --- |
| BlockListKeyProjector (com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector) | Removes matching key fields from the sink record. | Modify the $_stream_meta.source.key directly using $set. |
| BlockListValueProjector (com.mongodb.kafka.connect.sink.processor.BlockListValueProjector) | Removes matching value fields from the sink record. | Use $project to remove any fields from the document. |
| AllowListKeyProjector (com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector) | Includes only matching key fields from the sink record. | Modify the $_stream_meta.source.key directly using $set. |
| AllowListValueProjector (com.mongodb.kafka.connect.sink.processor.AllowListValueProjector) | Includes only matching value fields from the sink record. | Use $project to show any fields from the document. |
| KafkaMetaAdder (com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder) | Adds a field named "topic-partition-offset" to the document and sets its value to the concatenation of Kafka topic, partition, and offset. | Use $set to add the field, converting the numeric partition and offset to strings because $concat accepts only strings: { $set: { "topic-partition-offset": { $concat: ["$_stream_meta.source.topic", "#", { $toString: "$_stream_meta.source.partition" }, "#", { $toString: "$_stream_meta.source.offset" }] } } } |
| RenameByMapping (com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping) | Renames fields that are an exact match to a specified field name in the key or value document. | Use $project to rename fields. Reference field names using the $ operator. |
| RenameByRegex (com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex) | Renames fields that match a regular expression in the key or value document. | Use $project to rename fields. |

Write model strategies

A write model strategy in MongoDB Kafka Connector is a custom Java class that defines how a sink connector writes data using write models. There are three strategies available out of the box with the Kafka Connector: ReplaceOneBusinessKeyStrategy, DeleteOneBusinessKeyStrategy, and UpdateOneBusinessKeyTimestampStrategy. Details of using these strategies are as follows:
| Write Strategy | Description | Atlas Stream Processing Equivalent |
| --- | --- | --- |
| UpdateOneBusinessKeyTimestampStrategy | Adds _insertedTS and _modifiedTS fields to the sink. | Use $set to add a date to _insertedTS or _modifiedTS. Example: { $set: { _modifiedTS: new Date() } } |
| DeleteOneBusinessKeyStrategy | Deletes the document based upon a value in the event. | Not supported. |
| ReplaceOneBusinessKeyStrategy | Replaces documents that match the value of the business key. | Not supported. |

Summary

In this article, we covered everything from a comparison of the Kafka Connector to Atlas Stream Processing, to the basics of setting up Atlas Stream Processing using a Kafka source (in this case, Redpanda), closing with some of the nuances and details to keep in mind when integrating Apache Kafka and MongoDB. There’s a lot to Apache Kafka and the ecosystem around it, so we hope this article helps you implement the solution that works best for your use case and needs. Atlas Stream Processing is a fully managed service integrated into MongoDB Atlas that we built to prioritize ease of use, cost, and performance when processing data between Kafka and MongoDB. While not intended as a direct replacement for the Kafka Connector, Atlas Stream Processing is proving a valuable alternative for customers and we are investing heavily in more features and functionality to support your workloads.
Ready to get started? Log in today! Can’t find a configuration parameter or capability you need? Please let us know in UserVoice.