Spark error reading nested document: org.bson.BsonInvalidOperationException: Invalid state INITIAL

I’m encountering a strange error in Spark when reading nested documents into a Spark Dataset. Here are two sample documents:

/* 1 */
{
  "_id": "user001",
  "group": 100,
  "profile": {
    "age": 21,
    "fname": "John",
    "lname": "Doe",
    "email": "johndoe@example.com"
  }
}
/* 2 */
{
  "_id": "user002",
  "group": 400,
  "profile": {
    "age": 23,
    "fname": "Jane",
    "lname": "Doe",
    "email": "janedoe@example.com"
  }
}
case class User(_id: String, group: Option[Long], profile: Map[String, Option[String]])    
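Just to make the shape of the data explicit, this is how I would model profile with a dedicated case class instead of a Map. It is only an illustration; Profile and UserTyped are names I made up, and my actual code uses the Map-based User above.

// Illustrative only: profile as a nested case class rather than a Map.
// "Profile" and "UserTyped" are invented names for this sketch.
case class Profile(age: Option[Long], fname: Option[String], lname: Option[String], email: Option[String])
case class UserTyped(_id: String, group: Option[Long], profile: Option[Profile])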

import org.apache.spark.sql.{DataFrame, SparkSession}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

val spark = SparkSession
    .builder
    .appName("test-app")
    .enableHiveSupport()
    .getOrCreate()

import spark.implicits._
val readConfig = ReadConfig(Map("uri" -> xxxx, "collection" -> xxx, "database" -> xxxx))

val userMongoDF: DataFrame = MongoSpark.load[User](spark, readConfig)
val userDF: DataFrame = userMongoDF.filter(userMongoDF("group") > 50)

userDF.printSchema()
userDF.show()
root
 |-- _id: string (nullable = true)
 |-- group: long (nullable = true)
 |-- profile: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



+-------+------+-------------------------------------------------------------------------+
|_id    |group |profile                                                                  |
+-------+------+-------------------------------------------------------------------------+
|user001|100   |Map(age -> 21, email -> johndoe@example.com, fname -> John, lname -> Doe)|
|user002|400   |Map(age -> 23, email -> janedoe@example.com, fname -> Jane, lname -> Doe)|
+-------+------+-------------------------------------------------------------------------+
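For reference, my understanding is that the same read settings can also be supplied on the SparkSession builder via the spark.mongodb.input.* options instead of an explicit ReadConfig; the URI, database, and collection below are placeholders:

// Sketch only: connector settings on the SparkSession instead of a ReadConfig.
// Connection values are placeholders, not my real settings.
val sparkAlt = SparkSession
    .builder
    .appName("test-app")
    .config("spark.mongodb.input.uri", "mongodb://host:27017/")
    .config("spark.mongodb.input.database", "mydb")
    .config("spark.mongodb.input.collection", "users")
    .enableHiveSupport()
    .getOrCreate()

val userMongoDFAlt: DataFrame = MongoSpark.load[User](sparkAlt)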

But when I select the nested field profile, I get this error:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

userDF.select(col("_id"), col("profile.email").cast(StringType) as "email").show()

ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 437, XXXXX, executor 7): org.bson.BsonInvalidOperationException: Invalid state INITIAL

A select that leaves out profile works fine: userDF.select(col("_id")).show()
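For what it's worth, the same projection can also be written with getItem instead of the dotted path; I'm including it only to make explicit that profile is a MapType column, not because I've confirmed it behaves any differently:

// Equivalent projection via getItem on the map column (same userDF and imports as above).
userDF.select(col("_id"), col("profile").getItem("email").cast(StringType) as "email").show()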
I’m on Spark 2.1 and MongoDB 3.16, using mongo-spark-connector_2.11-2.1.0 and mongo-java-driver-3.6.1.