Streaming Data From Spark to MongoDB

Hi,

I am working on a project where I have the following data pipeline:

Twitter → Tweepy API (Stream) → Kafka → Spark (Real-Time Sentiment Analysis) → MongoDB → Tableau

I was able to stream tweets with Tweepy into a Kafka producer, and from the producer into a Kafka consumer. Using that Kafka stream as the data source, I created a streaming DataFrame in Spark (PySpark) and performed real-time pre-processing and sentiment analysis. The resulting streaming DataFrame needs to go into MongoDB, and this is where the problem lies.

I am able to write a “static” PySpark DataFrame into MongoDB, but not the streaming DataFrame.

Details are below:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

mongo_conn = "mongodb+srv://<username>:<password>@cluster0.afic7p0.mongodb.net/?retryWrites=true&w=majority"
conf = SparkConf()
# Download mongo-spark-connector and its dependencies.
conf.set("spark.jars.packages","org.mongodb.spark:mongo-spark-connector:10.0.5")
conf.set("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1")
# Set up read connection
conf.set("spark.mongodb.read.connection.uri", mongo_conn)
conf.set("spark.mongodb.read.database", "mySecondDataBase")
conf.set("spark.mongodb.read.collection", "TwitterStreamv2")
# Set up write connection
conf.set("spark.mongodb.write.connection.uri", mongo_conn)
conf.set("spark.mongodb.write.database", "mySecondDataBase")
conf.set("spark.mongodb.write.collection", "TwitterStreamv2")
SparkContext.getOrCreate(conf=conf)
# Session used by the streaming code below (assumed; its creation was not shown in the post).
spark = SparkSession.builder.config(conf=conf).getOrCreate()

Reading Kafka Data Frame (Streaming)

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("startingOffsets", "earliest") \
        .option("kafka.group.id", "group1") \
        .option("subscribe", "twitter") \
        .load()

Skipping Pre-Processing & Sentiment Analysis Code

Writing Data Stream to MongoDB

def write_row(batch_df , batch_id):
    # Called once per micro-batch; batch_df is a regular (non-streaming) DataFrame.
    batch_df.write.format("mongodb").mode("append").save()

sentiment_tweets.writeStream.foreachBatch(write_row).start().awaitTermination()

Here sentiment_tweets is the resulting streaming DataFrame. The code above fails with the following error:

ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 272, in call
    raise e
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 269, in call
    self.func(DataFrame(jdf, self.session), batch_id)
  File "<ipython-input-34-a3fa83af6c03>", line 2, in write_row
    batch_df.write.format("mongodb").mode("append").save()
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/readwriter.py", line 966, in save
    self._jwrite.save()
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1322, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.7/dist-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o159.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: mongodb. Please find packages at
https://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)

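Resolved. I was overwriting the MongoDB package setting with the Kafka one: the second conf.set("spark.jars.packages", ...) call replaced the first, so the MongoDB connector was never downloaded and Spark could not find the "mongodb" data source.
Both packages have to go into a single comma-separated value: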

conf.set("spark.jars.packages","org.mongodb.spark:mongo-spark-connector:10.0.5,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1")
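
The second call wins because a configuration holds one value per key, so setting spark.jars.packages twice keeps only the last value. Here is a minimal sketch of that behaviour, using a plain Python dict as a stand-in for the configuration. The dict and the helper name join_packages are illustrative assumptions, not part of Spark:

```python
# Plain-dict stand-in for a key/value configuration; this is not the real SparkConf.
MONGO_PKG = "org.mongodb.spark:mongo-spark-connector:10.0.5"
KAFKA_PKG = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1"


def join_packages(*coordinates):
    """Join Maven coordinates into one comma-separated string (hypothetical helper)."""
    return ",".join(coordinates)


# Setting the same key twice keeps only the last value.
overwritten = {}
overwritten["spark.jars.packages"] = MONGO_PKG
overwritten["spark.jars.packages"] = KAFKA_PKG  # replaces the MongoDB entry

# Setting the key once with both coordinates keeps both.
combined = {"spark.jars.packages": join_packages(MONGO_PKG, KAFKA_PKG)}

print(overwritten["spark.jars.packages"])
print(combined["spark.jars.packages"])
```

The combined string is the same value that is passed to conf.set("spark.jars.packages", ...) in the line above.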