Stream from MongoDB to MongoDB in PySpark

Hi there,

I am working on a use case where I need to stream a collection into PySpark and then write the stream back to the same collection in MongoDB. I could not find any documentation about streaming reads in PySpark.

Can anyone clarify this for me? Or is it perhaps planned as a new feature?

Best,

This is possible today. Can you provide some more detail about the roadblocks you are running into?
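
For example, a plain batch read and write with the MongoDB Spark Connector looks roughly like the sketch below (the localhost URIs and the test.test namespace are placeholders, and it assumes the connector package is on the Spark classpath):

# Batch (non-streaming) read/write sketch with the MongoDB Spark Connector.
# The localhost URIs and the test.test namespace are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("mongo-batch-example")
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/test.test")
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/test.test")
    .getOrCreate()
)

# Batch read: loads the collection as a DataFrame (not a stream).
df = spark.read.format("mongo").load()

# Batch write: appends the (possibly transformed) DataFrame back.
df.write.format("mongo").mode("append").save()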

Could you tell me how that is possible? How can I read a MongoDB collection with Spark Streaming in Python?

I want something like the following:

lines = ssc.MongoStream("localhost", 27017, database="test", collection="test")

or

lines = spark \
    .readStream \
    .format("mongo") \
    .option("host", "localhost") \
    .option("port", 27017) \
    .load()

Structured Streaming isn't supported today, but you can connect to MongoDB via PyMongo and open a change stream cursor to watch for data changes.
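
A minimal sketch of that approach (the connection string and the test.test namespace are placeholders, and change streams require a replica set or sharded cluster):

# Watch a collection for changes with a PyMongo change stream.
# The connection string and the test.test namespace are placeholders.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
collection = client["test"]["test"]

# watch() opens a change stream cursor; iterating it blocks until a change arrives.
with collection.watch() as stream:
    for change in stream:
        print(change["operationType"], change.get("fullDocument"))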

Is supporting Structured Streaming something that would benefit you? Can you describe your use case a bit more?

Thanks for your clarification. I think connecting via PyMongo is neither streaming nor distributed: it means a single-threaded Python instance reads all the data. With Spark Streaming (and Structured Streaming), each worker can read a piece of the stream. Am I right?

My use case is to analyze (run AI algorithms on) a collection of data in MongoDB that is updated every 5 seconds, and to write back the results.

Thanks.

I also have a use case similar to yours. Can you please help me out?
You can reach me at erum9964@hotmail.com
Thanks

Hi @mobin and @Erum_79340, could you add your scenario to https://jira.mongodb.org/projects/SPARK/issues/SPARK-85? We are looking at leveraging V2 of the Spark connector API, which will enable native Spark streaming support in the connector. Your input will help us prioritize this.

Thank you. I added my comment on the Jira page, but I think the issue title should mention both the source and the sink.

Hi Erum,

I posted my alternative solution for working around this missing support on the Jira page that @Robert_Walters linked.

Thanks, I will check.