Hi community.
I’m experimenting with the new MongoDB Spark Connector 10.0.2. With my current setup, I’m ingesting data into MongoDB collections with the MongoDB Kafka sink connector. From there, I would like to further process the data with Spark Structured Streaming.
I can successfully batch read the existing data with `spark.read`, but I was hoping that I could also fetch the existing data with `spark.readStream`, in addition to newly inserted data. However, I only get the data that is inserted after opening the stream with Spark.
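For reference, here is a sketch of the batch read that does return all the existing documents (the host and database names are the same placeholders I use in the streaming code below):

```python
def read_existing_accounts(spark):
    # Batch read against the same collection; this returns every document
    # already present. "<HOST>", "algorand", and "account" are the
    # placeholder/example names from my own setup.
    return (spark.read.format("mongodb")
            .option("spark.mongodb.connection.uri", "mongodb://<HOST>:27017")
            .option("spark.mongodb.database", "algorand")
            .option("spark.mongodb.collection", "account")
            .load())
```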
I have enabled replication on my instance before ingesting any data, but this didn’t change the outcome. Is there a way to achieve this? For example, with Kafka I can set `startingOffsets` to `earliest`.
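To illustrate what I mean, this is roughly the Kafka equivalent (broker host and topic name are hypothetical): with the Kafka source, `startingOffsets` of `earliest` makes the stream replay all retained records before tailing new ones.

```python
# Kafka source configuration (hypothetical broker and topic names).
# "startingOffsets": "earliest" tells Structured Streaming to replay all
# retained records first, then continue with newly arriving ones.
kafka_options = {
    "kafka.bootstrap.servers": "<HOST>:9092",  # placeholder broker address
    "subscribe": "account",                    # hypothetical topic name
    "startingOffsets": "earliest",             # replay history, then tail
}

def kafka_stream(spark):
    # Streaming DataFrame that includes pre-existing records as well.
    return spark.readStream.format("kafka").options(**kafka_options).load()
```

I was hoping the MongoDB source had an analogous option.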
Here is my current code:
```python
# pyspark shell
account_df = spark.readStream.format("mongodb") \
    .option("spark.mongodb.connection.uri", "mongodb://<HOST>:27017") \
    .option("spark.mongodb.database", "algorand") \
    .option("spark.mongodb.collection", "account") \
    .option("spark.mongodb.change.stream.publish.full.document.only", "true") \
    .option("forceDeleteTempCheckpointLocation", "true") \
    .load()

res = account_df.writeStream \
    .outputMode("append") \
    .option("forceDeleteTempCheckpointLocation", "true") \
    .format("console") \
    .trigger(continuous="1 second") \
    .start().awaitTermination()
# returns empty dataframes
```