Support resumeAfter or startAfter in Spark Connector for readStreams

Browsing the Spark Connector Change Stream Configuration docs and the source code on GitHub, I’ve been unable to figure out how to specify a resumeAfter/startAfter token when consuming a MongoDB database or collection as a readStream, the way I would with a Python client like Motor.
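
For reference, here is roughly how we resume today with Motor. This is a minimal sketch; the URI, namespace, and token handling are placeholders rather than our real setup:

import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def consume(saved_token):
    client = AsyncIOMotorClient("mongodb://localhost:27017")  # placeholder URI
    coll = client["mydb"]["mycoll"]                           # placeholder namespace
    # resume_after continues exactly where a previous stream left off;
    # start_after is similar but can also resume past an invalidate event.
    async with coll.watch(resume_after=saved_token) as stream:
        async for change in stream:
            print(change)
            saved_token = stream.resume_token  # persist this for the next restart

asyncio.run(consume(saved_token=None))  # None means "start from now"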

Resuming consumption from a particular offset is a hard requirement for our use of the Spark Connector: we cannot guarantee 100% consumer uptime, yet we need to propagate 100% of the change feed to our sinks.

Is resumeAfter/startAfter supported and I’m just missing the documentation? If not, would it be possible to add this as a read configuration option?

I am unable to find this option in the documentation either.
@Robert_Walters Could you please confirm if this feature is available in version 10.0?
Thanks in advance.

Currently it is not possible. I added https://jira.mongodb.org/browse/SPARK-380 to track this.

Can you add your use case to that ticket? If you don’t have a Jira account, can you elaborate here on what you would expect to provide as a resume value: an epoch time or a BSON Timestamp value?
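
For context, a minimal sketch of what the two candidate values look like, using PyMongo here (the database and collection names are placeholders):

from bson.timestamp import Timestamp
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
with client["mydb"]["mycoll"].watch() as stream:
    change = next(stream)       # blocks until one change arrives
    print(stream.resume_token)  # opaque resume token, e.g. {'_data': '8262...'}

# A BSON Timestamp, by contrast, is (seconds since epoch, increment):
print(Timestamp(1713179033, 0))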

Is it possible right now to pass the resume token to the Spark Connector?

@Robert_Walters I have been unable to locate documentation for passing a resume token to the Spark Connector.

Today it is not possible to pass the resume token. We created https://jira.mongodb.org/browse/SPARK-380 to add this functionality.

Was this feature ever implemented? I see that the ticket @Robert_Walters created was closed along with the linked tickets, but it was never assigned.

@Prakul_Agarwal now owns the Spark Connector and can comment on this specifically. I am not sure where the priority of this ended up.

We can use checkpointing when writing streams. On restart, the stream resumes from the last committed offset, where it left off.

For example:
query = (streaming_df.writeStream
    .format("parquet")
    .option("checkpointLocation", checkpoint_location)  # offsets are persisted here
    .foreachBatch(writeData)  # writeData handles each micro-batch; it supersedes the parquet sink
    .trigger(once=True)
    .start())
query.awaitTermination()

Here is an excerpt of the streaming query progress (e.g. query.lastProgress) showing the offsets tracked for the MongoDB source:

"sources" : [ {
  "description" : "com.mongodb.spark.sql.connector.read.MongoMicroBatchStream@6793d7d6",
  "startOffset" : {
    "version" : 1,
    "offset" : {
      "$timestamp" : { "t" : 1713179033, "i" : 0 }
    }
  },
  "endOffset" : {
    "version" : 1,
    "offset" : {
      "$timestamp" : { "t" : 1713179298, "i" : 0 }
    }
  },
  "latestOffset" : {
    "version" : 1,
    "offset" : {
      "$timestamp" : { "t" : 1713179298, "i" : 0 }
    }
  }
} ]

Please refer to startOffset, which is the offset from which the stream resumes.
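
If you want to verify the resume point directly, the same offsets live in the checkpoint directory. A minimal sketch, assuming a local-filesystem checkpoint and Spark's standard v1 offset-log layout (the path is a placeholder):

import json
import os

checkpoint_location = "/tmp/checkpoint"  # placeholder; use your checkpoint path
offsets_dir = os.path.join(checkpoint_location, "offsets")

# Offset log files are named by batch id; pick the most recent one.
latest_batch = max(int(name) for name in os.listdir(offsets_dir) if name.isdigit())
with open(os.path.join(offsets_dir, str(latest_batch))) as f:
    lines = f.read().splitlines()

# lines[0] is the log version ("v1"), lines[1] is query metadata, and each
# remaining line holds one source's offset JSON, e.g. the {"$timestamp": ...}
# document shown above.
print(json.loads(lines[2]))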