MongoDB Spark Connector v10 - Failing to write to a file sink

Hi - I am currently trying to read the change data from MongoDB and persisting the results to a file sink but getting a java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing. error

Here is my code snippet:

query=(spark.readStream.format("mongodb")
.option('spark.mongodb.connection.uri', 'mongodb+srv://<<mongo-connection>>')
    	.option('spark.mongodb.database', 'xxx') \
    	.option('spark.mongodb.collection', 'xxx') \
        .option('spark.mongodb.change.stream.publish.full.document.only','true') \
    	.option("forceDeleteTempCheckpointLocation", "true") \
    	.load())

query.printSchema()

query.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("checkpointLocation", "s3://xxxx/checkpoint") \
    .option("path", "s3://xxx") \
    .start()

Environment details :
MongoDB Atlas : 5.0.8
Spark : 3.2.1
MongoDB-Spark connector : 10.0.2

Does this connector support writing to file sinks? Any suggestions?

full error log:

java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.microBatchUnsupportedByDataSourceError(QueryExecutionErrors.scala:1579)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:123)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:519)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:97)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:194)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:194)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:342)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:323)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:250)

I have also tried writing the change data back to mongoDB using the example given here but also giving the same error. is this connector working?

query.writeStream \
    .format("mongodb") \
    .option("checkpointLocation", "s3://xxxx") \
    .option("forceDeleteTempCheckpointLocation", "true") \
    .option('spark.mongodb.connection.uri', 'mongodb+srv://xxx') \
    .option('spark.mongodb.database', 'xxx') \
    .option('spark.mongodb.collection', 'xxx') \
    .outputMode("append") \
    .start()

At this time the Spark Connector only support continuous processing not microbatch. The file sink supports microbatch which is why it doesn’t work.

1 Like

so any idea if the connector would eventually support microbatch?

I’m here at the same stage any solution for the . Data source mongodb does not support microbatch processing. error

Did you got any solution. for Data source mongodb does not support microbatch processing. error

@Krishna_Kumar_Sahu Can you add your requirements to this ticket? Specifically the use case and what destinations you are writing to that require microbatch?

https://jira.mongodb.org/browse/SPARK-368

We are considering this for next quarter as it has come up a few times.

If anyone else has this issue please comment on the ticket.

Hi @Robert_Walters as per my requirement I’m added the details into that ticket https://jira.mongodb.org/browse/SPARK-368 please Give me solution as soon as posible. Thanks

no i did not find any soluition - saw you have created a ticket, thanks for helping to create a ticket on this!

According to ticket https://jira.mongodb.org/browse/SPARK-368 the seems it’s resolved in fixed Version 10.1.0 of mongo spark connector but I can’t find the updated mongo spark connector in Maven please provide the link. or makes available that fixed version of it.

10.1 is not released yet, if you’d like to try it out now you can download the build here

https://oss.sonatype.org/content/repositories/snapshots/org/mongodb/spark/mongo-spark-connector_2.12/10.1.0-SNAPSHOT/

for scala 2.12

or Index of /repositories/snapshots/org/mongodb/spark/mongo-spark-connector_2.13/10.1.0-SNAPSHOT

for scala 2.13

Hi @Robert_Walters,

to try version 10.1 on databricks, Do I have to download all these files and add to the cluster? or which files from that list are needed? how can I reference in my spark session? Thank you very much.

I do not have the Databricks setup available to verify but I think you would download the latest “-all.jar” file like this one for 2.12

mongo-spark-connector_2.12-10.1.0-20221215.104635-18-all.jar

Then in the Databricks once you create your cluster, under Libraries import the library and download the -all

That is from a blog I wrote ^^ you might find helpful Exploring Data with MongoDB Atlas, Databricks, and Google Cloud | MongoDB Blog.

Just remember that in Spark 3.x we used"mongo" in Spark 10.x version of the connector its “mongodb” such as

query=(slidingWindows.writeStream.format("mongodb").option('spark.mongodb.connection.uri', 'mongodb://mongo1:27017,mongo2:27018,mongo3:27019/Stocks.Source?replicaSet=rs0')\
    .option('spark.mongodb.database', 'Pricing') \
    .option('spark.mongodb.collection', 'NaturalGas') \
    .option("checkpointLocation", "/tmp") \
    .outputMode("complete") \
    .start())

The error message indicates that the MongoDB Spark Connector does not support microbatch processing for the mongodb data source. This means that you cannot use the writeStream API with the mongodb data source. Instead, you may want to consider using the foreachBatch API to write the streaming data to MongoDB.

Here’s an example of how you can use foreachBatch to write streaming data to MongoDB:

def write_mongodb(df, epoch_id):
    df.write \
        .format("mongo") \
        .option("uri", "mongodb+srv://<<mongo-connection>>") \
        .option("database", "xxx") \
        .option("collection", "xxx") \
        .mode("append") \
        .save()

query = (spark.readStream
         .format("mongodb")
         .option("uri", "mongodb+srv://<<mongo-connection>>")
         .option("database", "xxx")
         .option("collection", "xxx")
         .option("changeStream.fullDocument", "updateLookup")
         .load())

stream = (query.writeStream
          .foreachBatch(write_mongodb)
          .option("checkpointLocation", "s3://xxxx/checkpoint")
          .outputMode("append")
          .start())

stream.awaitTermination()

In this example, the query variable reads data from the MongoDB collection, and the write_mongodb function writes the data to MongoDB. The foreachBatch API is used to write the data in batches to MongoDB. You can modify the write_mongodb function to write the data to a file instead of MongoDB.

I hope this helps! Let me know if you have any further questions.