Spark error reading nested document: org.bson.BsonInvalidOperationException: Invalid state INITIAL

I’m encountering a strange error in spark when reading nested doc into spark dataset:

/* 1 */
    "_id": "user001",
    "group": 100,
    "profile": {
      "age": 21,
      "fname": "John",
      "lname": "Doe",
      "email": ""
/* 2 */
    "_id": "user002",
    "group": 400,
    "profile": {
      "age": 23,
      "fname": "Jane",
      "lname": "Doe",
      "email": ""
case class User(_id: String, group: Option[Long], profile: Map[String, Option[String]])    

val spark = SparkSession

import spark.implicits._    
val readConfig = ReadConfig(Map("uri" -> xxxx,"collection" -> xxx,"database" -> xxxx))

val userMongoDF: DataFrame =  MongoSpark.load[User](spark, readConfig)
val userDF: DataFrame = userMongoDF.filter(userMongoDF("group") > 50)

 |-- _id: string (nullable = true)
 |-- group: long (nullable = true)
 |-- profile: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

|_id    |group |profile                                                                 |                                                                                                                                                                                                                                                                                       |createdWhen            |updatedByUser                                                                                                                                                                                                                                                                                                                        |updatedWhen            |realizedWhen         |opp_source  |opp_owner                                                                                                                                                                                                                                                                                             |
|user001|100   |Map(age -> 21, email ->, fname -> John, lname -> Doe|
|user002|400   |Map(age -> 23, email ->, fname -> Jane, lname -> Doe|

But when I select the nested field profile , I encounter this error message:"_id"),col("").cast(StringType) as "email").show()

ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 437, XXXXX, executor 7): org.bson.BsonInvalidOperationException: Invalid state INITIAL

unlike select without profile which works fine:"_id")).show()
I’m on Spark 2.1 and Mongo 3.16, using mongo-spark-connector_2.11-2.1.0 and mongo-java-driver-3.6.1