Hi,
I am building a small python app that monitors a singlecollection in MongoDB Atlas and returns new document insertions via change_streams and websocket to ultimately visualize the new documents via dash . The data is inserted into the MongoDB collection from another python script running on a AWS EC2 instance.
I am able to read the data from the collection successfully from my local jupyter notebook.
and if I put the above collection.find()
function in a for loop, the last entry in the collection is read continuously.
But when I try to use change_streams to monitor for new insertions nothing happens and the function just returns nothing.
I tried the following three methods, but none of them is returning any new inserted document.
## Method 1
change_stream = client.change_stream.collection.watch()
for change in change_stream:
print(dumps(change))
print('') # for readability only
## Method 2
try:
with db.collection.watch([{'$match': {'operationType': 'insert'}}]) as stream:
for insert_change in stream:
print(insert_change)
except pymongo.errors.PyMongoError as e:
# The ChangeStream encountered an unrecoverable error or the
# resume attempt failed to recreate the cursor.
print(e)
## Method 3
try:
resume_token = None
pipeline = [{'$match': {'operationType': 'insert'}}]
with db.collection.watch(pipeline) as stream:
for insert_change in stream:
print(insert_change)
resume_token = stream.resume_token
except pymongo.errors.PyMongoError:
# The ChangeStream encountered an unrecoverable error or the
# resume attempt failed to recreate the cursor.
if resume_token is None:
# There is no usable resume token because there was a
# failure during ChangeStream initialization.
print("Error")
else:
# Use the interrupted ChangeStream's resume token to create a new ChangeStream.
# The new stream will continue from the last seen insert change without missing any events.
with db.collection.watch(pipeline, resume_after=resume_token) as stream:
for insert_change in stream:
print(insert_change)
I am a bit new to MongoDB, so can any expert on change_streams / websocket expert kindly give any guidance on making the above change_streams
code work and what should I do to make the change_streams into a web socket?
Best Regards,
Dilip