How to use resumeAfter: spark structured streaming + mongodb change stream

I have Spark Structure Streaming read data from mongodb change stream then send data to Kafka.

I want to be able to resume the job when something happen. I know mongodb supports resumeAfter and startAfter using resumeToken.

But I can’t find instruction on how to use it with Spark Structure Streaming (spark-connector). One way I try to do is use the read configuration, something like this:

.option(“spark.mongodb.read.aggregation.pipeline”, “[{“createdAt”: {$gt: “2022-01-01”}}]”)

but it returns error

Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type DOCUMENT is of unexpected type NULL
at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
at org.bson.BsonValue.asDocument(BsonValue.java:47)
at org.bson.BsonDocument.getDocument(BsonDocument.java:524)
at com.mongodb.spark.sql.connector.read.MongoStreamPartitionReader.lambda$tryNext$8(MongoStreamPartitionReader.java:144)
at com.mongodb.spark.sql.connector.read.MongoStreamPartitionReader.withCursor(MongoStreamPartitionReader.java:196)
at com.mongodb.spark.sql.connector.read.MongoStreamPartitionReader.tryNext(MongoStreamPartitionReader.java:137)
at com.mongodb.spark.sql.connector.read.MongoStreamPartitionReader.next(MongoStreamPartitionReader.java:112)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousQueuedDataReader$DataReaderThread.run(ContinuousQueuedDataReader.scala:150)

Can someone point me to the tutorial for this?

1 Like

I also tried

.option(“spark.mongodb.read.resumeAfter”, “xxxsome real _id”)

but it shows same error message.

Were you able to figure this one out?