Problems with the Spark Connector version 10.1.1

Hi, I’m trying to read data from a MongoDB collection using Spark Structured Streaming (Python version). I need to apply some operations to this data, such as a groupBy, which are not supported in continuous mode, so I’m trying to use micro-batch mode instead.
To do so, I have tried to install version 10.1 of the Spark Connector, which should support micro-batch mode (as mentioned in this post), but it still does not seem to work.
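As far as I understand, the mode is determined by the trigger set on the write stream (micro-batch is also the default when no trigger is given at all). A minimal sketch of the difference, assuming some streaming DataFrame df (the intervals are just placeholders):

    # continuous mode: low latency, but aggregations such as groupBy are not supported
    df.writeStream \
        .format("console") \
        .trigger(continuous="1 second") \
        .start()

    # micro-batch mode: the mode I am trying to use
    df.writeStream \
        .format("console") \
        .trigger(processingTime="10 seconds") \
        .start()
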
What I’ve been doing is essentially one of the following two things:

  • Downloading the jar from Maven and placing it in the Spark jars folder (see also the sketch after this list);
  • Specifying the package that I want to use when defining the Spark session.
    The code that I use to create the Spark session is the following:
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.13:10.1.1") \
        .getOrCreate()
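
As a variation on the first approach, I have also considered pointing the session at the downloaded jar explicitly instead of copying it into the jars folder, so that I can be sure which jar is picked up. A rough sketch of what I mean (the path below is just an example, not my real one):

    from pyspark.sql import SparkSession

    # reference the downloaded connector jar directly instead of relying on the jars folder
    spark = SparkSession \
        .builder \
        .config("spark.jars", "C:/path/to/mongo-spark-connector_2.13-10.1.1.jar") \
        .getOrCreate()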

Here, instead, is a sample of the code that I want to run:

    streaming_df = spark \
        .readStream \
        .format("mongodb") \
        .option("spark.mongodb.connection.uri", "mongodb://localhost:27017") \
        .option("spark.mongodb.database", "database_name") \
        .option("spark.mongodb.collection", "collection_name") \
        .schema(data_schema) \
        .load()
...
# operations, including a groupBy (roughly sketched after the write block below)
...
    source_aggregation \
        .writeStream \
        .format("mongodb") \
        .option("spark.mongodb.connection.uri", "mongodb://localhost:27017") \
        .option("spark.mongodb.database", "database_name") \
        .option("spark.mongodb.collection", "other_collection_name") \
        .option("replacedocument", "true") \
        .option("checkpointLocation", os.path.join("checkpoint", "mongodb_checkpoint")) \
        .start()\
        .awaitTermination()
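
To give an idea of the elided part, the aggregation has roughly this shape (the column names, key, and window size below are placeholders, not my real schema):

    from pyspark.sql.functions import window

    # roughly the shape of the groupBy in the elided section (placeholder names only)
    source_aggregation = streaming_df \
        .withWatermark("timestamp_field", "10 minutes") \
        .groupBy(window("timestamp_field", "5 minutes"), "some_key") \
        .count()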

Some information about my working environment:

  • OS: Windows 10
  • MongoDB Community version: 6.0.3
  • Spark: 3.3.1
  • Python version: 3.10
  • Scala version: 2.13.10

Do you have any suggestions? Thank you in advance

Can you paste the logs? What errors/warnings are you getting?

Thank you for the reply. I get two different kinds of errors, depending on how I try to use version 10.1.
When I use the modified Spark session (the spark.jars.packages approach) I get this:

pyspark.sql.utils.StreamingQueryException: Query Sources writing [id = 2a173727-93d0-41e0-8e22-5ff7e636d5be, runId = 6255dbf8-9010-44f0-aa90-cd6c3d0ca869] terminated with exception: Data source mongodb does not support microbatch processing.

About this error, it may be useful to point out that I have been (and still am) using version 10.0.5 without any problems. Because of this, I interpret the error as meaning that version 10.1 is not actually being used at all with this configuration.
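
If it helps, a quick way I know of to check which jars and packages the running session has actually registered is to print the relevant configuration entries, e.g.:

    # print the jar-related configuration of the active session
    conf = spark.sparkContext.getConf()
    print(conf.get("spark.jars.packages", "not set"))
    print(conf.get("spark.jars", "not set"))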

If instead I place the downloaded jar (the one named “mongo-spark-connector_2.13-10.1.1.jar” at this link) in the Spark jars folder, I get this error:

pyspark.sql.utils.StreamingQueryException: Query Sources writing [id = 2a173727-93d0-41e0-8e22-5ff7e636d5be, runId = 73c29cef-2cd4-4134-b78a-a7f34018ac9a] terminated with exception: org.apache.spark.sql.types.StructType.toAttributes()Lscala/collection/immutable/Seq;

which I find harder to understand.

Thank you again in advance