MongoDB Spark Connector v10 - Failing to write to a file sink

Hi - I am currently trying to read the change data from MongoDB and persist the results to a file sink, but I am getting a java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing. error.

Here is my code snippet:

query = (spark.readStream.format("mongodb")
    .option("spark.mongodb.connection.uri", "mongodb+srv://<<mongo-connection>>")
    .option("spark.mongodb.database", "xxx")
    .option("spark.mongodb.collection", "xxx")
    .option("spark.mongodb.change.stream.publish.full.document.only", "true")
    .option("forceDeleteTempCheckpointLocation", "true")
    .load())

query.printSchema()

query.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("checkpointLocation", "s3://xxxx/checkpoint") \
    .option("path", "s3://xxx") \
    .start()

Environment details:
MongoDB Atlas : 5.0.8
Spark : 3.2.1
MongoDB-Spark connector : 10.0.2

Does this connector support writing to file sinks? Any suggestions?

Full error log:

java.lang.UnsupportedOperationException: Data source mongodb does not support microbatch processing.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.microBatchUnsupportedByDataSourceError(QueryExecutionErrors.scala:1579)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:123)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:519)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.planQuery(MicroBatchExecution.scala:97)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:194)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:194)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:342)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:323)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:250)

I have also tried writing the change data back to MongoDB using the example given here, but I get the same error. Is this connector working?

query.writeStream \
    .format("mongodb") \
    .option("checkpointLocation", "s3://xxxx") \
    .option("forceDeleteTempCheckpointLocation", "true") \
    .option("spark.mongodb.connection.uri", "mongodb+srv://xxx") \
    .option("spark.mongodb.database", "xxx") \
    .option("spark.mongodb.collection", "xxx") \
    .outputMode("append") \
    .start()

At this time the Spark Connector only supports continuous processing, not microbatch. The file sink only supports microbatch, which is why it doesn’t work.
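If the goal is just to move the change events into another MongoDB collection, one possible workaround is to keep the whole pipeline in continuous processing by adding an explicit continuous trigger on the write side. This is only a sketch, not an official recipe: the connection URIs, database/collection names, checkpoint path, and the 1-second trigger interval are placeholders, and it assumes the 10.0.x connector's streaming sink runs under a continuous trigger.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mongo-change-stream").getOrCreate()

# Read the change stream from the source collection (continuous source in connector 10.0.x).
source = (spark.readStream.format("mongodb")
    .option("spark.mongodb.connection.uri", "mongodb+srv://<<source-connection>>")
    .option("spark.mongodb.database", "sourceDb")
    .option("spark.mongodb.collection", "sourceColl")
    .option("spark.mongodb.change.stream.publish.full.document.only", "true")
    .load())

# Write back to MongoDB with an explicit continuous trigger; the default
# trigger is micro-batch, which is what raises the error in this thread.
(source.writeStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", "mongodb+srv://<<sink-connection>>")
    .option("spark.mongodb.database", "sinkDb")
    .option("spark.mongodb.collection", "sinkColl")
    .option("checkpointLocation", "/tmp/mongo-checkpoint")
    .outputMode("append")
    .trigger(continuous="1 second")
    .start())

The parquet/file sink will still fail either way, because Spark's file sink only runs in micro-batch mode and has no continuous implementation.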


So, any idea if the connector will eventually support microbatch?

I’m at the same stage. Is there any solution for the Data source mongodb does not support microbatch processing error?

Did you find any solution for the Data source mongodb does not support microbatch processing error?

@Krishna_Kumar_Sahu Can you add your requirements to this ticket? Specifically, the use case and which destinations you are writing to that require microbatch:

https://jira.mongodb.org/browse/SPARK-368

We are considering this for next quarter as it has come up a few times.

If anyone else has this issue please comment on the ticket.

Hi @Robert_Walters, as per my requirement I have added the details to that ticket, https://jira.mongodb.org/browse/SPARK-368. Please give me a solution as soon as possible. Thanks.

No, I did not find any solution. I saw that you have created a ticket - thanks for helping with that!