How to unwatch/close change stream

My Flask API has separate endpoints to start and stop watching a collection…

Start endpoint

with collection.watch() as stream:
    while stream.alive:
        change = next(stream)  # blocks until the next change arrives
        print(change)

How can I unwatch/stop the change stream when a different endpoint, the stop endpoint, is called?

I tried the code below, but the change stream is not closing…

with collection.watch() as stream:
    if stream.alive:
        stream.close()

Please advise…

Any update on closing a change stream from a Flask API?

Hi @Krishnamoorthy_Kalidoss

I think you can use PyMongo’s documentation to do this (change_stream – Watch changes on a collection, database, or cluster — PyMongo 4.2.0 documentation), with a small change:

    with db.collection.watch(pipeline) as stream:
        for insert_change in stream:
            print(insert_change)
            if <some condition>:
                break

You might want to tailor the <some condition> above to suit your closing criteria.

Also from https://www.mongodb.com/docs/manual/changeStreams/ :

Note that certain requirements must be met (e.g. WiredTiger, recent MongoDB versions, etc.) to be able to open a change stream.

If this is not working for you, could you post:

  • Your MongoDB version
  • A minimal code example that is not working so it can be reproduced locally
  • Any error message you’re seeing

Best regards
Kevin

OK, thanks for the reply… I have a few questions before I try it…

As I mentioned, I have two different endpoints to start and stop the change stream…

When I start the change stream using the start endpoint, it keeps watching the collection…

When I want to stop watching, I will call the stop endpoint using the code you provided (based on a condition)…

Since we use two different methods for start and stop, what happens to the earlier start call, which is still watching? Will the stop endpoint stop the watch that was started from the start endpoint?

Hi

I tried with break… it is still not working…
Because the change stream keeps watching for changes in the for loop:

for change in stream:
.
.

I used a kind of flag… if the flag is stop, break (within the for loop)… else continue…

As the for loop body only executes when a change arrives… the flag doesn't play a role here…

My requirement is that I have multiple collections in MongoDB… I should be able to start watching any collection and stop watching any collection at any time…

Please advise…

I’m not sure I follow. Are you saying that you have two different endpoints you can trigger:

  1. One process starts the change stream by calling db.collection.watch(). How does this process handle the change events?
  2. Another process (unrelated to the first, perhaps started as a separate process in a different script) is designed to trigger closing the changestream opened by the first process

Is this accurate?

Can you provide some small example code for both processes?

Best regards
Kevin

startChangeStream:

with collection.watch() as stream:
    for change in stream:
        print(change)

stopChangeStream:

with collection.watch() as stream:
    for change in stream:
        break

I tried with separate methods for start and stop…

I have also tried passing a flag to the start change stream method.
Neither helped.

I have multiple collections… I want to start watching any collection and stop watching any collection at any time…

Please advise…

If startChangeStream and stopChangeStream are different scripts (or even different functions within the same script), then I don’t think this approach will work, since they are basically two different change streams, so one cannot affect the other.

Also in the stopChangeStream script/function:

with collection.watch() as stream:
    for change in stream:
        break

wouldn’t this open a changestream and immediately close it?

I believe what you need is to combine them into one function, something like what I posted earlier:

with collection.watch() as stream:
    for change in stream:
        print(change)
        if <some external condition>:
            break

The <some external condition> above could be anything you need, e.g. a flag that was set somewhere, some timer, etc.

Best regards
Kevin

Yeah, I tried calling stop from the same method using a flag, as below:

with collection.watch() as stream:
    for change in stream:
        if status == 'stop':
            break

But once the change stream has started watching for changes, it keeps on watching. The status variable does not get set to 'stop' externally.

Please advise

I think this is the main cause of the issue. I believe you’ll need to ensure that this “stop” flag gets passed on to the function. However, I think this is not really a MongoDB issue; rather, it’s more of a coding issue :)

Having said that, I would check:

  • Is the status “stop” spelled correctly, since the variable check depends on it being exactly “stop” instead of True or False binary values
  • You might want to insert debugging statements in the code that sets the status variable, and the code that checks the flag (i.e. inside that for loop above) to see if the flag is set properly
  • I would also ensure that the variable status is within the scope of the for loop above
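As a rough illustration of the scope point in the last bullet: in Python, assigning to a module-level variable inside a function creates a new local variable unless the name is declared global, so the flag that the loop reads may never change. This is only a minimal sketch; the function names request_stop_broken, request_stop and should_stop are made up for the example and are not from the original code.

```python
status = "run"  # module-level flag that a watch loop would check


def request_stop_broken():
    # Without `global`, this binds a new local name;
    # the module-level `status` is left unchanged.
    status = "stop"
    return status


def request_stop():
    # `global` makes the assignment rebind the module-level flag itself.
    global status
    status = "stop"


def should_stop():
    # What the check inside the for loop would effectively evaluate.
    return status == "stop"
```

Note also that a module-level flag like this is only visible within one Python process; if the Flask app runs several worker processes, each one has its own copy.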

If these do not solve the issue, you might want to also ask on specific coding-oriented sites such as StackOverflow to see if there are any issues with the Python code.

Best regards
Kevin

Hi,

Instead of checking the stop status within the for loop, is there a better way to stop the change stream?

Because I want to stop the change stream for the given collection regardless of whether there are changes in the stream…

Please advise…

Not that I know of, since that is how the code works. Currently, iterating over the stream returned by watch() works like an infinite loop: it blocks until a change event arrives and then processes it.

At this point, killing the script is the only thing I can think of.

Best regards
Kevin

Hi Kevin,

Thanks for confirming my observation…

Killing the script will not help us either… because the change stream has to keep watching the other collections when I stop watching one collection… so if I kill the script, none of the collections will be watched…

So, as per your earlier reply, I think we don't have an option to achieve this…

I can think of two options off the top of my head:

One is to run a separate script per change stream, so you can just kill the one that you need. This is probably the most straightforward, but would require a bit of management.
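The "bit of management" could look roughly like the sketch below, which keeps one child process per collection and terminates only the one being stopped. It uses only Python's standard subprocess module. The class WatcherProcesses, the helper watcher_command, and the script name watch_collection.py are all hypothetical and stand in for whatever script actually runs collection.watch().

```python
import subprocess
import sys


def watcher_command(collection_name):
    # Hypothetical: watch_collection.py is assumed to open a change stream
    # on the collection named by its first argument.
    return [sys.executable, "watch_collection.py", collection_name]


class WatcherProcesses:
    """Track one child process per watched collection."""

    def __init__(self):
        self._procs = {}  # collection name -> subprocess.Popen

    def start(self, name, argv):
        """Launch argv as the watcher for `name` unless one is already running."""
        proc = self._procs.get(name)
        if proc is not None and proc.poll() is None:
            return False
        self._procs[name] = subprocess.Popen(argv)
        return True

    def stop(self, name, timeout=5.0):
        """Terminate the watcher for `name`; return False if none is running."""
        proc = self._procs.pop(name, None)
        if proc is None or proc.poll() is not None:
            return False
        proc.terminate()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # The child ignored the polite request, so force it.
            proc.kill()
            proc.wait()
        return True
```

A start endpoint could then call something like manager.start("orders", watcher_command("orders")) and a stop endpoint manager.stop("orders"), leaving the watchers for other collections running.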

Two is to run the change stream watchers in threads. Perhaps something like this:

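import threading

stop = False  # set to True from elsewhere to request a stop

def watch_test():
    with collection.watch() as stream:
        for change in stream:
            # Checked only after a change arrives, since the loop blocks while waiting
            if stop:
                break
            print(change)

w = threading.Thread(target=watch_test)
w.start()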

You can set the stop variable to True and it will be visible inside the threaded function due to the scope, so it will stop the thread’s execution. See threading for more information, and please note that I wrote that untested code in a couple of minutes so it’s not the best.
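A possible variation on the threaded idea, for the case where a stop should take effect even when no changes arrive: PyMongo's ChangeStream also has a try_next() method that returns None instead of waiting indefinitely, so the loop can re-check a stop flag between polls. The sketch below is untested against a real deployment and makes several assumptions: the class name CollectionWatchers, the per-collection threading.Event, and the poll_interval value are all illustrative. Depending on the server's await time, a try_next() call may still take a short while to return, so stopping is not instantaneous.

```python
import threading


class CollectionWatchers:
    """Run one change-stream thread per collection, each with its own stop flag."""

    def __init__(self, handle_change=print, poll_interval=0.5):
        self._handle_change = handle_change
        self._poll_interval = poll_interval
        self._watchers = {}  # name -> (thread, stop_event)
        self._lock = threading.Lock()

    def _run(self, collection, stop_event):
        # Leaving the `with` block closes the stream opened by this thread.
        with collection.watch() as stream:
            while stream.alive and not stop_event.is_set():
                change = stream.try_next()  # None when no change is pending
                if change is not None:
                    self._handle_change(change)
                    continue
                # Nothing yet: pause briefly, waking early if stop is requested.
                stop_event.wait(self._poll_interval)

    def start(self, name, collection):
        """Start watching `collection` under `name`; False if already watching."""
        with self._lock:
            entry = self._watchers.get(name)
            if entry is not None and entry[0].is_alive():
                return False
            stop_event = threading.Event()
            thread = threading.Thread(
                target=self._run, args=(collection, stop_event), daemon=True
            )
            self._watchers[name] = (thread, stop_event)
            thread.start()
            return True

    def stop(self, name, timeout=5.0):
        """Ask the watcher for `name` to stop; True once its thread has exited."""
        with self._lock:
            entry = self._watchers.pop(name, None)
        if entry is None:
            return False
        thread, stop_event = entry
        stop_event.set()
        thread.join(timeout)
        return not thread.is_alive()
```

With this shape, a start endpoint could call watchers.start("orders", db.orders) and a stop endpoint watchers.stop("orders"), and each collection is stopped independently of the others. This only works when both endpoints run in the same process that owns the watchers object.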

But again, this is Python coding, in which we’re not really experts :)

Best regards
Kevin

OK, let me check the first option…

And in the second approach, once stop is False and the for loop has started watching the collection, it won't leave the for loop until a change comes in (as it is infinite), and there is no opportunity to set stop to True when we need to stop…

Hi @Krishnamoorthy_Kalidoss

Yes, I wrote that in a couple of minutes, so it won’t be perfect and is definitely not production quality; it was intended as an illustration of potential solutions. I was hoping to put forth some ideas for you to try and modify to suit your workflow.

As we have agreed that this is a Python coding question and not a MongoDB question, I encourage you to ask for more direction and better code examples in programming sites such as StackOverflow.

Best regards
Kevin
