How to add a pipeline in structured streaming? Please provide the syntax. I want to capture only the changes done on a collection.

How to add a pipeline to structured streaming? Please provide the syntax. I need to capture the changes on a collection: updates, newly added documents, and deletes.


Please let me know the syntax using SparkSession (instead of SparkConf):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.Trigger;

SparkSession spark = SparkSession.builder()
        .appName("testprogram")
        // key is "spark.jars.packages"; the artifact uses Maven coordinates
        .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.0.5")
        .getOrCreate();

// In Java the pipeline is a JSON string (not a Python list), and the stage needs the $ prefix
String pipeline = "[{\"$match\": {\"status\": \"A\"}}]";

Dataset<Row> dataStreamDf = spark.readStream()
        .format("mongodb")
        .option("spark.mongodb.connection.uri", "")
        .option("spark.mongodb.database", "")
        .option("spark.mongodb.change.stream.publish.full.document.only", "true")
        .option("spark.mongodb.read.aggregation.pipeline", pipeline)
        .option("spark.mongodb.collection", "")
        .schema(readSchema) // readSchema: a StructType you define; schema inference is not available on streaming reads
        .load();

dataStreamDf.writeStream()
        .format("mongodb")
        .option("checkpointLocation", "/tmp/")
        .option("forceDeleteTempCheckpointLocation", "true")
        .option("spark.mongodb.connection.uri", "")
        .option("spark.mongodb.database", "")
        .option("spark.mongodb.collection", "")
        .trigger(Trigger.Continuous("1 second"))
        .outputMode("append")
        .start();
```

With this I can capture updates and inserts on the read collection (without the pipeline).

Here is what I want to achieve with structured streaming and mongo-spark-connector 10.0.5:

1. I have set the pipeline in the code above, but the pipeline is not working.
2. How to capture the operation type (insert, update, delete)?
3. How to capture only the changed fields during an update?
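For (2) and (3), the fields involved (`operationType` and `updateDescription.updatedFields`) are part of the standard MongoDB change event, not connector-specific. So, *assuming* the connector forwards the aggregation pipeline to the underlying change stream, and assuming `change.stream.publish.full.document.only` is left false so the raw change event is published, pipelines like the following sketch might do what the questions ask. The class and method names here are purely illustrative:

```java
public class ChangeStreamPipeline {

    // Matches the three event kinds the thread asks about.
    // "operationType" is a standard field of MongoDB change events.
    public static String operationTypePipeline() {
        return "[{\"$match\": {\"operationType\": "
             + "{\"$in\": [\"insert\", \"update\", \"delete\"]}}}]";
    }

    // For updates only: keep just the fields that actually changed.
    // "updateDescription.updatedFields" is also standard change-event structure.
    public static String updateOnlyPipeline() {
        return "[{\"$match\": {\"operationType\": \"update\"}},"
             + " {\"$project\": {\"updateDescription.updatedFields\": 1}}]";
    }

    public static void main(String[] args) {
        System.out.println(operationTypePipeline());
        System.out.println(updateOnlyPipeline());
    }
}
```

If the connector accepts a pipeline on streaming reads at all (which is exactly what this thread is trying to confirm), such a string would be passed via `.option("spark.mongodb.read.aggregation.pipeline", ...)`.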

Please help me as soon as possible.

Can anyone please help on this? It is quite urgent for me.

Any update here? Is there a way to use a pipeline?

Hello MongoDB Community,

I believe we cannot apply a pipeline in structured streaming using

```
spark.mongodb.read.aggregation.pipeline
```

Please confirm…

(Streaming from MongoDB to MongoDB using a continuous trigger.)