Write changed data from collections to S3 using Spark Streaming

Hi Community,

We are trying to perform CDC (Change Data Capture) on all of the collections in our MongoDB Atlas (v4.4.23) deployment and write the changes to S3 in JSON format using Spark Structured Streaming. We are using PySpark in AWS Glue (v3.0) to run this streaming job, with mongo-spark-connector_2.12-10.1.1.
We also passed the following JARs to the streaming job (see the sketch after this list for how they are supplied):

  • bson-4.10.2.jar
  • mongodb-driver-core-4.10.2.jar
  • mongodb-driver-sync-4.10.2.jar
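
For reference, this is roughly how the JARs are supplied to the Glue streaming job through the --extra-jars job parameter (a sketch using boto3; the job name, script location, role ARN, and bucket paths are placeholders, not our actual values):

import boto3

glue = boto3.client('glue')

# Sketch: register a Glue 3.0 streaming job and point --extra-jars at the
# connector and driver JARs uploaded to S3 (all names/paths are placeholders).
glue.create_job(
    Name='mongodb-cdc-to-s3',
    Role='arn:aws:iam::123456789012:role/GlueJobRole',
    GlueVersion='3.0',
    Command={
        'Name': 'gluestreaming',
        'ScriptLocation': 's3://bucket/scripts/cdc_streaming_job.py',
        'PythonVersion': '3',
    },
    DefaultArguments={
        '--extra-jars': ','.join([
            's3://bucket/jars/mongo-spark-connector_2.12-10.1.1.jar',
            's3://bucket/jars/bson-4.10.2.jar',
            's3://bucket/jars/mongodb-driver-core-4.10.2.jar',
            's3://bucket/jars/mongodb-driver-sync-4.10.2.jar',
        ]),
    },
)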

However, the job fails with the exception below when it is executed in AWS Glue. I ran a similar streaming job on my local system and did not encounter any issue.

java.lang.NoSuchMethodError: org.bson.conversions.Bson.toBsonDocument()Lorg/bson/BsonDocument;
at com.mongodb.spark.sql.connector.read.MongoMicroBatchPartitionReader.getCursor(MongoMicroBatchPartitionReader.java:169)
at com.mongodb.spark.sql.connector.read.MongoMicroBatchPartitionReader.next(MongoMicroBatchPartitionReader.java:103)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:454)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:277)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

Providing the code below for more clarity.

from functools import partial


def write_to_filesink(batch_df, batch_id, collection, base_dir):
    # Append each micro-batch to a per-collection prefix in S3 as JSON files.
    batch_df.write \
            .format('json') \
            .mode('append') \
            .save(f'{base_dir}/{collection}')


base_s3_path = 's3://bucket/dir1/dir2'
partial_filesink_writer = partial(write_to_filesink,
                                  collection=collection_name,
                                  base_dir=base_s3_path)

streaming_df = spark.readStream \
                    .format('mongodb') \
                    .option('spark.mongodb.connection.uri', connection_uri) \
                    .option('spark.mongodb.database', db_name) \
                    .option('spark.mongodb.collection', collection_name) \
                    .option('spark.mongodb.change.stream.publish.full.document.only', 'true') \
                    .option('forceDeleteTempCheckpointLocation', 'true') \
                    .load()

query = streaming_df.writeStream \
                    .foreachBatch(partial_filesink_writer) \
                    .option('checkpointLocation',
                            f'{base_s3_path}/{collection_name}/_checkpoint') \
                    .trigger(processingTime='10 seconds') \
                    .start()

query.awaitTermination()

Would appreciate help in solving this issue.

Thanks.

java.lang.NoSuchMethodError typically indicates a version mismatch or conflict between libraries in the AWS Glue environment, possibly between the BSON library, the MongoDB driver, and the MongoDB Spark Connector. In the past we have seen a similar incompatibility with AWS Glue: https://jira.mongodb.org/browse/SPARK-399
Can you try a different version and, if possible, open a support ticket with AWS?
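
One way to narrow this down is to check which JAR the org.bson.conversions.Bson class is actually being loaded from inside the Glue job. A rough diagnostic sketch (runs on the driver via py4j; the executor classpath may still differ):

# Ask the JVM where the Bson class was loaded from. If it resolves to an older
# bson JAR bundled with the Glue environment instead of bson-4.10.2.jar,
# that would explain the NoSuchMethodError.
jvm = spark.sparkContext._jvm
bson_class = jvm.java.lang.Class.forName('org.bson.conversions.Bson')
code_source = bson_class.getProtectionDomain().getCodeSource()
print(code_source.getLocation().toString() if code_source else 'bootstrap classpath')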

Does mongo-spark-connector v10.1.1 support the availableNow=True trigger type? I encountered an error when I used this trigger type.
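
For context, this is the trigger usage I am referring to (a sketch reusing the names from the code above; whether it works with this connector and Glue combination is exactly my question):

# Same sink as before, but drain whatever is currently available and then stop,
# instead of running on a fixed processing-time trigger.
query = streaming_df.writeStream \
                    .foreachBatch(partial_filesink_writer) \
                    .option('checkpointLocation',
                            f'{base_s3_path}/{collection_name}/_checkpoint') \
                    .trigger(availableNow=True) \
                    .start()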