Hi,
I am working on a project where I have the following data pipeline:
Twitter → Tweepy API (Stream) → Kafka → Spark (Real-Time Sentiment Analysis) → MongoDB → Tableau
I was able to stream tweets with Tweepy into a Kafka producer and read them back through a Kafka consumer. Using the Kafka stream as the source, I created a streaming DataFrame in Spark (PySpark) and performed real-time pre-processing and sentiment analysis. The resulting streaming DataFrame then needs to go into MongoDB, and this is where the problem lies.
I am able to write a static PySpark DataFrame to MongoDB, but not the streaming DataFrame.
Details are below:
from pyspark import SparkConf, SparkContext

mongo_conn = "mongodb+srv://<username>:<password>@cluster0.afic7p0.mongodb.net/?retryWrites=true&w=majority"

conf = SparkConf()
# Download mongo-spark-connector and its dependencies.
conf.set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.0.5")
conf.set("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1")
# Set up the read connection:
conf.set("spark.mongodb.read.connection.uri", mongo_conn)
conf.set("spark.mongodb.read.database", "mySecondDataBase")
conf.set("spark.mongodb.read.collection", "TwitterStreamv2")
# Set up the write connection:
conf.set("spark.mongodb.write.connection.uri", mongo_conn)
conf.set("spark.mongodb.write.database", "mySecondDataBase")
conf.set("spark.mongodb.write.collection", "TwitterStreamv2")
sc = SparkContext.getOrCreate(conf=conf)
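As an aside, if both connectors are needed at once, `spark.jars.packages` is a single comma-separated property rather than an accumulating one. A minimal sketch (assuming the two Maven coordinates above are exactly the ones required) of building one combined value:

```python
# spark.jars.packages is one comma-separated Spark property, so both
# connector coordinates must go into a single value before context creation.
packages = ",".join([
    "org.mongodb.spark:mongo-spark-connector:10.0.5",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
])
print(packages)
# The result would then be passed once: conf.set("spark.jars.packages", packages)
```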
Reading Kafka Data Frame (Streaming)
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("startingOffsets", "earliest") \
    .option("kafka.group.id", "group1") \
    .option("subscribe", "twitter") \
    .load()
Skipping Pre-Processing & Sentiment Analysis Code
Writing Data Stream to MongoDB
def write_row(batch_df, batch_id):
    # Write each micro-batch to MongoDB with the ordinary batch writer.
    batch_df.write.format("mongodb").mode("append").save()

sentiment_tweets.writeStream.foreachBatch(write_row).start().awaitTermination()
Here sentiment_tweets is the resulting streaming DataFrame. The code above fails with the error below:
ERROR:py4j.clientserver:There was an exception while executing the Python Proxy on the Python Side.
Traceback (most recent call last):
File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 617, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 272, in call
raise e
File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 269, in call
self.func(DataFrame(jdf, self.session), batch_id)
File "<ipython-input-34-a3fa83af6c03>", line 2, in write_row
batch_df.write.format("mongodb").mode("append").save()
File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/readwriter.py", line 966, in save
self._jwrite.save()
File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1322, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 190, in deco
return f(*a, **kw)
File "/usr/local/lib/python3.7/dist-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o159.save.
: java.lang.ClassNotFoundException:
Failed to find data source: mongodb. Please find packages at
https://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
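One plausible cause worth checking against the ClassNotFoundException above: SparkConf stores a single value per key, so the second conf.set("spark.jars.packages", ...) call replaces the first, and the Mongo connector is never requested, which would leave the "mongodb" data source missing from the classpath. A plain-dict stand-in for SparkConf (hypothetical; real SparkConf.set has the same one-value-per-key behavior) shows the overwrite:

```python
# Plain-dict stand-in for SparkConf: assigning to the same key twice keeps
# only the last value, so the Mongo connector coordinate is silently lost.
conf = {}
conf["spark.jars.packages"] = "org.mongodb.spark:mongo-spark-connector:10.0.5"
conf["spark.jars.packages"] = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1"
print(conf["spark.jars.packages"])
# → org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1  (the Mongo coordinate is gone)
```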