How to unwatch or stop a running change stream with the MongoDB Java driver

Hi, I’d like to know how to stop or cancel a running change stream once I no longer need it.

For example, if I have the following Java code snippet:

// ...

private static final Logger log = LoggerFactory.getLogger(MySynchronousBlockingChangeStream.class);

@Autowired
private MongoTemplate mongoTemplate;

// ...

public void startChangeStream() {
	String query = "{operationType: {$in: ['insert', 'update']}, 'fullDocument.total': {$gt: 100}}";
	
	try {
		List<Bson> changestream = Collections.singletonList(match(
				Document.parse(query).toBsonDocument(BsonDocument.class, MongoClientSettings.getDefaultCodecRegistry())
			)	
		);
		
		ChangeStreamIterable<Document> iter = collection.watch(changestream);
		MongoCursor<ChangeStreamDocument<Document>> cursor = iter.iterator();
		cursor.forEachRemaining(stream -> {
		
			Document targetDoc = stream.getFullDocument();
			log.debug("Received ChangeStream: " + targetDoc);
			mongoTemplate.insert(targetDoc, "nameOfAuditCollection");
		});
		
	} catch (MongoException ex) {
		log.error("ChangeStream error :: " + ex.getMessage());
	}
}

How can I stop this running change stream through a method call, for example “stopChangeStream() {…}”?

I’ve tried closing the cursor with “cursor.close()” while looping, but it throws an exception.

Hello @Adianfer, welcome to the forum.

You can only close the cursor from outside the change stream’s cursor loop. However, you can exit (break out of) the loop based on a condition, assuming the cursor is iterated in a while loop. For example (for demonstration only), you can break out of the loop after three iterations; the cursor.close() call then runs.

MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor 
    = collection.watch().cursor();

int counter = 0;
	
while (cursor.hasNext()) {
  System.out.println(cursor.next());
  if (++counter >= 3) break; // stop after the third event
}

cursor.close();

You have to know the condition under which you want to close the cursor; in the example it’s just a count. How that condition gets fulfilled depends on your application’s functionality.
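As a minimal sketch of how such a condition could be set from outside the loop (for instance from a stopChangeStream() method), the loop can check a thread-safe flag before each iteration. The names StoppableLoop, stop() and run() are hypothetical, and a plain Iterator stands in for the change stream cursor so the example runs without a MongoDB deployment.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;

/** Hypothetical wrapper: runs a cursor-style loop until stop() is requested. */
class StoppableLoop<T> {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    /** Safe to call from any thread, e.g. from a stopChangeStream() method. */
    void stop() {
        stopRequested.set(true);
    }

    /** Handles events until the source is exhausted or stop() was called; returns the number handled. */
    int run(Iterator<T> events, Consumer<T> handler) {
        int handled = 0;
        while (!stopRequested.get() && events.hasNext()) {
            handler.accept(events.next());
            handled++;
        }
        // With a real change stream, cursor.close() would go here, outside the loop.
        return handled;
    }
}

class StoppableLoopDemo {
    public static void main(String[] args) {
        StoppableLoop<Integer> loop = new StoppableLoop<>();
        // An endless iterator stands in for a stream that keeps producing events.
        Iterator<Integer> endless = Stream.iterate(1, n -> n + 1).iterator();
        // The handler requests a stop on the third event (for demonstration only).
        int handled = loop.run(endless, n -> { if (n == 3) loop.stop(); });
        System.out.println("handled " + handled + " events"); // prints "handled 3 events"
    }
}
```

In an application, stop() would more likely be called from another thread or a service method than from the handler itself.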

From the documentation, after a change stream’s cursor is acquired

… the cursor remains open until one of the following occurs:

  • The cursor is explicitly closed.
  • An invalidate event occurs.
  • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.

Hi @Prasad_Saya, thank you!
Your suggestion works, but I had to remove the Java lambda expression because break; cannot be used inside it.

cursor.forEachRemaining(stream -> {
	// ...
});

So I’ve replaced it with the following while loop in order to break and then close the cursor.

while (cursor.hasNext()) {
	if (isChangeStreamDisabled()) {
		break;
	}
	
	// ...
}

cursor.close();
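One caveat, stated as my understanding rather than a guarantee: hasNext() on a change stream cursor waits until an event arrives, so a flag checked inside the loop is only noticed after the next event. The driver's cursor also offers tryNext(), which returns null when no event is available, and that allows a polling loop that re-checks the flag regularly. The sketch below shows only the shape of such a loop. PollingWatcher is a hypothetical name, the Supplier stands in for a call such as cursor.tryNext(), and the 10 ms pause is an arbitrary choice.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical poller: tryNext stands in for a cursor call that returns null when no event is ready. */
class PollingWatcher<T> implements Runnable {
    private final Supplier<T> tryNext;
    private final Consumer<T> handler;
    private final AtomicBoolean running = new AtomicBoolean(true);

    PollingWatcher(Supplier<T> tryNext, Consumer<T> handler) {
        this.tryNext = tryNext;
        this.handler = handler;
    }

    /** Safe to call from any thread; the loop exits at its next flag check. */
    void stop() {
        running.set(false);
    }

    @Override
    public void run() {
        while (running.get()) {
            T event = tryNext.get();
            if (event == null) {
                // Nothing available: pause briefly, then re-check the flag.
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                continue;
            }
            handler.accept(event);
        }
        // With a real change stream, cursor.close() would go here.
    }
}

class PollingWatcherDemo {
    public static void main(String[] args) throws InterruptedException {
        // An idle source: no event ever arrives, yet the watcher can still be stopped.
        PollingWatcher<String> watcher = new PollingWatcher<>(() -> null, System.out::println);
        Thread worker = new Thread(watcher);
        worker.start();
        watcher.stop();
        worker.join(1000);
        System.out.println("stopped: " + !worker.isAlive());
    }
}
```

The trade-off is a small delay before the stop takes effect and some idle polling, in exchange for not depending on another event arriving.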
