Connecting Datafusion to MongoDB Atlas

Hi Team,

I’m trying to create a pipeline in Google Cloud Datafusion to extract data from MongoDB Atlas and load it into BigQuery. I’m using the Google-provided MongoDB driver (v2.0.0) to achieve this, but I haven’t had any luck connecting to Atlas.

I’m trying to connect via a standard connection. I’ve enabled the BI Connector for our cluster and whitelisted the necessary IPs in the network settings, but with no luck.
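For reference, a standard Atlas connection string normally takes the mongodb+srv form below (these are placeholders, not our real cluster or credentials):

    mongodb+srv://<username>:<password>@<cluster-name>.<id>.mongodb.net/<database>?retryWrites=true&w=majority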

The MongoDB pipeline settings look like this in Datafusion (I’m trying to connect using the host, port and user defined in the BI Connector):
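Roughly, the source is configured along these lines (the field names are approximations of the plugin UI and the values are placeholders rather than our real host and credentials):

    Host:       <bi-connector-hostname>.mongodb.net
    Port:       27015
    Database:   <database>
    Collection: <collection>
    Username:   <bi-connector-user>
    Password:   <password>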

However, this is not working and I’m getting the following errors in the Datafusion logs:


ERROR
Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Job aborted. at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:105) at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsNewAPIHadoopDataset$1(PairRDDFunctions.scala:1077) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1075) at org.apache.spark.api.java.JavaPairRDD.saveAsNewAPIHadoopDataset(JavaPairRDD.scala:833) at io.cdap.cdap.etl.spark.batch.RDDUtils.saveHadoopDataset(RDDUtils.java:58) at io.cdap.cdap.etl.spark.batch.RDDUtils.saveUsingOutputFormat(RDDUtils.java:47) at io.cdap.cdap.etl.spark.batch.SparkBatchSinkFactory.writeFromRDD(SparkBatchSinkFactory.java:175) at io.cdap.cdap.etl.spark.batch.BaseRDDCollection$1.run(BaseRDDCollection.java:239) at io.cdap.cdap.etl.spark.SparkPipelineRunner.runPipeline(SparkPipelineRunner.java:383) at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:227) at io.cdap.cdap.app.runtime.spark.SparkTransactional$2.run(SparkTransactional.java:236) at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:208) at io.cdap.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:138) at io.cdap.cdap.app.runtime.spark.AbstractSparkExecutionContext.execute(AbstractSparkExecutionContext.scala:229) at io.cdap.cdap.app.runtime.spark.SerializableSparkExecutionContext.execute(SerializableSparkExecutionContext.scala:63) at io.cdap.cdap.app.runtime.spark.DefaultJavaSparkExecutionContext.execute(DefaultJavaSparkExecutionContext.scala:91) at io.cdap.cdap.api.Transactionals.execute(Transactionals.java:63) at io.cdap.cdap.etl.spark.batch.BatchSparkPipelineDriver.run(BatchSparkPipelineDriver.java:158) at io.cdap.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:87) at io.cdap.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732) Caused by: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. 
Client view of cluster state is {type=UNKNOWN, servers=[{address=transmit-staging-cluster-biconnector.o4mkl.mongodb.net:27015, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketWriteException: Exception sending message}, caused by {javax.net.ssl.SSLException: Unsupported or unrecognized SSL message}}] at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182) at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41) at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:136) at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:94) at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:249) at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:172) at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:161) at com.mongodb.DB.executeCommand(DB.java:774) at com.mongodb.DBCollection.getStats(DBCollection.java:2282) at com.mongodb.hadoop.splitter.MongoSplitterFactory.getSplitterByStats(MongoSplitterFactory.java:76) at com.mongodb.hadoop.splitter.MongoSplitterFactory.getSplitter(MongoSplitterFactory.java:127) at com.mongodb.hadoop.MongoInputFormat.getSplits(MongoInputFormat.java:56) at io.cdap.cdap.etl.batch.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:45) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at io.cdap.cdap.app.runtime.spark.data.DatasetRDD.getPartitions(DatasetRDD.scala:61) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2257) at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83) ... 28 more

The connection seems to be timing out. I’ve tested connecting using MySQL Workbench (using the BI Connector) and I could connect fine.
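To rule out something specific to Datafusion, one thing I can try is reproducing the connection from a small standalone Java client with the same credentials. A minimal sketch, assuming the mongodb-driver-sync artifact is on the classpath and using placeholder values for the URI:

    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import org.bson.Document;

    public class AtlasConnectionTest {
        public static void main(String[] args) {
            // Placeholder URI - substitute the cluster's standard (mongodb+srv) connection string.
            String uri = "mongodb+srv://<username>:<password>@<cluster>.mongodb.net/?retryWrites=true&w=majority";
            try (MongoClient client = MongoClients.create(uri)) {
                // A ping forces a full round trip, so any TLS or network problem surfaces
                // immediately instead of as a 30000 ms timeout buried in a Spark stack trace.
                Document result = client.getDatabase("admin").runCommand(new Document("ping", 1));
                System.out.println("Connected: " + result.toJson());
            }
        }
    }

If a client like this connects fine but the pipeline still times out, that would at least narrow the problem down to the plugin configuration rather than the Atlas network settings.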

Does anyone here have experience with Datafusion and MongoDB Atlas who can help?

Thank you

Hi Marck - We have better ways to extract and move data from Atlas to BigQuery, such as Atlas SQL and Dataflow templates. I’d be able to help based on the use case. Happy to have a quick call to discuss. Please reach out to me at paresh.saraf@mongodb.com