Spark 3.2.1, MongoDB 4.4.15, MongoDB Spark Connector 10.x
I can use Spark Structured Streaming to read from the MongoDB change stream for some collections, but one large collection causes an OOM error.
I added .option("aggregation.allowDiskUse", "true"), but the error is unchanged, even though according to the documentation (https://www.mongodb.com/docs/spark-connector/current/configuration/read/) that value should already be true by default.
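For context, here is a minimal PySpark sketch of how the stream is set up. The URI, database, collection, and checkpoint path are placeholders, and I'm assuming the short-form option keys from the 10.x read configuration page linked above:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("mongo-change-stream")
    .getOrCreate()
)

# Read the change stream for one collection (placeholder names).
stream = (
    spark.readStream
    .format("mongodb")
    .option("connection.uri", "mongodb://host:27017/")  # placeholder URI
    .option("database", "mydb")                         # placeholder
    .option("collection", "big_collection")             # placeholder
    .option("aggregation.allowDiskUse", "true")  # supposedly already the default
    .load()
)

# Console sink, just to observe the stream while debugging.
query = stream.writeStream.format("console").start()
query.awaitTermination()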
Error message:
com.mongodb.MongoCommandException: Command failed with error 292 (QueryExceededMemoryLimitNoDiskUseAllowed): 'Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in
To work around this, I filter the change stream with .option("spark.mongodb.read.aggregation.pipeline", "[{'$match': {createdAt: {$gte: new ISODate('2022-07-17')}}}]"), and I also tried {createdAt: {$gte: ISODate('2022-07-17')}}. The streaming job now runs, but no data ever arrives from the change stream.
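Concretely, the filtered read looks roughly like this (same placeholder names as in the sketch above; the pipeline strings are exactly the two variants I tried):

```python
# The two pipeline variants I tried; neither produces any documents.
pipeline = "[{'$match': {createdAt: {$gte: new ISODate('2022-07-17')}}}]"
# pipeline = "[{'$match': {createdAt: {$gte: ISODate('2022-07-17')}}}]"

filtered = (
    spark.readStream
    .format("mongodb")
    .option("connection.uri", "mongodb://host:27017/")  # placeholder URI
    .option("database", "mydb")                         # placeholder
    .option("collection", "big_collection")             # placeholder
    .option("spark.mongodb.read.aggregation.pipeline", pipeline)
    .load()
)
```

With this option in place the QueryExceededMemoryLimitNoDiskUseAllowed error no longer appears, but the resulting stream stays empty.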