To write data to MongoDB, call the writeStream() method on your
Dataset<Row> object. This method returns a
DataStreamWriter
object, which you can use to specify the format and other configuration settings
for your streaming write operation.
You must specify the following configuration settings to write to MongoDB:
| Setting | Description |
|---|---|
| `writeStream.format()` | Specifies the format of the underlying output data source. Use `mongodb` to write to MongoDB. |
| `writeStream.option()` | Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and checkpoint directory. For a list of write stream configuration options, see the Streaming Write Configuration Options guide. |
| `writeStream.outputMode()` | Specifies how data of a streaming DataFrame is written to a streaming sink. To view a list of all supported output modes, see the Java outputMode documentation. |
| `writeStream.trigger()` | Specifies how often the Spark Connector writes results to the streaming sink. Call this method on the `DataStreamWriter` object. To use continuous processing, pass `Trigger.Continuous(<time value>)` as an argument, as shown in the sketch after this table. To view a list of all supported processing policies, see the Java trigger documentation. |
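The following is a minimal sketch of setting a continuous trigger; the streaming `Dataset<Row>` variable name `df` and the one-second interval are illustrative assumptions, and omitting the `trigger()` call keeps the default micro-batch processing:

```java
import org.apache.spark.sql.streaming.Trigger;

// hypothetical sketch: "df" stands in for your streaming Dataset<Row>
df.writeStream()
    .format("mongodb")
    // Trigger.Continuous enables continuous processing; the Spark Connector
    // checkpoints asynchronously at the interval you pass
    .trigger(Trigger.Continuous("1 second"))
    .outputMode("append");
```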
The following code snippet shows how to use the previous configuration settings to stream data to MongoDB:
```java
<streaming DataFrame>.writeStream()
    .format("mongodb")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .outputMode("append");
```
For a complete list of methods, see the Java Structured Streaming reference.
To write data to MongoDB, call the writeStream function on your
DataFrame object. This function returns a
DataStreamWriter
object, which you can use to specify the format and other configuration settings for your
streaming write operation.
You must specify the following configuration settings to write to MongoDB:
| Setting | Description |
|---|---|
| `writeStream.format()` | Specifies the format of the underlying output data source. Use `mongodb` to write to MongoDB. |
| `writeStream.option()` | Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and checkpoint directory. For a list of write stream configuration options, see the Streaming Write Configuration Options guide. |
| `writeStream.outputMode()` | Specifies how the Spark Connector writes a streaming DataFrame to a streaming sink. To view a list of all supported output modes, see the pyspark outputMode documentation. |
| `writeStream.trigger()` | Specifies how often the Spark Connector writes results to the streaming sink. Call this method on the `DataStreamWriter` object. To use continuous processing, pass the function a time value using the `continuous` parameter, as shown in the sketch after this table. To view a list of all supported processing policies, see the pyspark trigger documentation. |
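The following is a minimal sketch of setting a continuous trigger; the streaming DataFrame variable name `df` and the one-second interval are illustrative assumptions, and omitting the `trigger()` call keeps the default micro-batch processing:

```python
# hypothetical sketch: "df" stands in for your streaming DataFrame
(df.writeStream
    .format("mongodb")
    # the continuous parameter enables continuous processing; the Spark Connector
    # checkpoints asynchronously at the interval you pass
    .trigger(continuous="1 second")
    .outputMode("append"))
```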
The following code snippet shows how to use the previous configuration settings to stream data to MongoDB:
```python
<streaming DataFrame>.writeStream \
    .format("mongodb") \
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
    .option("spark.mongodb.database", <database-name>) \
    .option("spark.mongodb.collection", <collection-name>) \
    .outputMode("append")
```
For a complete list of functions, see the pyspark Structured Streaming reference.
To write data to MongoDB, call the writeStream method on your
DataFrame object. This method returns a
DataStreamWriter
object, which you can use to specify the format and other configuration settings
for your streaming write operation.
You must specify the following configuration settings to write to MongoDB:
| Setting | Description |
|---|---|
| `writeStream.format()` | Specifies the format of the underlying output data source. Use `mongodb` to write to MongoDB. |
| `writeStream.option()` | Specifies stream settings, including the MongoDB deployment connection string, MongoDB database and collection, and checkpoint directory. For a list of write stream configuration options, see the Streaming Write Configuration Options guide. |
| `writeStream.outputMode()` | Specifies how the Spark Connector writes a streaming DataFrame to a streaming sink. To view a list of all supported output modes, see the Scala outputMode documentation. |
| `writeStream.trigger()` | Specifies how often the Spark Connector writes results to the streaming sink. Call this method on the `DataStreamWriter` object. To use continuous processing, pass `Trigger.Continuous(<time value>)` as an argument, as shown in the sketch after this table. To view a list of all supported processing policies, see the Scala trigger documentation. |
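The following is a minimal sketch of setting a continuous trigger; the streaming DataFrame variable name `df` and the one-second interval are illustrative assumptions, and omitting the `trigger()` call keeps the default micro-batch processing:

```scala
import org.apache.spark.sql.streaming.Trigger

// hypothetical sketch: "df" stands in for your streaming DataFrame
df.writeStream
  .format("mongodb")
  // Trigger.Continuous enables continuous processing; the Spark Connector
  // checkpoints asynchronously at the interval you pass
  .trigger(Trigger.Continuous("1 second"))
  .outputMode("append")
```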
The following code snippet shows how to use the previous configuration settings to stream data to MongoDB:
```scala
<streaming DataFrame>.writeStream
  .format("mongodb")
  .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
  .option("spark.mongodb.database", <database-name>)
  .option("spark.mongodb.collection", <collection-name>)
  .outputMode("append")
```
For a complete list of methods, see the Scala Structured Streaming reference.
Example
The following example shows how to stream data from a CSV file to MongoDB:
1. Create a `DataStreamReader` object that reads from the CSV file.
2. To create a `DataStreamWriter` object, call the `writeStream()` method on the streaming `Dataset<Row>` that you created with the `DataStreamReader`. Use the `format()` method to specify `mongodb` as the underlying data format.
3. Call the `start()` method on the `DataStreamWriter` object to begin the stream.
As the connector reads data from the CSV file, it adds that
data to MongoDB according to the outputMode you specify.
```java
// create a local SparkSession
SparkSession spark = SparkSession.builder()
    .appName("writeExample")
    .master("spark://spark-master:<port>")
    .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
    .getOrCreate();

// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
    .format("csv")
    .option("header", "true")
    .schema("<csv-schema>")
    .load("<csv-file-name>")
    // manipulate your streaming data
    .writeStream()
    .format("mongodb")
    .option("checkpointLocation", "/tmp/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
    .option("spark.mongodb.database", "<database-name>")
    .option("spark.mongodb.collection", "<collection-name>")
    .outputMode("append");

// run the query
StreamingQuery query = dataStreamWriter.start();
```
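The call to `start()` returns immediately. If you run this example as a standalone application rather than in an interactive shell, you typically also block the driver until the stream ends; the following sketch assumes the `query` variable from the example above:

```java
// block until the streaming query stops or fails; throws a
// StreamingQueryException if the query terminates with an error
query.awaitTermination();
```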
1. Create a `DataStreamReader` object that reads from the CSV file.
2. To create a `DataStreamWriter` object, call the `writeStream` function on the streaming `DataFrame` that you created with the `DataStreamReader`. Use the `format()` function to specify `mongodb` as the underlying data format.
3. Call the `start()` function on the `DataStreamWriter` instance to begin the stream.
As the connector reads data from the CSV file, it adds that
data to MongoDB according to the outputMode you specify.
```python
# create a local SparkSession
spark = SparkSession.builder \
    .appName("writeExample") \
    .master("spark://spark-master:<port>") \
    .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
    .getOrCreate()

# define a streaming query
dataStreamWriter = (spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(<csv-schema>)
    .load(<csv-file-name>)
    # manipulate your streaming data
    .writeStream
    .format("mongodb")
    .option("checkpointLocation", "/tmp/pyspark/")
    .option("forceDeleteTempCheckpointLocation", "true")
    .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
    .option("spark.mongodb.database", <database-name>)
    .option("spark.mongodb.collection", <collection-name>)
    .outputMode("append")
)

# run the query
query = dataStreamWriter.start()
```
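The call to `start()` returns immediately. If you run this example as a standalone application rather than in an interactive shell, you typically also block the driver until the stream ends; the following sketch assumes the `query` variable from the example above:

```python
# block until the streaming query stops or fails
query.awaitTermination()
```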
1. Create a `DataStreamReader` object that reads from the CSV file.
2. To create a `DataStreamWriter` object, call the `writeStream` method on the streaming `DataFrame` that you created with the `DataStreamReader`. Use the `format()` method to specify `mongodb` as the underlying data format.
3. Call the `start()` method on the `DataStreamWriter` instance to begin the stream.
As the connector reads data from the CSV file, it adds that
data to MongoDB according to the outputMode you specify.
```scala
// create a local SparkSession
val spark = SparkSession.builder
  .appName("writeExample")
  .master("spark://spark-master:<port>")
  .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
  .getOrCreate()

// define a streaming query
val dataStreamWriter = spark.readStream
  .format("csv")
  .option("header", "true")
  .schema(<csv-schema>)
  .load(<csv-file-name>)
  // manipulate your streaming data
  .writeStream
  .format("mongodb")
  .option("checkpointLocation", "/tmp/")
  .option("forceDeleteTempCheckpointLocation", "true")
  .option("spark.mongodb.connection.uri", <mongodb-connection-string>)
  .option("spark.mongodb.database", <database-name>)
  .option("spark.mongodb.collection", <collection-name>)
  .outputMode("append")

// run the query
val query = dataStreamWriter.start()
```
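The call to `start()` returns immediately. If you run this example as a standalone application rather than in the Spark shell, you typically also block the driver until the stream ends; the following sketch assumes the `query` variable from the example above:

```scala
// block until the streaming query stops or fails
query.awaitTermination()
```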
API Documentation
To learn more about the types used in these examples, see the following Apache Spark API documentation: