Unable to run GCP Dataproc job to read MongoDB Atlas on AWS

I have created a Spark-Scala application on GCP Dataproc which uses the Spark-Mongo Connector to read from MongoDB Atlas hosted on AWS. I have also used a Cloud NAT gateway to establish the connection. Below are the version details:

Mongo Atlas - 4.4.22
Spark - 3.3
Spark-Mongo Connector - 10.1.1
Dataproc Cluster - Master n2-highmem-16 (1), Worker n2-highmem-96 (5) -> [496 vCPU]
Note: Also used standard nodes with similar configs

I was able to read smaller collections of around 10 million records and around 200GB in size. My challenge is to read a collection that is 2.2TB in size with close to 2 billion records. I have tried all the possible Dataproc cluster combinations (up to 500 vCPUs) and am unable to process the data. With every cluster configuration, I could not see Spark triggering any job: the Dataproc job kept running for hours without Spark executing any tasks, and I had to kill it. I am using the sample code below to process the data.

spark-shell --conf 'spark.executor.extraJavaOptions=--add-exports=jdk.naming.dns/com.sun.jndi.dns=java.naming' --packages org.mongodb.spark:mongo-spark-connector_2.12:10.1.1

import org.apache.spark.sql.SparkSession

val uri = "mongodb+srv://user:pwd@host/?authSource=admin&readPreference=secondary"
val spark = SparkSession.builder().appName("Test")
  .getOrCreate()
val df = spark.read.format("mongodb")
  .option("connection.uri", uri)
  .option("database", "db")
  .option("collection", "table")
  .load()
df.show(false)

What else can I do here to process this huge Mongo collection? I have also tried applying filters, but could not even get the explain plan since it was taking too long (a few hours).

Hello @Shruthi_Madumbu ,
Is the MongoDB cluster using any sharding? Have you tried using the "partitioners" in the MongoDB Spark Connector?
You can partition your dataset using that and then be able to process the data in parallel across multiple workers in your Dataproc cluster.

The default partitioner is the SamplePartitioner.
Configuration information for the various partitioners is available in the MongoDB Spark Connector documentation.

Let us know if this helps you get the processing started.
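To make the partitioner advice above concrete, here is a minimal sketch of what the read options could look like with the default SamplePartitioner. The URI, database, collection, and the specific option values are placeholders, not values from your setup; partition.size is in MB, and the right value depends on your document sizes.

```scala
// Hypothetical sketch: read options for the MongoDB Spark Connector 10.x,
// tuning the default SamplePartitioner. All values below are placeholders.
val readOptions = Map(
  "connection.uri" -> "mongodb+srv://user:pwd@host/?authSource=admin&readPreference=secondary",
  "database"       -> "db",
  "collection"     -> "table",
  // Fully qualified partitioner class name (10.x style)
  "partitioner"    -> "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner",
  // Target size of each partition, in MB
  "partitioner.options.partition.size" -> "128",
  // Number of sample documents taken per partition to estimate boundaries
  "partitioner.options.samples.per.partition" -> "10"
)

// Then pass the whole map to the reader:
// val df = spark.read.format("mongodb").options(readOptions).load()
```

More partitions generally means more parallel read tasks across your Dataproc workers, at the cost of more sampling work up front.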


Thanks for your reply, Prakul. I did use the ShardedPartitioner, and I also used a few filters to limit the data volume. Now I can at least see that the Spark job is being triggered, but it is running very slowly even though it has all the cluster resources. The issue is on the Mongo side: every time I run the Spark job, disk utilization hits 100% and performance degrades. Since I am only reading from a secondary, it is not affecting the primary. Is there a better way of handling this issue?

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val uri = "mongodb+srv://user:pwd@host/?authSource=admin&readPreference=secondary"
val spark = SparkSession.builder().appName("Test")
  .getOrCreate()
val df = spark.read.format("mongodb")
  .option("connection.uri", uri)
  .option("database", "db")
  .option("collection", "table")
  .option("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner")
  .option("partitioner.options.partition.size", "100")
  .load()

val filtered = df.filter(col("owner") === ownerId).filter(col("product") === productId)

It seems the reads are expensive and consuming all resources. Can you try ensuring that the filter is covered by an index? You can set up indexes using the Atlas UI or CLI, and you can also use a hint in your queries to nudge the use of those indexes. If you are using the ShardedPartitioner, the shard key should also be part of the index.
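One way to act on this advice is to push the filter down to MongoDB as an aggregation pipeline, so the secondary can serve it from an index instead of scanning the full collection before Spark filters it. The sketch below is an assumption about how this might look for your case: the field values are placeholders, and it assumes you have created a covering index such as db.table.createIndex({ owner: 1, product: 1 }) in the Atlas UI or mongosh.

```scala
// Hypothetical sketch: a $match stage passed to the connector's
// aggregation.pipeline option, so filtering happens server-side.
// "o-123" and "p-456" are placeholder ids, not real values.
val pipeline =
  """[ { "$match": { "owner": "o-123", "product": "p-456" } } ]"""

// Applied to the reader (uri, database, and collection as before):
// val df = spark.read.format("mongodb")
//   .option("connection.uri", uri)
//   .option("database", "db")
//   .option("collection", "table")
//   .option("aggregation.pipeline", pipeline)
//   .load()
```

With the $match covered by the compound index, each partition's read should touch far fewer documents, which may also relieve the 100% disk utilization you are seeing on the secondary.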