How to use resumeAfter: Spark Structured Streaming + MongoDB change stream

I have a Spark Structured Streaming job that reads data from a MongoDB change stream and then sends it to Kafka.

I want to be able to resume the job when something happens. I know MongoDB supports resumeAfter and startAfter using a resumeToken.

But I can't find instructions on how to use it with Spark Structured Streaming (the MongoDB Spark connector). One thing I tried was the read configuration, something like this:

.option("", "[{\"createdAt\": {$gt: \"2022-01-01\"}}]")

but it returns this error:

Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type DOCUMENT is of unexpected type NULL
at org.bson.BsonValue.throwIfInvalidType(
at org.bson.BsonValue.asDocument(
at org.bson.BsonDocument.getDocument(
at org.apache.spark.sql.execution.streaming.continuous.ContinuousQueuedDataReader$

Can someone point me to the tutorial for this?
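One thing I double-checked on my side, in case the quoting is the problem: the exception says a DOCUMENT was expected but NULL was found, which can happen when the option value doesn't parse as a valid document. This is just a plain-Python sketch of how I now build the pipeline string, using json.dumps so the nested quotes get escaped for me; note a pipeline entry has to be a stage like $match, not a bare filter, and whether the field needs the fullDocument. prefix on a change stream is my assumption:

```python
import json

# Build the $match stage as a Python dict and serialize it, so the
# nested quotes in the option value are escaped correctly.
# "fullDocument.createdAt" is an assumption for a change stream event.
pipeline = [{"$match": {"fullDocument.createdAt": {"$gt": "2022-01-01"}}}]
pipeline_json = json.dumps(pipeline)

print(pipeline_json)
# [{"$match": {"fullDocument.createdAt": {"$gt": "2022-01-01"}}}]
```

The resulting string is what I pass to the connector's read option, instead of hand-writing the JSON inline.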


I also tried

.option("", "xxxsome real _id")

but it shows the same error message.
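One thing I suspect, for anyone comparing notes: as far as I understand, a change stream resume token is not a bare _id string but a document wrapping a hex string under a "_data" key, which might be why Bson complains about expecting a DOCUMENT. A small sketch of the shape I mean (the token value below is a made-up placeholder, not a real token):

```python
import json

# A resume token is a document of the form {"_data": "<hex string>"},
# not the raw string by itself. The value here is a fake placeholder.
resume_after = {"_data": "8263F2..."}
option_value = json.dumps(resume_after)

print(option_value)
# {"_data": "8263F2..."}
```

So the option value would need to be that whole JSON document, not just the string inside it.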

Were you able to figure this one out?