Hi, I’m trying to read data from a MongoDB collection using Spark Structured Streaming (Python version). I need to apply some operations to this data, such as groupby, that are not supported in continuous mode. For this reason I’m trying to use microbatch mode instead.
To do so, I have tried to install version 10.1 of the Spark Connector, which should support microbatch mode (as stated in this post), but it still does not seem to work.
What I’ve been doing is essentially one of the following two things:
- Downloading the jar from Maven and placing it in the Spark jars folder;
- Specifying the package that I want to use in the definition of the Spark session.
The code that I use to create the Spark session is the following:
import os
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.13:10.1.1") \
    .getOrCreate()
Here is a sample of the code that I want to run:
streaming_df = spark \
    .readStream \
    .format("mongodb") \
    .option("spark.mongodb.connection.uri", "mongodb://localhost:27017") \
    .option("spark.mongodb.database", "database_name") \
    .option("spark.mongodb.collection", "collection_name") \
    .schema(data_schema) \
    .load()
...
# operations, among which there is a groupby
...
source_aggregation \
    .writeStream \
    .format("mongodb") \
    .option("spark.mongodb.connection.uri", "mongodb://localhost:27017") \
    .option("spark.mongodb.database", "database_name") \
    .option("spark.mongodb.collection", "other_collection_name") \
    .option("replacedocument", "true") \
    .option("checkpointLocation", os.path.join("checkpoint", "mongodb_checkpoint")) \
    .start() \
    .awaitTermination()
Some information about my working environment:
- OS: Windows 10
- MongoDB Community version: 6.0.3
- Spark: 3.3.1
- Python version: 3.10
- Scala version: 2.13.10
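One thing that may be worth checking with this setup is whether the `_2.13` suffix in the package coordinate matches the Scala version that Spark itself was built with, which is not necessarily the same as the locally installed Scala. Below is a minimal sketch of how the coordinate could be derived from a Scala version string. The helper name `connector_coordinate` is hypothetical and not part of any library, and the second version string in the example is only an illustrative value.

```python
import re


def connector_coordinate(scala_version: str, connector_version: str = "10.1.1") -> str:
    """Build the Maven coordinate of the MongoDB Spark Connector.

    Hypothetical helper for illustration only. `scala_version` may be a full
    version such as "2.13.10"; only the binary version (major.minor) is used
    in the artifact name.
    """
    match = re.match(r"^(\d+)\.(\d+)", scala_version)
    if match is None:
        raise ValueError(f"unrecognised Scala version: {scala_version!r}")
    binary_version = f"{match.group(1)}.{match.group(2)}"
    return f"org.mongodb.spark:mongo-spark-connector_{binary_version}:{connector_version}"


# The Scala version from the environment list above.
print(connector_coordinate("2.13.10"))
# An illustrative 2.12.x version, in case Spark turns out to be a Scala 2.12 build.
print(connector_coordinate("2.12.15"))
```

As far as I know, `spark-submit --version` prints the Scala version Spark was built with. From a running session the same value should be available through `spark.sparkContext._jvm.scala.util.Properties.versionNumberString()`, although that goes through a private attribute and is only meant as a quick diagnostic.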
Do you have any suggestions? Thank you in advance.