Introducing the Newest Version of the MongoDB Spark Connector

Robert Walters

#Spark

MongoDB has just released an all-new version of our Spark Connector. This article discusses the background behind the MongoDB Spark Connector and some of the key features of the new release.

Why a new version?

The current version of the MongoDB Spark Connector was written in 2016 and is based on Version 1 of the Spark Data Source API. That API is still supported, but Databricks has since released Version 2 of the API, which makes it easier for data sources like MongoDB to work within the Spark ecosystem. Because the new MongoDB Spark Connector is built on Version 2 of the Data Source API, you’ll immediately benefit from capabilities such as tighter integration with Spark Structured Streaming.

MongoDB will continue to support Version 1 of the connector until Databricks deprecates Version 1 of the Data Source API. However, no new features will be implemented, and upgrades to the connector will be limited to bug fixes and support for current versions of Spark.

Which version should I use?

The new Spark Connector (Version 10.0) is not intended as a drop-in replacement for applications that use the current MongoDB Spark Connector. Note that the new connector uses a different namespace, “com.mongodb.spark.sql.connector.MongoTableProvider”, whereas the original connector uses “com.mongodb.spark.DefaultSource”. The separate namespace makes it possible to use both versions of the connector within the same Spark application, which is helpful for unit testing your application with the new connector and making the transition on your own timeline.
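
For example, the following sketch loads the same collection through both connectors in a single application. The option names and the connection placeholder are assumptions drawn from each connector's documentation rather than part of this announcement, so adjust them to your deployment:

# Read via the new V10 connector, addressed by its full class name.
df_v10 = (spark.read
  .format("com.mongodb.spark.sql.connector.MongoTableProvider")
  .option("spark.mongodb.connection.uri", "MONGODB CONNECTION HERE")
  .option("spark.mongodb.database", "Pricing")
  .option("spark.mongodb.collection", "NaturalGas")
  .load())

# Read via the original connector, which coexists under its own namespace.
df_v3 = (spark.read
  .format("com.mongodb.spark.DefaultSource")
  .option("uri", "MONGODB CONNECTION HERE/Pricing.NaturalGas")
  .load())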

Also note a change in how the MongoDB Spark Connector is versioned. The current version of the existing connector is 3.0. Until now, connector version numbers were aligned with the version of Spark they supported; for example, Version 2.4 of the MongoDB Spark Connector works with Spark 2.4. Going forward, this will no longer be the case, and MongoDB's documentation will state which versions of Spark each connector release supports.

Structured Streaming to MongoDB

Apache Spark comes with a stream processing engine called Structured Streaming, which is based on Spark's SQL engine and DataFrame APIs. Spark Structured Streaming treats each incoming stream of data as a series of micro-batches, continually appending each micro-batch to the target dataset. This makes it easy to convert existing Spark batch jobs into streaming jobs. Structured Streaming provides maximum throughput via the same distributed capabilities that have made Spark such a popular platform. In the following example, we’ll show you how to stream data to MongoDB using Structured Streaming.

Consider a CSV file that contains natural gas prices. The following PySpark code will read the CSV file into a stream, compute a moving average, and stream the results into MongoDB.
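
Each row pairs a commodity type with an observation date and a price. A file might contain rows like these (illustrative values only, matching the schema defined below):

Type,Date,Price
Natural Gas,2022-01-03T00:00:00,3.82
Natural Gas,2022-01-04T00:00:00,3.76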

from pyspark.sql.types import StructType, StringType, TimestampType, DoubleType
from pyspark.sql import functions as F

# Optional: verbose logging is useful while developing the job.
sc.setLogLevel('DEBUG')

# Schema for the incoming CSV files: commodity type, observation timestamp, and price.
readSchema = (StructType()
  .add('Type', StringType())
  .add('Date', TimestampType())
  .add('Price', DoubleType())
)

# Read all matching CSV files as a stream.
ds = (spark
  .readStream.format("csv")
  .option("header", "true")
  .schema(readSchema)
  .load("daily*.csv"))

# Average the price per commodity type over 7-day windows. The orderBy on a
# streaming DataFrame is allowed here because the query uses complete output mode.
slidingWindows = (ds
  .withWatermark("Date", "1 minute")
  .groupBy(ds.Type, F.window(ds.Date, "7 day"))
  .avg()
  .orderBy(ds.Type, 'window'))

# Stream the windowed averages into the Pricing.NaturalGas collection in MongoDB.
dsw = (
  slidingWindows
    .writeStream
    .format("mongodb")
    .queryName("7DaySlidingWindow")
    .option("checkpointLocation", "/tmp/pyspark/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option('spark.mongodb.connection.uri', 'MONGODB CONNECTION HERE')
    .option('spark.mongodb.database', 'Pricing')
    .option('spark.mongodb.collection', 'NaturalGas')
    .outputMode("complete"))

query = dsw.start()
# Block until all currently available input has been processed, then stop the stream.
query.processAllAvailable()
query.stop()
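
Once the stream has stopped, you can sanity-check the results with a batch read back through the connector. This is a sketch that reuses the option names from the write above; adjust them to your deployment:

results = (spark.read
  .format("mongodb")
  .option('spark.mongodb.connection.uri', 'MONGODB CONNECTION HERE')
  .option('spark.mongodb.database', 'Pricing')
  .option('spark.mongodb.collection', 'NaturalGas')
  .load())

results.show()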

For more information and examples on the new MongoDB Spark Connector V10.0, check out our documentation.

Ask questions and give feedback on the MongoDB Community Forum. The Connector is open source; feel free to contribute on GitHub.