Data Source Details:
Database: MongoDB 6.0
Collection Size: 3.35 GB
Total Documents: 3000
Avg. Document Size: ~1 MB
Total Available Cores: 3
Allocated Executor Cores: 3
Total Available Memory: 3 GB
Total Allocated Memory: 3 GB
Allocated Driver Memory: 1 GB
Number of Executors: 1
Spark-Mongo-Connector Version: 10.1.1
Using default memory fraction values.
The program below is crashing with an OOM: Java heap space. On analyzing the heap dump, I found that this code is actually fetching the data and buffering it in an ArrayList internally, which causes the OOM.
My Spark Program:
import com.mongodb.client.model.{Aggregates, Filters}
import com.mongodb.spark.sql.connector.config.{MongoConfig, ReadConfig}
import com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.bson.BsonValue

object WorkWithMongoTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Spark-MongoDB-Connector-Tests " + new java.util.Date(System.currentTimeMillis()))
    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      val docIds: Array[BsonValue] = Array.empty[BsonValue] // array of 300 document ids
      val filterPipeLine = Aggregates.`match`(Filters.in("_id", docIds: _*))
      val configMap = Map(
        "connection.uri" ->
          "mongodb://admin:admin@localhost:27017/heavy_db.heavy_data?readPreference=primaryPreferred&authSource=admin&authMechanism=SCRAM-SHA-1",
        "aggregation.pipeline" -> filterPipeLine.toBsonDocument.toJson(),
        "partitioner" -> classOf[PaginateBySizePartitioner].getName,
        "partitioner.options.partition.size" -> "64"
      )
      spark.read.format("mongodb")
        .options(configMap)
        .load()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      println("========================== PROGRAM END ==============================")
      spark.stop()
    }
  }
}
Q1. The method load() is not an action. Then why is it still fetching the data and loading it into the heap?
Q2. Why am I possibly facing the Java Heap Space issue while reading 300 documents? Any suggestions?
Great question and thank you for including lots of information. I think I know the cause and have a solution for you.
Q1. The method load() is not an action. Then why is it still fetching the data and loading it into the heap?
This is slightly nuanced: load() essentially takes a DataFrameReader and outputs a Dataset<Row>. A Dataset<Row> must have a schema, and since no schema is set on the DataFrameReader, there is an “action” of sorts: the connector has to infer the schema.
See: https://www.mongodb.com/docs/spark-connector/current/read-from-mongodb
Q2. Why am I possibly facing the Java Heap Space issue while reading 300 documents? Any suggestions?
Schema inference uses the sampleSize configuration, which in turn uses the $sample operator, and then compares the sampled documents to produce a schema. This is done on the Spark driver, and it appears that this is what requires the extra memory.
So, to fix this, either explicitly provide a schema to the Dataset or reduce the sampleSize used for schema inference.
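For example, a minimal sketch of the explicit-schema route (the field names and types here are hypothetical placeholders; substitute whatever your documents actually contain):

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical schema: replace these fields with the ones in your documents.
val explicitSchema = StructType(Seq(
  StructField("_id", StringType, nullable = false),
  StructField("name", StringType, nullable = true),
  StructField("count", IntegerType, nullable = true)
))

// With a schema supplied up front, the connector has no need to sample the
// collection on the driver in order to infer one.
val df = spark.read.format("mongodb")
  .options(configMap)
  .schema(explicitSchema)
  .load()

Setting .schema(...) on the DataFrameReader skips the inference step, and with it the driver-side sampling that was blowing the heap.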
Thanks for your response, Ross. That clarifies the root cause. In my case, the schema is unknown and it may or may not be the same for all the documents in the target collection.
Since I will have to process TBs of this kind of data in the future, I am trying to read the documents in batches so that the job doesn't run out of memory. The physical memory is limited; even if I allocate, say, 4 GB to the Spark driver, it can still throw OOM at some point. A batch of 300 documents could be 300 KB or 3 GB, and reading 100 MB worth of documents will not necessarily consume exactly 100 MB of heap space. Small batches could lead to under-utilization of the resources, while large batches would lead to OOM.
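To make that batching idea concrete, here is a rough, purely illustrative sketch. It reuses docIds, configMap and spark from the program above, assumes an explicit schema such as the explicitSchema sketched earlier, and uses a batch size that is only a guess and would need tuning against the observed document sizes:

// Illustrative only: split the known ids into fixed-size groups and read one
// group per job, so only a single batch of documents is pulled at a time.
val batchSize = 300 // assumption: tune against document sizes and available memory
docIds.grouped(batchSize).foreach { idBatch =>
  val pipeline = Aggregates.`match`(Filters.in("_id", idBatch: _*))
  val batchDf = spark.read.format("mongodb")
    .options(configMap + ("aggregation.pipeline" -> pipeline.toBsonDocument.toJson()))
    .schema(explicitSchema) // hypothetical schema, as in the earlier sketch, to avoid per-batch inference
    .load()
  // process or write out batchDf here before moving on to the next batch
}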
Hi @Ross_Lawley, I can see that setting sampleSize in the Spark conf to a smaller value such as 10 does the trick. I would like to know how important schema inference is, since in my case the documents may not always have a similar structure. What impact will it have if I set sampleSize to 1?
So sampleSize is important in that it directly relates to the number of documents used to infer the schema. If you choose 1, then only a single document's schema is used. If your data is mixed and many documents have different shapes, then a larger sample will be required. Because the sample is randomly selected from the collection, you need a relatively large size to ensure a representative sample.
When reading from the collection, the documents are then shaped into that schema. So a schema that is not representative of the data is problematic: fields can be dropped, or type errors can occur when converting values into the corresponding Spark types.
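For reference, a rough sketch of dialling the sample size down on the read, using the sampleSize option discussed above (the 10.x option name is assumed here):

// A smaller sample keeps schema inference cheap on the driver; a larger sample
// gives a more representative schema when documents have varying shapes.
val df = spark.read.format("mongodb")
  .options(configMap)
  .option("sampleSize", "10")
  .load()

// The same setting can also be applied globally in the Spark conf, e.g.
// conf.set("spark.mongodb.read.sampleSize", "10")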