And this is a write stream on the same collection, used to generate inserts and change events.
import pyspark.sql.functions as F
import pyspark.sql.types as T

periodic_data = (
    spark
    .readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .withColumn(
        "purpose",
        F.concat_ws(
            " ",
            F.lit("one row per second stream to memory"),
            # cast explicitly so concat_ws always receives a string column
            (F.rand() * 100).cast("string"),
        ),
    )
)
write_to_mongo = (
    periodic_data
    .writeStream
    .format("mongodb")
    .option("checkpointLocation", "/tmp/pyspark/periodic_data")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("connection.uri", <mongodb-atlas-uri>)
    .option("database", "streaming")
    .option("collection", "events")
    .outputMode("append")
)

x = write_to_mongo.start()
The problem is that the read stream only returns the token data and everything else is empty. It is not even a change stream document, but an incomplete mix of a collection document with the _id replaced by the change event token value.
Thanks for your replies.
I had already tested the “publish.full.document.only” option without success. I retried it now just in case, with the same result.
So I copied and pasted the example you linked. Now it seems that the only time it returns data as expected is when the option is specified with its complete prefix:
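Roughly like this (a sketch; readSchema and the connection URI are placeholders from the linked example):

events_stream = (spark
    .readStream
    .format("mongodb")
    .option("connection.uri", <mongodb-atlas-uri>)
    .option("database", "streaming")
    .option("collection", "events")
    # the short "publish.full.document.only" form did nothing for me; the fully prefixed option works
    .option("change.stream.publish.full.document.only", "true")
    .schema(readSchema)
    .load()
)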
And then I get the full document with data. But what I actually want is the full change event, so I can do Extract-Load ingestion regardless of the document structure (fields and types).
I know it’s called Structured Streaming for a reason and that it expects a fixed structure, but I would like to use the change event structure instead of the collection’s document structure, so that schema updates on a collection can be handled on the fly, even while the streaming process is running.
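What I have in mind is something like the following, an untested sketch on my side: declare the change event envelope itself as the schema and keep fullDocument as a raw string (this assumes the connector renders a sub-document declared as StringType as a JSON string, which I still need to verify):

import pyspark.sql.types as T

# change event envelope instead of the collection's document schema;
# fullDocument stays a plain string so any collection schema fits
change_event_schema = T.StructType([
    T.StructField("_id", T.StringType()),
    T.StructField("operationType", T.StringType()),
    T.StructField("clusterTime", T.StringType()),
    T.StructField("documentKey", T.StringType()),
    T.StructField("fullDocument", T.StringType()),
    T.StructField("ns", T.StructType([
        T.StructField("db", T.StringType()),
        T.StructField("coll", T.StringType()),
    ])),
])

change_events = (spark
    .readStream
    .format("mongodb")
    .option("connection.uri", <mongodb-atlas-uri>)
    .option("database", "streaming")
    .option("collection", "events")
    .schema(change_event_schema)
    .load()
)

The fullDocument string could then be parsed downstream with from_json, or stored as-is, so a schema change on the collection would not break the running stream.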
PS: I’m using this connector: “org.mongodb.spark:mongo-spark-connector:10.0.1”.
In the meantime, now that “change.stream.publish.full.document.only” returns data, I’m trying to write the data back out. But because the “mongodb” read stream only works with continuous processing, it’s impossible to write to a file sink. Even this example from the documentation returns the following error:
# define a streaming query
query = (spark
    .readStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .schema(readSchema)
    .load()
    # manipulate your streaming data
    .writeStream
    .format("csv")
    .option("path", "/output/")
    .trigger(continuous="1 second")
    .outputMode("append")
)
# run the query
query.start()
Py4JJavaError: An error occurred while calling o1087.start.
: java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:64)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:300)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:349)
at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:458)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:437)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
I tried the “csv”, “parquet” and “delta” output sinks.
If you want to show the operation type but not the fullDocument, just configure the pipeline parameter to project out the operationType and the fields you wish to return.
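For example, something along these lines (a sketch; I am assuming the connector's aggregation.pipeline read option, and event_id, readSchema, and the connection placeholders are illustrative):

pipeline = """[
  {"$match": {
    "operationType": "insert",
    "fullDocument.event_id": 321
  }},
  {"$project": {
    "fullDocument._id": 0,
    "fullDocument.event_id": 0
  }}
]"""

query = (spark
    .readStream
    .format("mongodb")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    # exclude the document's _id and event_id; the change stream's own _id
    # holds the resume token and should not be removed
    .option("aggregation.pipeline", pipeline)
    .schema(readSchema)
    .load()
)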
In this example it only shows insert events where event_id = 321 and does not return the _id or event_id, but will return everything else in the document.
Is it possible to get all change events (insert, delete, etc.) and output them to a Kafka sink? I want to get all the metadata (at least the event type) and the fullDocument.
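Not an authoritative answer, but here is a minimal sketch of what that could look like with Spark's built-in Kafka sink, assuming change_events is a stream read with a change-event schema like the one sketched earlier in the thread (the broker address and topic are placeholders):

import pyspark.sql.functions as F

kafka_query = (change_events
    # pack the event metadata and the full document into one JSON value per record
    .select(
        F.col("documentKey").alias("key"),
        F.to_json(F.struct("operationType", "clusterTime", "fullDocument")).alias("value"),
    )
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", <kafka-bootstrap-servers>)
    .option("topic", "mongodb.streaming.events")
    .option("checkpointLocation", "/tmp/pyspark/kafka_sink")
    .outputMode("append")
    .start()
)

This only covers the plumbing; note that not every event type carries a fullDocument (deletes don't, for instance), so that field may be null for some records.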