Saving change stream resume token

Using java Spring Boot framework. MongoDB Java drive 4.2.
I want to save the resume token and timestamp to a collection and when app starts up, if I have a resume token or timestamp, saved, I want to resume from that token, or timestamp.

My issue is I cannot successfully save and then retrieve the resume token from my collection. I have tried defining as BsonDocument in my class. That results in an error upon retrieval stating cannot convert from string to bson object. If I define and save as a string it fails upon retrieval stating cannot convert from bson document to string.

Has anyone successfully stored and then retrieved a resume token from your own collection? If so, how did you define it in your class? Can you share the java code?

The purpose is when we release a new version of the app, or if server failed, when the app restarts I want it to be able to resume where it left off, or close to it. I plan to save the last resume token every so many inserts.

Other idea is to bypass resume token and store the last action timestamp from my latest document and write my own query process to catchup all documents between that last action timestamp and the last action timestamp retrieved off first document captured from the change stream. This is probably needed in case oplog has rolled anyway.

1 Like

I finally figured out a solution. Might not be the most straight forward but it worked.

I can restart with either the timestamp or the token.

This is all POC code. Not yet ready for production.

How I defined in my cdrConfig.class.

private BsonString token;
private BsonTimestamp startAtOperationTime;

How I captured the 2 values to insert into my collection.

BsonTimestamp startAtOperationTime = changeStreamDocument.getClusterTime().asTimestamp();
BsonDocument resumeToken = changeStreamDocument.getResumeToken().asDocument();
BsonString token = resumeToken.getString("_data");

How I built the resumeToken after receiving data back from my collection.

BsonTimestamp startAtOperationTime = cdrConfig.getStartAtOperationTime();
BsonString bsonString = cdrConfig.getToken();
BsonDocument resumeToken2.put("_data", bsonString);

How I started the changestream watch.

if (resumeToken2 != null) {
	changeStream = claims.watch(pipeline).resumeAfter(resumeToken2).fullDocument(FullDocument.UPDATE_LOOKUP);
	} else {
		if (startAtOperationTime != null) {
			changeStream = claims.watch(pipeline).startAtOperationTime(startAtOperationTime).fullDocument(FullDocument.UPDATE_LOOKUP);
		} else {
			changeStream = claims.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP);
			}
}

Struggling with the same issue, this is very helpful, thanks. Curious if you ever found a “cleaner” method to do this.

I am Stuck with the same problem can you share the exact code for this, if possible? Thanks in advance

Did you try the code I posted above?

Yes, I have tried but in my case, it didn’t work.

Is your changestream code working if you ignore my example code? If not, you need to get that working first. If you do have the Changestream working, what part of the above code is not working for you? What error do you see? To me, the only part that could cause an issue is the claims.watch(pipeline) if you didn’t build the “pipeline” correctly or if a newer driver wants something different.
I posted that code well over a year ago and it is possible the example needs tweaked for a newer MongoDB java driver. We have not yet move forward with using ChangeStreams, the project is delayed until 2023.

The above code is for use after you get your ChangeStream process working and decide you want to store the resume token and/or the resume timestamp for the last document read from the ChangeStream process. You can use the stored Token or timestamp to restart your changestream at that specific point, if the document is still in the Opslog.

1 Like

In getting the resume Token from the last document, I didn’t get that one. Also, I need to store the last resume Token in a file or somewhere I can access when the app gets close. How to do those things

I show how to retrieve the token and timestamp in the code I posted above.
Look under section “How I captured the 2 values to insert into my collection”.

Then I stored the values in a collection using the data type described under section “How I defined in my cdrConfig.class”. I named my storage collection “cdrConfig”, you name yours whatever name fits your environment.

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.