I’m encountering a strange error in spark when reading nested doc into spark dataset:
/* 1 */
{
"_id": "user001",
"group": 100,
"profile": {
"age": 21,
"fname": "John",
"lname": "Doe",
"email": "johndoe@example.com"
}
}
/* 2 */
{
"_id": "user002",
"group": 400,
"profile": {
"age": 23,
"fname": "Jane",
"lname": "Doe",
"email": "janedoe@example.com"
}
}
case class User(_id: String, group: Option[Long], profile: Map[String, Option[String]])
val spark = SparkSession
.builder
.appName("test-app")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val readConfig = ReadConfig(Map("uri" -> xxxx,"collection" -> xxx,"database" -> xxxx))
val userMongoDF: DataFrame = MongoSpark.load[User](spark, readConfig)
val userDF: DataFrame = userMongoDF.filter(userMongoDF("group") > 50)
userDF.printSchema()
userDF.show()
root
|-- _id: string (nullable = true)
|-- group: long (nullable = true)
|-- profile: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
+-------+------+------------------------------------------------------------------------+
|_id |group |profile | |createdWhen |updatedByUser |updatedWhen |realizedWhen |opp_source |opp_owner |
+-------+------+------------------------------------------------------------------------+
|user001|100 |Map(age -> 21, email -> johndoe@example.com, fname -> John, lname -> Doe|
|user002|400 |Map(age -> 23, email -> janedoe@example.com, fname -> Jane, lname -> Doe|
+-------+------+------------------------------------------------------------------------+
But when I select the nested field profile
, I encounter this error message:
userDF.select(col("_id"),col("profile.email").cast(StringType) as "email").show()
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 437, XXXXX, executor 7): org.bson.BsonInvalidOperationException: Invalid state INITIAL
unlike select without profile
which works fine: userDF.select(col("_id")).show()
I’m on Spark 2.1 and Mongo 3.16, using mongo-spark-connector_2.11-2.1.0 and mongo-java-driver-3.6.1