Hi Everyone,
I’m currently trying to implement change streams in Spark on Databricks.
Unfortunately, I’m unable to read from MongoDB (neither batch nor streaming), even though I’m able to get the collection’s schema.
Background:
- Source: Azure Cosmos DB for MongoDB v4.0
- Databricks Environment:
  - Runtime: 10.4.x-cpu-ml-scala2.12
  - Library: org.mongodb.spark:mongo-spark-connector_2.12:10.1.1
- Documentation read (as new user, can’t paste more than 3 links):
  - Microsoft
  - Databricks
  - MongoDB
    - MongoDB Spark Connector - Read Config Options
    - MongoDB Spark Connector - Read From Mongo
    - MongoDB Spark Connector - Structured Streaming with MongoDB
- Tutorials followed:
After reading and following the docs and tutorials, I’m still unable to read from MongoDB.
Here, I get the collection’s schema:
base_read_config = {
    'connection.uri': mongo_endpoint,
    'database': mongo_database,
    'collection': mongo_collection
}
schema_df = spark.read.format("mongodb").options(**base_read_config).load()
But when I tried to display the batch data, I got the following error:
com.mongodb.spark.sql.connector.exceptions.MongoSparkException: Partitioning failed. Partitioner calling collStats command failed
Caused by: com.mongodb.spark.sql.connector.exceptions.MongoSparkException: Partitioner calling collStats command failed
Caused by: com.mongodb.MongoCommandException: Command failed with error 40324 (40324): 'Unrecognized pipeline stage name: $collStats' on server XXXX. The full response is {"ok": 0.0, "errmsg": "Unrecognized pipeline stage name: $collStats", "code": 40324, "codeName": "40324"}
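For the batch error, one idea that might be worth testing (an assumption, not something confirmed on Cosmos DB): the failure comes from the partitioner calling collStats, so the connector’s partitioner read option could be switched to SinglePartitionPartitioner, which reads the collection as a single partition. A minimal sketch follows; the helper name with_single_partition is hypothetical, and placeholder strings stand in for mongo_endpoint, mongo_database and mongo_collection:

```python
# Sketch only: the option key and class name follow the MongoDB Spark Connector 10.x
# read configuration; whether this avoids the collStats error on Azure Cosmos DB
# is an untested assumption.
SINGLE_PARTITIONER = (
    "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner"
)


def with_single_partition(read_config):
    """Return a copy of read_config that requests a single partition."""
    config = dict(read_config)  # copy, so the original config stays untouched
    config["partitioner"] = SINGLE_PARTITIONER
    return config


# Placeholder values; in the notebook these would be the real variables.
base_read_config = {
    "connection.uri": "mongodb://<placeholder-endpoint>",
    "database": "<placeholder-database>",
    "collection": "<placeholder-collection>",
}

batch_config = with_single_partition(base_read_config)

# In a Databricks notebook (not executed here):
# batch_df = spark.read.format("mongodb").options(**batch_config).load()
# display(batch_df)
```

A single partition means the whole collection is read by one task, so this would only be a reasonable trade-off for small or medium collections.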
Then I tried to display streaming data (again getting the collection’s schema),
but again I got an error:
Stream stopped…
org.apache.spark.SparkException: Writing job aborted
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 8) (10.248.224.5 executor 0): com.mongodb.spark.sql.connector.exceptions.MongoSparkException: Could not create the change stream cursor.
Caused by: com.mongodb.MongoCommandException: Command failed with error 2 (BadValue): 'Change stream must be followed by a match and then a project stage' on server XXXX. The full response is {"ok": 0.0, "errmsg": "Change stream must be followed by a match and then a project stage", "code": 2, "codeName": "BadValue"}
I’m not sure, but I think the aggregation.pipeline option is unable to identify the match and project stages in the pipeline variable.
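To make the aggregation.pipeline idea concrete, here is a minimal sketch of how the option could be filled in. The $match and $project stages follow the change stream example in Microsoft’s Cosmos DB for MongoDB documentation; the helper name streaming_read_config and the placeholder connection values are hypothetical, and whether the connector forwards the stages in the form Cosmos DB expects is untested:

```python
import json

# Stages modelled on the Cosmos DB change stream example (assumption: they apply
# unchanged to this collection). This is not the pipeline from the failing notebook.
pipeline = [
    {"$match": {"operationType": {"$in": ["insert", "update", "replace"]}}},
    {"$project": {"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}},
]


def streaming_read_config(base_config, stages):
    """Return a copy of base_config with the change stream options added."""
    config = dict(base_config)
    # The connector expects the pipeline as a JSON string, not a Python list.
    config["aggregation.pipeline"] = json.dumps(stages)
    # Ask for the full document on updates, as Cosmos DB change streams require.
    config["change.stream.lookup.full.document"] = "updateLookup"
    return config


# Placeholder values; in the notebook these would be the real variables.
base_read_config = {
    "connection.uri": "mongodb://<placeholder-endpoint>",
    "database": "<placeholder-database>",
    "collection": "<placeholder-collection>",
}

stream_config = streaming_read_config(base_read_config, pipeline)

# In a Databricks notebook (not executed here):
# stream_df = spark.readStream.format("mongodb").options(**stream_config).load()
# display(stream_df)
```

Passing the stages through json.dumps keeps the option value a valid JSON array, which rules out quoting mistakes as the reason the stages are not recognized.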
I would really appreciate it if someone could help me identify what I’m doing wrong.
Regards