MongoDB Spark Connector 10.0.2 - Read Existing Data as Stream

Hi community.

I’m experimenting with the new Spark Connector 10.0.2. With the current setup, I’m ingesting data with the MongoDB Kafka sink connector into collections. From there, I would like to further process the data with Spark Structured Streaming.

I can successfully batch read the existing data with spark.read. But I was hoping that I could also fetch the existing data with spark.readStream, in addition to newly inserted documents. However, I only get the data that is inserted after opening the stream with Spark.

I had enabled replication on my instance before ingesting any data, but this didn't change the outcome. Is there a way to achieve this? With Kafka, for example, I can set startingOffsets to earliest.
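For comparison, this is the behavior I mean with the Kafka source (broker address and topic name are placeholders, and this assumes the spark-sql-kafka package is on the classpath):

```python
# Kafka source: startingOffsets=earliest replays the topic from the
# beginning, so existing records are included in the stream.
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "<HOST>:9092") \
    .option("subscribe", "account") \
    .option("startingOffsets", "earliest") \
    .load()
```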

Here is my current code:

# pyspark shell

account_df = spark.readStream.format("mongodb") \
    .option('spark.mongodb.connection.uri', 'mongodb://<HOST>:27017') \
    .option('spark.mongodb.database', 'algorand') \
    .option('spark.mongodb.collection', 'account') \
    .option('forceDeleteTempCheckpointLocation', 'true') \
    .load()

res = account_df.writeStream \
    .outputMode("append") \
    .option("forceDeleteTempCheckpointLocation", "true") \
    .format("console") \
    .trigger(continuous="1 second") \
    .start()

# returns empty dataframes

We have a Jira to track the ability to copy existing data.

Feel free to track and comment.
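Until that feature lands, one possible workaround is to process the documents that already exist with a one-off batch read, and let the stream handle only documents inserted afterwards. A sketch, assuming the same connection settings as the snippet above:

```python
# One-off batch read: picks up all documents already in the collection.
existing_df = spark.read.format("mongodb") \
    .option('spark.mongodb.connection.uri', 'mongodb://<HOST>:27017') \
    .option('spark.mongodb.database', 'algorand') \
    .option('spark.mongodb.collection', 'account') \
    .load()

existing_df.show()  # process the historical data once ...

# ... then use spark.readStream (as above) for documents inserted afterwards.
```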



Hi Rob, thanks for the information! Looking forward to this feature.
Do I understand this correctly: to use Spark Structured Streaming, replication must be enabled on the MongoDB instance so that change streams are available?

Change streams require a MongoDB cluster, so yes. That said, you can create a single-node replica set if you just want to test it.
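For local testing, a single-node replica set can be initiated once mongod is started with a replica set name. A sketch using PyMongo (the host and replica set name are placeholders):

```python
from pymongo import MongoClient

# Assumes mongod was started with: mongod --replSet rs0
# directConnection skips replica-set discovery, which would fail
# before the set has been initiated.
client = MongoClient("mongodb://localhost:27017", directConnection=True)
client.admin.command("replSetInitiate")  # default one-member configuration
```

After this, change streams (and therefore the streaming read) work against the single node.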

