Saving change stream resume token

I am using the Java Spring Boot framework with MongoDB Java driver 4.2.
I want to save the resume token and timestamp to a collection. When the app starts up, if a resume token or timestamp has been saved, I want to resume from that token or timestamp.

My issue is that I cannot successfully save and then retrieve the resume token from my collection. I have tried defining it as a BsonDocument in my class. That results in an error upon retrieval stating that it cannot convert from string to bson object. If I define and save it as a string, it fails upon retrieval stating that it cannot convert from bson document to string.

Has anyone successfully stored and then retrieved a resume token from your own collection? If so, how did you define it in your class? Can you share the java code?

The purpose is that when we release a new version of the app, or if the server fails, I want the app to resume where it left off, or close to it, when it restarts. I plan to save the last resume token every so many inserts.
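
The "save every so many inserts" idea can be sketched without any driver code. This is only a rough illustration: the CheckpointCounter class, its interval parameter, and the Consumer<String> saver are hypothetical names, not part of the MongoDB driver. In the real app the saver would write the token to the config collection.

```java
import java.util.function.Consumer;

// Hypothetical helper: persists the latest resume token once every `interval` events.
class CheckpointCounter {
    private final int interval;
    private final Consumer<String> saver; // stand-in for a write to the config collection
    private int sinceLastSave = 0;

    CheckpointCounter(int interval, Consumer<String> saver) {
        if (interval < 1) {
            throw new IllegalArgumentException("interval must be >= 1");
        }
        this.interval = interval;
        this.saver = saver;
    }

    // Call once per change event, passing that event's token as a string.
    void onEvent(String token) {
        sinceLastSave++;
        if (sinceLastSave >= interval) {
            saver.accept(token);
            sinceLastSave = 0;
        }
    }
}
```

With an interval of 3, feeding tokens t1 through t7 saves t3 and t6. Events processed after the last saved token are delivered again on restart, so whatever consumes them should tolerate duplicates.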

Another idea is to bypass the resume token, store the last action timestamp from my latest document, and write my own query process to catch up on all documents between that saved timestamp and the last action timestamp of the first document captured from the change stream. This is probably needed anyway in case the oplog has rolled over.
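
The catch-up idea comes down to selecting documents whose last action timestamp falls between the saved value and the first value seen on the new change stream. Below is a minimal sketch of that window, using an in-memory list in place of a real MongoDB query. The Claim and CatchUp classes and the lastActionMillis field are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stored document; only the fields the sketch needs.
final class Claim {
    final String id;
    final long lastActionMillis;

    Claim(String id, long lastActionMillis) {
        this.id = id;
        this.lastActionMillis = lastActionMillis;
    }
}

final class CatchUp {
    // Returns claims changed after the saved checkpoint but before the first
    // event delivered by the new change stream (both bounds exclusive).
    static List<Claim> missed(List<Claim> all, long savedMillis, long firstStreamMillis) {
        List<Claim> result = new ArrayList<>();
        for (Claim c : all) {
            if (c.lastActionMillis > savedMillis && c.lastActionMillis < firstStreamMillis) {
                result.add(c);
            }
        }
        return result;
    }
}
```

If several documents can share the same timestamp, exclusive bounds may skip some of them. Making the bounds inclusive and keeping the processing idempotent is probably the safer choice.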

I finally figured out a solution. It might not be the most straightforward, but it worked.

I can restart with either the timestamp or the token.

This is all POC code. Not yet ready for production.

How I defined the fields in my cdrConfig class.

private BsonString token;
private BsonTimestamp startAtOperationTime;

How I captured the two values to insert into my collection.

BsonTimestamp startAtOperationTime = changeStreamDocument.getClusterTime().asTimestamp();
BsonDocument resumeToken = changeStreamDocument.getResumeToken().asDocument();
BsonString token = resumeToken.getString("_data");

How I rebuilt the resumeToken after reading the data back from my collection.

BsonTimestamp startAtOperationTime = cdrConfig.getStartAtOperationTime();
BsonString bsonString = cdrConfig.getToken();
// Rebuild the token document; leave it null when no token was saved.
BsonDocument resumeToken2 = (bsonString != null) ? new BsonDocument("_data", bsonString) : null;

How I started the change stream watch.

if (resumeToken2 != null) {
	changeStream = claims.watch(pipeline).resumeAfter(resumeToken2).fullDocument(FullDocument.UPDATE_LOOKUP);
} else if (startAtOperationTime != null) {
	changeStream = claims.watch(pipeline).startAtOperationTime(startAtOperationTime).fullDocument(FullDocument.UPDATE_LOOKUP);
} else {
	changeStream = claims.watch(pipeline).fullDocument(FullDocument.UPDATE_LOOKUP);
}

Struggling with the same issue, this is very helpful, thanks. Curious if you ever found a “cleaner” method to do this.