MongoDB Connector for Spark V10 and Change Stream

Hello,

I’m trying to use the new MongoDB Connector for Spark (V10), mainly for its better support of Spark Structured Streaming.

This is my reading stream, watching for changes on a MongoDB collection:

read_from_mongo = (
    spark.readStream.format("mongodb")
    .option("connection.uri", <mongodb-atlas-uri>)
    .option("database", "streaming")
    .option("collection", "events")
    .option("lookup.full.document", "updateLookup")
    .load()
    .writeStream
    .trigger(continuous="10 seconds")
    .format("memory")
    .queryName("v10_stream")
    .outputMode("append")
)

y = read_from_mongo.start()

And this is a writing stream to the same collection, used to generate inserts and change events.

import pyspark.sql.functions as F
import pyspark.sql.types as T

periodic_data = (
    spark
    .readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .withColumn(
        'purpose',
        F.concat_ws(' ', F.lit('one row per second stream to memory'), (F.rand() * 100))
    )
)

write_to_mongo = (
    periodic_data
    .writeStream
    .format("mongodb")
    .option("checkpointLocation", "/tmp/pyspark/periodic_data")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("connection.uri", <mongodb-atlas-uri>)
    .option("database", "streaming")
    .option("collection", "events")
    .outputMode("append")
)

x = write_to_mongo.start()

The problem is that the reading stream only returns the resume token data and everything else is null. It’s not even a change stream document, but an incomplete mix of a collection document with the _id replaced by the change event’s resume token value.

------------------------------------
Batch: 14
------------------------------------
+--------------------+-------+-----+
|                 _id|purpose|value|
+--------------------+-------+-----+
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
+--------------------+-------+-----+
only showing top 20 rows

------------------------------------
Batch: 15
------------------------------------
+--------------------+-------+-----+
|                 _id|purpose|value|
+--------------------+-------+-----+
|{"_data": "82627A...|   null| null|
|{"_data": "82627A...|   null| null|
+--------------------+-------+-----+

Am I missing something, or have I misconfigured the streams?
The relevant documentation: https://www.mongodb.com/docs/spark-connector/current/configuration/read/#change-streams

Thanks. Best regards.

Hi,

I think you may want to do:

.option("change.stream.publish.full.document.only", "true")

Instead of:

.option("change.stream.lookup.full.document", "updateLookup")

That way it will output the full document only and not the full change event.

You may also want to explicitly set the schema on the readStream if the above change doesn’t work.
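
For example, something along these lines (just a sketch; the schema fields are my assumptions based on the rate-source columns your write stream produces):

from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

# Explicit schema for the streamed documents; adjust it to your collection.
read_schema = StructType([
    StructField("_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("value", LongType()),
    StructField("purpose", StringType()),
])

read_from_mongo = (
    spark.readStream.format("mongodb")
    .option("connection.uri", <mongodb-atlas-uri>)
    .option("database", "streaming")
    .option("collection", "events")
    # publish only the full document of each change event
    .option("change.stream.publish.full.document.only", "true")
    .schema(read_schema)
    .load()
)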

Ross


Edit: Fixed option names.

Also, another read example is here: Streaming Data with Apache Spark and MongoDB | MongoDB

Hello,

Thanks for your replies.
I had already tested the “publish.full.document.only” option without success. I retried it just in case, with the same result.

So I copied and pasted the example you linked. It seems the only time it returns data as expected is when this option is specified with the complete prefix:

.option("spark.mongodb.change.stream.publish.full.document.only", "true")

And then I do get the full document with its data. But what I actually want is the full change event, so I can do an Extract-Load ingestion regardless of the document structure (fields and types).

I know it’s called Structured Streaming for a reason and that it expects a fixed structure, but I would like to rely on the change event structure rather than the collection’s document structure, so that schema changes on the collection can be handled on the fly, even while the streaming process is running.
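
For what it’s worth, this is roughly the read-side shape I would like to declare: the change event metadata typed, and fullDocument kept as an opaque string so that collection schema changes don’t break the stream. The field names below follow the change event format; whether the connector will actually populate a schema like this is exactly what I’m unsure about.

from pyspark.sql.types import StructType, StructField, StringType

# Untested idea, not a working configuration: a change event schema with
# fullDocument left as a raw JSON string.
change_event_schema = StructType([
    StructField("_id", StructType([StructField("_data", StringType())])),
    StructField("operationType", StringType()),
    StructField("fullDocument", StringType()),
    StructField("ns", StructType([
        StructField("db", StringType()),
        StructField("coll", StringType()),
    ])),
    StructField("documentKey", StringType()),
])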

PS: I’m using this connector: “org.mongodb.spark:mongo-spark-connector:10.0.1”.

Thanks.
Best regards.

Hello,

So no matter how I try, I still can’t make this option work:

.option("spark.mongodb.change.stream.lookup.full.document", "updateLookup")

In the meantime, now that “change.stream.publish.full.document.only” returns data, I’m trying to write the data back out and save it. But because the “mongodb” read stream only seems to work with continuous processing, it’s impossible to write to a file sink. Even this example from the documentation returns the following error:

# define a streaming query
query = (spark
  .readStream
  .format("mongodb")
  .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
  .option('spark.mongodb.database', <database-name>)
  .option('spark.mongodb.collection', <collection-name>)
  .schema(readSchema)
  .load()
  # manipulate your streaming data
  .writeStream
  .format("csv")
  .option("path", "/output/")
  .trigger(continuous="1 second")
  .outputMode("append")
)
# run the query
query.start()
Py4JJavaError: An error occurred while calling o1087.start.
: java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:64)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:300)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:349)
	at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:458)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:437)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)

I tried the “csv”, “parquet” and “delta” output sinks.

Hey, did you find any solution? I’m getting the same error with writeStream with different formats.

The problem is that you are trying to write the stream to a CSV sink using a continuous trigger, which isn’t supported.
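
File sinks only run with micro-batch execution, so dropping the continuous trigger (or replacing it with a processingTime trigger) and adding the checkpoint location that file sinks require should get past that error. Roughly:

query = (spark
  .readStream
  .format("mongodb")
  .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
  .option("spark.mongodb.database", <database-name>)
  .option("spark.mongodb.collection", <collection-name>)
  .schema(readSchema)
  .load()
  .writeStream
  .format("csv")
  .option("path", "/output/")
  # file sinks require a checkpoint location; this path is just an example
  .option("checkpointLocation", "/tmp/pyspark/csv_checkpoint")
  # micro-batch trigger instead of continuous processing
  .trigger(processingTime="1 second")
  .outputMode("append")
)
query.start()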

I’m reading a MongoDB change stream and writing to Kafka. No matter how I change read.change.stream.publish.full.document.only, it doesn’t work.

I want to get the metadata (operationType, etc.), but the result only shows the fullDocument.

If you want to show the operation type but not the fullDocument, just configure the pipeline parameter to project out the operationType and the fields you wish to return.

pipeline = [
    {"$match": {"$and": [{"operationType": "insert"}, {"fullDocument.eventId": 321}]}},
    {"$project": {"fullDocument._id": 0, "fullDocument.eventId": 0}}
]

In this example, it only shows insert events where eventId = 321, and it does not return the _id or eventId fields, but it will return everything else in the document.
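
In PySpark, the pipeline is passed to the connector as a JSON string through the aggregation.pipeline read option (option name per the read configuration docs), something like:

# Pass the aggregation pipeline as a JSON string read option.
pipeline = """[
  {"$match": {"$and": [{"operationType": "insert"}, {"fullDocument.eventId": 321}]}},
  {"$project": {"fullDocument._id": 0, "fullDocument.eventId": 0}}
]"""

events = (
    spark.readStream.format("mongodb")
    .option("connection.uri", <mongodb-connection-string>)
    .option("database", <database-name>)
    .option("collection", <collection-name>)
    .option("aggregation.pipeline", pipeline)
    .schema(readSchema)  # schema as in the earlier examples
    .load()
)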

@Robert_Walters, can you point me to the instructions for setting the pipeline in the MongoDB Spark connector? I’m using Scala, by the way.

Is it possible to get all change events (insert, delete, etc.) and output them to a Kafka sink? I want to get all the metadata (at least the event type) and the fullDocument.

Thanks

You won’t get the full document for a delete, because the delete removes the document, so it no longer exists when the event is created. To specify the pipeline, see https://www.mongodb.com/docs/spark-connector/current/configuration/read/#read-configuration-options. It is a SparkConf setting, for example: spark.mongodb.read.aggregation.pipeline = '[{"$match": {"operationType": "insert"}}]'
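
The same key can also be set once on the session (or passed as --conf when submitting a Scala job), for example:

from pyspark.sql import SparkSession

# Example of setting the pipeline globally via the spark.mongodb.read.* prefix.
spark = (
    SparkSession.builder
    .appName("mongo-change-stream")  # example app name
    .config("spark.mongodb.read.aggregation.pipeline",
            '[{"$match": {"operationType": "insert"}}]')
    .getOrCreate()
)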

Thanks, Robert. That’s very helpful.
