MongoDB Connector for Spark V10 and Change Stream

Hello,

I’m trying to use the new MongoDB Connector for Spark (V10), mainly for its improved support of Spark Structured Streaming.

This is my reading stream, watching for changes on a MongoDB collection:

read_from_mongo = (
    spark.readStream.format("mongodb")
    .option("connection.uri", <mongodb-atlas-uri>)
    .option("database", "streaming")
    .option("collection", "events")
    .option("lookup.full.document", "updateLookup")
    .load()
    .writeStream
    .trigger(continuous="10 seconds")
    .format("memory")
    .queryName("v10_stream")
    .outputMode("append")
)

y = read_from_mongo.start()

And this is a writing stream on the same collection, used to generate inserts and therefore change events.

import pyspark.sql.functions as F
import pyspark.sql.types as T

periodic_data = (
    spark
    .readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .withColumn(
        'purpose',
        F.concat_ws(' ', F.lit('one row per second stream to memory'), (F.rand() * 100))
    )
)

write_to_mongo = (
    periodic_data
    .writeStream
    .format("mongodb")
    .option("checkpointLocation", "/tmp/pyspark/periodic_data")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("connection.uri", <mongodb-atlas-uri>)
    .option("database", "streaming")
    .option("collection", "events")
    .outputMode("append")
)

x = write_to_mongo.start()

The problem is that the reading stream only returns the resume token data, and everything else is empty. It’s not even a change stream document, but an incomplete mix of a collection document with the _id replaced by the change event token value.

------------------------------------
Batch: 14
------------------------------------
+--------------------+-------+-----+
|                 _id|purpose|value|
+--------------------+-------+-----+
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
+--------------------+-------+-----+
only showing top 20 rows

------------------------------------
Batch: 15
------------------------------------
+--------------------+-------+-----+
|                 _id|purpose|value|
+--------------------+-------+-----+
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
+--------------------+-------+-----+

Am I missing something, or have I misconfigured the streams?
The relevant documentation: https://www.mongodb.com/docs/spark-connector/current/configuration/read/#change-streams

Thanks. Best regards.

Hi,

I think you may want to do:

.option("change.stream.publish.full.document.only", "true")

Instead of:

.option("change.stream.lookup.full.document", "updateLookup")

That way it will output the full document only and not the full change event.

You may also want to explicitly set the schema on the readStream if the above change doesn’t work.
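
For example, something along these lines (an untested sketch; the schema fields are just a guess based on your rate-source documents, so adjust them to match your collection):

from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

# Guessed shape of the documents written by the rate-source stream; adjust as needed.
read_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("value", LongType(), True),
    StructField("purpose", StringType(), True),
])

query = (
    spark.readStream.format("mongodb")
    .option("connection.uri", <mongodb-atlas-uri>)
    .option("database", "streaming")
    .option("collection", "events")
    # publish only the full document instead of the whole change event
    .option("change.stream.publish.full.document.only", "true")
    .schema(read_schema)
    .load()
    .writeStream
    .trigger(continuous="10 seconds")
    .format("memory")
    .queryName("v10_stream")
    .outputMode("append")
)

query.start()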

Ross


Edit: Fixed option names.

Also, another read example is here: Streaming Data with Apache Spark and MongoDB

Hello,

Thanks for your replies.
I had already tested the “publish.full.document.only” option without success. I retried it now just in case, with the same result.

So I copied and pasted the example you linked. It seems that the only time it returns data as expected is when I specify the option with its complete prefix:

.option("spark.mongodb.change.stream.publish.full.document.only", "true")

And then I get the full document with its data. But what I actually want is the full change event, so that I can do an Extract-Load ingestion regardless of the document structure (fields and types).

I know it’s called Structured Streaming for a reason and that it expects a fixed structure, but I would like to rely on the change event structure instead of the collection’s document structure, so I can handle schema updates on a collection on the fly, even while the streaming process is running.
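
To make it concrete, this is roughly the change event shape I would like to read, keeping fullDocument as a raw JSON string so that schema changes in the collection don’t break the stream (only a sketch of what I’m after; I don’t know whether the connector can populate these fields):

from pyspark.sql.types import StructType, StructField, StringType

# Sketch of a change event schema, with fullDocument kept as raw JSON
# to be parsed downstream (e.g. with from_json / get_json_object).
change_event_schema = StructType([
    StructField("_id", StringType(), True),             # resume token
    StructField("operationType", StringType(), True),   # insert / update / replace / delete ...
    StructField("clusterTime", StringType(), True),
    StructField("ns", StructType([
        StructField("db", StringType(), True),
        StructField("coll", StringType(), True),
    ]), True),
    StructField("documentKey", StringType(), True),
    StructField("fullDocument", StringType(), True),
])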

PS: I’m using this connector: “org.mongodb.spark:mongo-spark-connector:10.0.1”
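
For anyone reproducing this, the connector can be pulled in from Maven when building the session, for example like this (the app name is just a placeholder):

from pyspark.sql import SparkSession

# Pull the v10 connector from Maven Central when creating the Spark session.
spark = (
    SparkSession.builder
    .appName("mongo-spark-v10-change-stream")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.0.1")
    .getOrCreate()
)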

Thanks.
Best regards.

Hello,

So no matter what I try, I still can’t make this option work:

.option("spark.mongodb.change.stream.lookup.full.document", "updateLookup")

In the meantime, now that “change.stream.publish.full.document.only” returns data, I’m trying to write the data back out and save it. But because the “mongodb” read stream only works with continuous processing, it seems impossible to write to a file sink. Even this example from the documentation returns the following error:

# define a streaming query
query = (spark
  .readStream
  .format("mongodb")
  .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
  .option('spark.mongodb.database', <database-name>)
  .option('spark.mongodb.collection', <collection-name>)
  .schema(readSchema)
  .load()
  # manipulate your streaming data
  .writeStream
  .format("csv")
  .option("path", "/output/")
  .trigger(continuous="1 second")
  .outputMode("append")
)
# run the query
query.start()
Py4JJavaError: An error occurred while calling o1087.start.
: java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:64)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:300)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:349)
	at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:458)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:437)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)

I tried the “csv”, “parquet” and “delta” output sinks.
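
For reference, this is the micro-batch variation of the same query, with the continuous trigger dropped, since file sinks like “csv” and “parquet” only support micro-batch execution, which is what the error above is complaining about. It’s only a sketch; I don’t know whether the 10.0.1 source can run in micro-batch mode at all, and the checkpoint path is just a placeholder:

query = (spark
  .readStream
  .format("mongodb")
  .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
  .option("spark.mongodb.database", <database-name>)
  .option("spark.mongodb.collection", <collection-name>)
  .option("spark.mongodb.change.stream.publish.full.document.only", "true")
  .schema(readSchema)
  .load()
  .writeStream
  .format("csv")
  .option("path", "/output/")
  .option("checkpointLocation", "/tmp/pyspark/csv_sink_checkpoint")
  # no .trigger(continuous=...) here: the csv sink only supports micro-batch execution
  .outputMode("append")
)
query.start()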