I’m experimenting with the new MongoDB Spark Connector 10.0.2. In my current setup, I ingest data into MongoDB collections with the MongoDB Kafka sink connector. From there, I would like to further process the data with Spark Structured Streaming.
I can successfully read the existing data in batch mode with spark.read. I was hoping that spark.readStream would also return the existing documents in addition to newly inserted ones, but I only get the data that is inserted after the stream is opened in Spark.
I have enabled replication on my instance before ingesting any data, but this didn’t change the outcome. Is there a way to achieve this? For example, with Kafka I can define the startingOffsets option as earliest so that the stream also replays records that were published before it started.
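Something like this with Spark's Kafka source (just a sketch; the broker address and topic name are placeholders):

# Kafka source: startingOffsets="earliest" makes the stream begin at the
# oldest available offsets instead of only picking up new records
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "<HOST>:9092") \
    .option("subscribe", "account") \
    .option("startingOffsets", "earliest") \
    .load()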
Here is my current code:
# pyspark shell
account_df = spark.readStream.format("mongodb") \
    .option('spark.mongodb.connection.uri', 'mongodb://<HOST>:27017') \
    .option('spark.mongodb.database', 'algorand') \
    .option('spark.mongodb.collection', 'account') \
    .option('spark.mongodb.change.stream.publish.full.document.only', 'true') \
    .option("forceDeleteTempCheckpointLocation", "true") \
    .load()

res = account_df.writeStream \
    .outputMode("append") \
    .option("forceDeleteTempCheckpointLocation", "true") \
    .format("console") \
    .trigger(continuous="1 second") \
    .start().awaitTermination()

# returns empty dataframes
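For comparison, the batch read that does return the already ingested documents looks roughly like this (a minimal sketch, reusing the connection options from the streaming code above):

# batch read in the pyspark shell: this returns the existing documents
account_batch_df = spark.read.format("mongodb") \
    .option('spark.mongodb.connection.uri', 'mongodb://<HOST>:27017') \
    .option('spark.mongodb.database', 'algorand') \
    .option('spark.mongodb.collection', 'account') \
    .load()

account_batch_df.show()  # prints the previously ingested data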