MongoDB Spark Connector v10.1.1 - Failing to read from some mongo tables

Hi, I am currently using the MongoDB Spark Connector v10.1.1 to read some collections from my MongoDB database.

However, I am able to read some collections fine, while others are giving errors.

Example (1)

batch_df1 = spark.read.format("mongodb").option("spark.mongodb.connection.uri", "mongodb+srv://xxx:xxx@xxx/admin?readPreference=secondaryPreferred&retryWrites=true&w=majority") \
    .option('spark.mongodb.database', 'db1') \
    .option('spark.mongodb.collection', 'collection1') \
    .load()

Example (2)

batch_df2 = spark.read.format("mongodb").option("spark.mongodb.connection.uri", "mongodb+srv://xxx:xxx@xxx/admin?readPreference=secondaryPreferred&retryWrites=true&w=majority") \
    .option('spark.mongodb.database', 'db1') \
    .option('spark.mongodb.collection', 'collection2') \
    .load()

Example (1) works fine, but example (2) gives an error:
Error in callback <bound method UserNamespaceCommandHook.post_command_execute of <dbruntime.DatasetInfo.UserNamespaceCommandHook object at 0x7fb94374e250>> (for post_execute):

How can I troubleshoot this?

Hi @Yh_G,

There isn't anything obviously wrong that I can see here. We can try a couple of things to debug:

  1. Try increasing the debug level and share the entire stack trace.
  2. Double-check the connection URI by connecting from the Spark nodes to the MongoDB instance with a MongoDB client (e.g. PyMongo if you are using Python), as in the sketch after this list. This way we can rule out any networking issues related to the second collection.
  3. Can you try removing readPreference=secondaryPreferred from the connection URI and running the read again?
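
For (2), a minimal connectivity check with PyMongo might look like the sketch below; the xxx placeholders and the db1/collection names are assumptions carried over from the examples above, so substitute your real values. For (1), calling spark.sparkContext.setLogLevel("DEBUG") on the driver is one way to raise the log level.

from pymongo import MongoClient

# Same URI as the Spark read, with real credentials in place of xxx.
uri = "mongodb+srv://xxx:xxx@xxx/admin?readPreference=secondaryPreferred&retryWrites=true&w=majority"
client = MongoClient(uri)
db = client["db1"]

# Confirm both collections are reachable from this node, outside of Spark.
for name in ["collection1", "collection2"]:
    doc = db[name].find_one()
    print(name, "->", "reachable" if doc is not None else "reachable but empty")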

Hi @Yh_G

I am also facing the same issue on my end. If you have found any workaround for this issue, please share it here; it will be helpful for the entire community.

@Prakul_Agarwal: Please share whether this is a known issue and whether any mitigation is available.

@Mani_Perumal This is not a known issue.
Are you seeing the exact same issue - successful reads on some collections while failing on others?
Can you also please share the MongoDB server version and the Spark version you are using?

Hi @Prakul_Agarwal

We are using MongoDB server version 7.0.2 with a replica set and cluster tier M400.
For the Spark version, we have tried 3.2.1, 3.3.0, 3.3.2 and 3.4.1, with Scala version 2.12 and Python 3.10.1.
MongoDB Connector: org.mongodb.spark:mongo-spark-connector_2.12:10.1.1

@Yh_G @Mani_Perumal thanks for the additional info. I am unable to repro the issue. Could you provide the full stack trace to help understand the issue further and whether it is coming from the Spark nodes or from the connector?

Hi @Prakul_Agarwal

Configuration 1:

Spark Connector: mongo-spark-connector_2.12:10.2.0
Databricks Runtime version: 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12) / 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12)

Full Stack Trace:

Py4JJavaError: An error occurred while calling o351.load.
: scala.MatchError: com.mongodb.spark.sql.connector.schema.InferSchema$1@409386d7 (of class com.mongodb.spark.sql.connector.schema.InferSchema$1)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeFor(RowEncoder.scala:240)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.externalDataTypeForInput(RowEncoder.scala:236)
	at org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType.<init>(objects.scala:1962)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$serializerFor$2(RowEncoder.scala:157)
	at org.apache.spark.sql.catalyst.expressions.objects.MapObjects$.apply(objects.scala:859)
	at org.apache.spark.sql.catalyst.SerializerBuildHelper$.createSerializerForMapObjects(SerializerBuildHelper.scala:203)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.serializerFor(RowEncoder.scala:156)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$serializerFor$3(RowEncoder.scala:199)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.serializerFor(RowEncoder.scala:192)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$serializerFor$3(RowEncoder.scala:199)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.serializerFor(RowEncoder.scala:192)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:73)
	at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:81)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:93)
	at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:215)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:162)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:750)

Configuration 2:

Spark Connector: mongo-spark-connector_2.12:10.2.0
Databricks Runtime version: 12.2 LTS (includes Apache Spark 3.3.2, Scala 2.12)

Full Stack Trace:

Error in callback <bound method UserNamespaceCommandHook.post_command_execute of <dbruntime.DatasetInfo.UserNamespaceCommandHook object at 0x7fbe8c4f0040>> (for post_execute):

StreamingQueryException: [STREAM_FAILED] Query [id = e4fe97e6-ead4-490c-b8e4-10b8e0b4d763, runId = 6c59eb1f-66e4-4492-b7d6-aa20bd08b2c2] terminated with exception: Column `location_geohashs` has a data type of struct<lst:array<>,str:string>, which is not supported by JSON.

The second configuration gives some clarity on where the error is, but I am not sure what exactly the error means here. Happy to provide more details if needed.
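
The struct<lst:array<>,str:string> in that message appears to be the type the connector inferred for location_geohashs, and the empty array<> suggests inference could not determine an element type (for example, if the sampled arrays were empty or held mixed types). One thing we could try is passing an explicit schema to the reader to sidestep inference; in the sketch below the field names and types are assumptions based only on the error message, not on the actual collection, so they would need to be adjusted to match the real documents.

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Assumed shape of the problematic column; change ArrayType(StringType()) to
# whatever the lst elements actually are.
explicit_schema = StructType([
    StructField("_id", StringType()),
    StructField("location_geohashs", StructType([
        StructField("lst", ArrayType(StringType())),
        StructField("str", StringType()),
    ])),
    # add the remaining fields of collection2 here
])

batch_df2 = (
    spark.read.format("mongodb")
    .schema(explicit_schema)  # skip schema inference for this read
    .option("spark.mongodb.connection.uri", "mongodb+srv://xxx:xxx@xxx/admin?readPreference=secondaryPreferred&retryWrites=true&w=majority")
    .option("spark.mongodb.database", "db1")
    .option("spark.mongodb.collection", "collection2")
    .load()
)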