Change stream must be followed by a match and then a project stage

Hi Everyone,

I’m currently trying to implement change streams in Spark using Databricks.

Unfortunately, I’m unable to read from MongoDB (neither batch nor streaming), even though I’m able to get the collection’s schema.

Background:

  1. Source: Azure Cosmos DB for MongoDB v4.0

  2. Databricks Environment:

    Runtime: 10.4.x-cpu-ml-scala2.12
    Library: org.mongodb.spark:mongo-spark-connector_2.12:10.1.1

  3. Documentation read (as a new user, I can’t paste more than 3 links):

  4. Tutorials followed:

    Streaming Data with Apache Spark and MongoDB

After reading and following the docs and tutorial, I’m still unable to read from MongoDB.

Here, I get the collection’s schema:

base_read_config = {
    'connection.uri': mongo_endpoint,
    'database': mongo_database,
    'collection': mongo_collection
}

schema_df = spark.read.format("mongodb").options(**base_read_config).load()

But when I tried to display batch data, I got the following error:

com.mongodb.spark.sql.connector.exceptions.MongoSparkException: Partitioning failed. Partitioner calling collStats command failed
Caused by: com.mongodb.spark.sql.connector.exceptions.MongoSparkException: Partitioner calling collStats command failed
Caused by: com.mongodb.MongoCommandException: Command failed with error 40324 (40324): 'Unrecognized pipeline stage name: $collStats' on server XXXX. The full response is {"ok": 0.0, "errmsg": "Unrecognized pipeline stage name: $collStats", "code": 40324, "codeName": "40324"}

Then I tried to display streaming data (getting the collection’s schema first).

But again, I got an error:

Stream stopped…
org.apache.spark.SparkException: Writing job aborted
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 8) (10.248.224.5 executor 0): com.mongodb.spark.sql.connector.exceptions.MongoSparkException: Could not create the change stream cursor.
Caused by: com.mongodb.MongoCommandException: Command failed with error 2 (BadValue): 'Change stream must be followed by a match and then a project stage' on server XXXX. The full response is {"ok": 0.0, "errmsg": "Change stream must be followed by a match and then a project stage", "code": 2, "codeName": "BadValue"}

I’m not sure, but I think the aggregation.pipeline option is unable to identify the match and project stages in the pipeline variable.
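For reference, the stage shapes that Cosmos DB asks for can be sketched as follows. This is only an illustration: the original streaming code is not shown in the post, the `$match` and `$project` contents follow Microsoft's published change stream example for Azure Cosmos DB for MongoDB, and the helper name `build_stream_config` and the placeholder connection values are hypothetical. Whether the Spark connector forwards this pipeline to the server unchanged has not been verified here.

```python
import json

# Stage shapes based on Microsoft's change stream example for
# Azure Cosmos DB for MongoDB (an assumption, not from the original post).
cosmos_pipeline = [
    {"$match": {"operationType": {"$in": ["insert", "update", "replace"]}}},
    {"$project": {"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1}},
]


def build_stream_config(base_config, pipeline):
    """Hypothetical helper: copy base_config and attach the pipeline as JSON."""
    config = dict(base_config)  # leave the caller's dict untouched
    # The connector's aggregation.pipeline option takes a JSON string.
    config["aggregation.pipeline"] = json.dumps(pipeline)
    # Cosmos DB's example also requests the full document on updates.
    config["change.stream.lookup.full.document"] = "updateLookup"
    return config


# Placeholder values; substitute the real endpoint, database and collection.
stream_config = build_stream_config(
    {
        "connection.uri": "mongodb://example-host:10255",
        "database": "example_db",
        "collection": "example_coll",
    },
    cosmos_pipeline,
)

# In a Databricks notebook this would then be passed along the lines of:
# stream_df = spark.readStream.format("mongodb").options(**stream_config).load()
print(stream_config["aggregation.pipeline"])
```

If the server still rejects the cursor with this configuration, that would suggest the connector adds or reorders stages itself, which cannot be changed from the options dict.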

I would really appreciate it if someone could help me identify what I’m doing wrong.

Regards


The problem is that you are using Cosmos DB, which isn’t the same as MongoDB; for example, collStats doesn’t exist in Cosmos DB. Use MongoDB Atlas instead. It is available on the Azure Marketplace and should be a direct replacement in your environment.
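If switching is not possible right away, one thing that might be worth trying for the batch read is a partitioner that does not need collection statistics. The sketch below assumes the connector's `partitioner` read option and its `SinglePartitionPartitioner` class (both from the 10.x connector); the helper name `with_single_partition` and the placeholder connection values are hypothetical, and this has not been tested against Cosmos DB. It only addresses the collStats failure, not the change stream error.

```python
# Fully qualified class name from the MongoDB Spark Connector 10.x package.
SINGLE_PARTITION = (
    "com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner"
)


def with_single_partition(read_config):
    """Hypothetical helper: copy read_config and select the single partitioner."""
    config = dict(read_config)  # do not mutate the caller's dict
    config["partitioner"] = SINGLE_PARTITION
    return config


# Placeholder values; substitute the real endpoint, database and collection.
base_read_config = {
    "connection.uri": "mongodb://example-host:10255",
    "database": "example_db",
    "collection": "example_coll",
}
batch_config = with_single_partition(base_read_config)

# In a Databricks notebook this would then be used along the lines of:
# batch_df = spark.read.format("mongodb").options(**batch_config).load()
print(batch_config["partitioner"])
```

The trade-off is that the whole collection is read as one partition, so the read is not parallelized; that may be acceptable for small collections only.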
