Streaming Data with Apache Spark and MongoDB

Robert WaltersPublished May 05, 2022 • Updated May 19, 2022
MongoDB has released a version 10.0 of the MongoDB Connector for Apache Spark that leverages the new Spark Data Sources API V2 with support for Spark Structured Streaming.

Why a new version?

The current version of the MongoDB Spark Connector was originally written in 2016 and is based upon V1 of the Spark Data Sources API. While this API version is still supported, Databricks has released an updated version of the API, making it easier for data sources like MongoDB to work with Spark. By having the MongoDB Spark Connector use V2 of the API, an immediate benefit is a tighter integration with Spark
Structured Streaming
Note: With respect to the previous version of the MongoDB Spark Connector that supported the V1 API, MongoDB will continue to support this release until such a time as Databricks depreciates V1 of the Data Source API. While no new features will be implemented, upgrades to the connector will include bug fixes and support for the current versions of Spark only.

What version should I use?

The new MongoDB Spark Connector release (Version 10.0) is not intended to be a direct replacement for your applications that use the previous version of MongoDB Spark Connector.
The new Connector uses a different namespace with a short name, “mongodb” (full path is “com.mongodb.spark.sql.connector.MongoTableProvider”), versus “mongo” (full path of “com.mongodb.spark.DefaultSource”). Having a different namespace makes it possible to use both versions of the connector within the same Spark application! This is helpful in unit testing your application with the new Connector and making the transition on your timeline.
Also, we are changing how we version the MongoDB Spark Connector. The previous versions of the MongoDB Spark Connector aligned with the version of Spark that was supported—e.g., Version 2.4 of the MongoDB Spark Connector works with Spark 2.4. Keep in mind that going forward, this will not be the case. The MongoDB documentation will make this clear as to which versions of Spark the connector supports.

Structured Streaming to MongoDB

Apache Spark comes with a stream processing engine called
Structured Streaming
, which is based on Spark's SQL engine and DataFrame APIs. Spark Structured Streaming treats each incoming stream of data as a micro-batch, continually appending each micro-batch to the target dataset. This makes it easy to convert existing Spark batch jobs into a streaming job. Structured Streaming has evolved over Spark releases and in Spark 2.3 introduced Continuous Processing mode, which took the micro-batch latency from over 100ms to about 1ms. In the following example, we’ll show you how to stream data between MongoDB and Spark using Structured Streams and continuous processing. First, we’ll look at reading data from MongoDB.
Reading streaming data from MongoDB
You can stream data from MongoDB to Spark using the new Spark Connector. Consider the following example that streams stock data from a MongoDB Atlas cluster. A sample document in MongoDB is as follows:
In this code example, we will use the new MongoDB Spark Connector and read from the StockData collection. When the Spark Connector opens a streaming read connection to MongoDB, it opens the connection and creates a
MongoDB Change Stream
for the given database and collection. A change stream is used to subscribe to changes in MongoDB. As data is inserted, updated, and deleted, change stream events are created. It’s these
change events
that are passed back to the client in this case the Spark application. There are configuration options that can change the structure of this event message. For example, if you want to return just the document itself and not include the change stream event metadata, set “” to true.
The schema is inferred from the MongoDB collection. You can see from the printSchema command that our document structure is as follows:
root |-- _id: string (nullable = true) |-- company_name: string (nullable = true) |-- company_symbol: string (nullable = true) |-- price: double (nullable = true) |-- tx_time: string (nullable = true)
We can verify that the dataset is streaming with the isStreaming command.
Next, let’s read the data on the console as it gets inserted into MongoDB.
When the above code was run through spark-submit, the output resembled the following:
… removed for brevity …

Batch: 2

+--------------------+--------------------+--------------+-----+-------------------+ | _id| company_name|company_symbol|price| tx_time| +--------------------+--------------------+--------------+-----+-------------------+ |62476caa6df0f7dd8...| HUNGRY SYNDROME LLC| HSL|45.99|2022-04-01 17:20:42| |62476caa6df0f7dd8...|APPETIZING MARGIN...| AMP|12.81|2022-04-01 17:20:42| |62476caa6df0f7dd8...|EMBARRASSED COCKT...| ECC|38.18|2022-04-01 17:20:42| |62476caa6df0f7dd8...|PERFECT INJURY CO...| PIC|86.85|2022-04-01 17:20:42| |62476caa6df0f7dd8...|GIDDY INNOVATIONS...| GMI|84.46|2022-04-01 17:20:42| +--------------------+--------------------+--------------+-----+-------------------+
… removed for brevity …

Batch: 3

+--------------------+--------------------+--------------+-----+-------------------+ | _id| company_name|company_symbol|price| tx_time| +--------------------+--------------------+--------------+-----+-------------------+ |62476cab6df0f7dd8...| HUNGRY SYNDROME LLC| HSL|46.04|2022-04-01 17:20:43| |62476cab6df0f7dd8...|APPETIZING MARGIN...| AMP| 12.8|2022-04-01 17:20:43| |62476cab6df0f7dd8...|EMBARRASSED COCKT...| ECC| 38.2|2022-04-01 17:20:43| |62476cab6df0f7dd8...|PERFECT INJURY CO...| PIC|86.85|2022-04-01 17:20:43| |62476cab6df0f7dd8...|GIDDY INNOVATIONS...| GMI|84.46|2022-04-01 17:20:43| +--------------------+--------------------+--------------+-----+-------------------+
Writing streaming data to MongoDB
Next, let’s consider an example where we stream data from Apache Kafka to MongoDB. Here the source is a kafka topic “stockdata.Stocks.StockData.” As data arrives in this topic, it’s run through Spark with the message contents being parsed, transformed, and written into MongoDB. Here is the code listing with comments in-line:
Note that Kafka topic message arrives in this format -> key (binary), value (binary), topic (string), partition (int), offset (long), timestamp (long), timestamptype (int). See
Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
for more information on the Kafka and Spark integration.
To process the message for consumption into MongoDB, we want to pick out the value which is in binary format and convert it to JSON.
For reference, here is an example of an event (the value converted into a string) that is on the Kafka topic:
We want to isolate the payload field and convert it to a JSON representation leveraging the shcemaStock defined above. For clarity, we have broken up the operation into multiple steps to explain the process. First, we want to convert the value into JSON.
The dataset now contains data that resembles
Next, we want to capture just the value of the payload field and convert that into JSON since it’s stored as a string.
Now we can do whatever transforms we would like to do on the data. In this case, let’s convert the tx_time into a timestamp.
The Dataset is in a format that’s ready for consumption into MongoDB, so let’s stream it out to MongoDB. To do this, use the writeStream method. Keep in mind there are various options to set. For example, when present, the “trigger” option processes the results in batches. In this example, it’s every 10 seconds. Removing the trigger field will result in continuous writing. For more information on options and parameters, check out the Structured Streaming Guide.

Go forth and stream!

Streaming data is a critical component of many types of applications. MongoDB has evolved over the years, continually adding features and functionality to support these types of workloads. With the MongoDB Spark Connector version 10.0, you can quickly stream data to and from MongoDB with a few lines of code.
For more information and examples on the new MongoDB Spark Connector version 10.0, check out the
online documentation
. Have questions about the connector or MongoDB? Post a question in the
MongoDB Developer Community Connectors & Integrations

