I have a Spark Structured Streaming job that reads data from a MongoDB change stream and sends it to Kafka.
I want to be able to resume the job when something goes wrong (e.g., a crash). I know MongoDB change streams support resumeAfter and startAfter using a resumeToken.
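Outside Spark, the resume pattern I have in mind is: persist the `_id` (the resume token) of the last processed event, and pass it back as resumeAfter when reopening the stream. A minimal pure-Python sketch of that checkpointing logic, with the change stream faked by a list (a real implementation would use the driver's `watch()`):

```python
import json
import tempfile
from pathlib import Path

# Fake change-stream events; a real stream yields documents whose
# _id field is the opaque resume token.
EVENTS = [
    {"_id": {"_data": "token-1"}, "fullDocument": {"x": 1}},
    {"_id": {"_data": "token-2"}, "fullDocument": {"x": 2}},
    {"_id": {"_data": "token-3"}, "fullDocument": {"x": 3}},
]

def watch(events, resume_after=None):
    """Yield events strictly after the given resume token (None = from start)."""
    skipping = resume_after is not None
    for event in events:
        if skipping:
            if event["_id"] == resume_after:
                skipping = False  # the next event is the first new one
            continue
        yield event

def process(events, checkpoint: Path, limit):
    """Process up to `limit` events, persisting the resume token after each."""
    token = json.loads(checkpoint.read_text()) if checkpoint.exists() else None
    seen = []
    for i, event in enumerate(watch(events, resume_after=token)):
        if i >= limit:
            break
        seen.append(event["fullDocument"]["x"])
        checkpoint.write_text(json.dumps(event["_id"]))  # checkpoint the token
    return seen

ckpt = Path(tempfile.mkdtemp()) / "resume_token.json"
first = process(EVENTS, ckpt, limit=2)    # "crash" after two events
second = process(EVENTS, ckpt, limit=10)  # resume picks up where we left off
print(first, second)  # [1, 2] [3]
```

This is the behaviour I would like Spark's checkpointing to give me automatically.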
But I can't find instructions on how to use it with Spark Structured Streaming (the spark-connector). One thing I tried is the read configuration, something like this:
.option("spark.mongodb.read.aggregation.pipeline", "[{\"createdAt\": {$gt: \"2022-01-01\"}}]")
but it returns this error:
Caused by: org.bson.BsonInvalidOperationException: Value expected to be of type DOCUMENT is of unexpected type NULL
at org.bson.BsonValue.throwIfInvalidType(BsonValue.java:419)
at org.bson.BsonValue.asDocument(BsonValue.java:47)
at org.bson.BsonDocument.getDocument(BsonDocument.java:524)
at com.mongodb.spark.sql.connector.read.MongoStreamPartitionReader.lambda$tryNext$8(MongoStreamPartitionReader.java:144)
at com.mongodb.spark.sql.connector.read.MongoStreamPartitionReader.withCursor(MongoStreamPartitionReader.java:196)
at com.mongodb.spark.sql.connector.read.MongoStreamPartitionReader.tryNext(MongoStreamPartitionReader.java:137)
at com.mongodb.spark.sql.connector.read.MongoStreamPartitionReader.next(MongoStreamPartitionReader.java:112)
at org.apache.spark.sql.execution.streaming.continuous.ContinuousQueuedDataReader$DataReaderThread.run(ContinuousQueuedDataReader.scala:150)
Can someone point me to documentation or a tutorial for this?
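For what it's worth, I suspect the string I'm passing isn't a valid aggregation pipeline at all: each entry in a pipeline has to be a document whose key is a stage operator like $match, and my `{"createdAt": ...}` entry has no stage wrapping it, which might explain the connector finding NULL where it expects a DOCUMENT (my guess, not confirmed). A quick stdlib-only structural check I now run on the string before submitting the job:

```python
import json

def check_pipeline(pipeline_json: str) -> list[str]:
    """Return a list of structural problems with an aggregation-pipeline string.

    Checks only structure: valid JSON, a top-level array, and every
    stage being a single-key document whose key starts with '$'.
    """
    try:
        pipeline = json.loads(pipeline_json)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc}"]
    if not isinstance(pipeline, list):
        return ["pipeline must be a JSON array of stages"]
    problems = []
    for i, stage in enumerate(pipeline):
        if not isinstance(stage, dict) or len(stage) != 1:
            problems.append(f"stage {i} must be a single-key document")
            continue
        key = next(iter(stage))
        if not key.startswith("$"):
            problems.append(f"stage {i}: '{key}' is not a stage operator")
    return problems

# My original attempt: a filter with no $match stage around it.
bad = '[{"createdAt": {"$gt": "2022-01-01"}}]'
# Same filter wrapped in $match (note: $gt on a string is a lexicographic
# comparison; a real filter would compare BSON dates).
good = '[{"$match": {"createdAt": {"$gt": "2022-01-01"}}}]'

print(check_pipeline(bad))   # ["stage 0: 'createdAt' is not a stage operator"]
print(check_pipeline(good))  # []
```

This only validates the shape of the string, of course; whether the $match field path is right for change-stream events (e.g., whether it needs a fullDocument. prefix) is exactly the part I can't find documented.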