Spark Structured Streaming with MongoDB change stream: allowDiskUse doesn't work

Spark 3.2.1, MongoDB 4.4.15, Spark connector 10.x

I can use Spark Structured Streaming to read from the MongoDB change stream for some collections, but one large collection in MongoDB causes an OOM error.

I set `.option("aggregation.allowDiskUse", "true")`, but the result is the same, even though according to the documentation (https://www.mongodb.com/docs/spark-connector/current/configuration/read/) that value should already be true by default.
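For context, a minimal PySpark sketch of setting this option on a streaming read might look like the following. The connection URI, database, and collection values are placeholders rather than the real job's settings, and `open_change_stream` is a hypothetical helper name; only the `aggregation.allowDiskUse` option comes from this post.

```python
# Read options for the MongoDB Spark connector 10.x (streaming read).
# The URI, database, and collection values below are placeholders.
READ_OPTIONS = {
    "spark.mongodb.connection.uri": "mongodb://localhost:27017",
    "spark.mongodb.database": "mydb",
    "spark.mongodb.collection": "big_table",
    # Documented read option; per the docs it should already default to true.
    "spark.mongodb.read.aggregation.allowDiskUse": "true",
}


def open_change_stream(spark, schema):
    """Return a streaming DataFrame for the collection's change stream.

    `spark` is an existing SparkSession and `schema` is the expected
    schema of the incoming documents.
    """
    reader = spark.readStream.format("mongodb").schema(schema)
    for key, value in READ_OPTIONS.items():
        reader = reader.option(key, value)
    return reader.load()
```

Keeping the options in one dictionary makes it easy to try the long (`spark.mongodb.read.`) and short forms of a key without touching the reader code.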

Error message
com.mongodb.MongoCommandException: Command failed with error 292 (QueryExceededMemoryLimitNoDiskUseAllowed): 'Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in

To work around this issue, I filter the change stream with `.option("spark.mongodb.read.aggregation.pipeline", "[{'$match': {createdAt: {$gte: new ISODate('2022-07-17')}}}]")` or with `{createdAt: {$gte: ISODate('2022-07-17')}}`. Now the streaming job runs, but there is no data in the change stream.
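A sketch of building such a pipeline string in Python is shown below. It rests on one assumption that this thread does not confirm: MongoDB change stream events carry the changed document under a `fullDocument` field, so the sketch matches on `fullDocument.createdAt` instead of a top-level `createdAt`. Whether that explains the empty stream here is untested. The helper name `change_stream_pipeline` is hypothetical, and the date is written in Extended JSON (`$date`) form.

```python
import json


def change_stream_pipeline(since_iso, field="fullDocument.createdAt"):
    """Build a one-stage $match pipeline, serialized as Extended JSON.

    `since_iso` is an ISO-8601 timestamp string. The default `field`
    assumes the document sits under `fullDocument` in each change event.
    """
    stage = {"$match": {field: {"$gte": {"$date": since_iso}}}}
    return json.dumps([stage])


PIPELINE = change_stream_pipeline("2022-07-17T00:00:00Z")
# Pass it to the streaming reader as:
#   .option("spark.mongodb.read.aggregation.pipeline", PIPELINE)
```

Generating the string with `json.dumps` also avoids quoting mistakes that are easy to make when the pipeline is typed inline inside `.option(...)`.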

Did you set the option via `spark.mongodb.read.aggregation.allowDiskUse` or just `aggregation.allowDiskUse`?

I tried both, and neither works.

Are you using a free tier of MongoDB Atlas?

No, I'm using the Enterprise version.

We filed https://jira.mongodb.org/browse/SPARK-355 to investigate as it might be a bug with the connector not enabling this parameter for some queries. Thank you for raising the question!

@khang_pham The fix is now checked in to 10.0.3. It is available from the Central Repository: org/mongodb/spark/mongo-spark-connector/10.0.3.
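One way to pick up that release from PySpark is to request it through `spark.jars.packages`, as in the minimal sketch below. The Maven coordinate is derived from the repository path above; the application name and the `build_session` helper are placeholders.

```python
# Maven coordinate derived from the Central Repository path
# org/mongodb/spark/mongo-spark-connector/10.0.3
CONNECTOR_COORDINATE = "org.mongodb.spark:mongo-spark-connector:10.0.3"


def build_session(app_name="mongo-change-stream"):
    """Create a SparkSession that downloads the fixed connector at startup."""
    # Imported here so the constant above can be reused without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder.appName(app_name)
        .config("spark.jars.packages", CONNECTOR_COORDINATE)
        .getOrCreate()
    )
```

The same coordinate can be passed on the command line with `spark-submit --packages` if the session is created elsewhere.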