Para escribir datos en MongoDB, llame al método writeStream() Método en el objeto Dataset<Row>. Este método devuelve un objeto DataStreamWriter, que puede usar para especificar el formato y otras opciones de configuración para la operación de escritura en streaming.
Debe especificar las siguientes configuraciones para escribir en MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de salida subyacente. Use |
| Especifica la configuración de la transmisión, incluida la implementación de MongoDB cadena de conexión, base de datos y colección MongoDB y directorio de puntos de control. Para obtener una lista de las opciones de configuración del flujo de escritura, consulte la Guíade opciones de configuración de escritura en streaming. |
| Especifica cómo se escriben los datos de un DataFrame de streaming en un receptor de streaming. Para ver una lista de todos los modos de salida compatibles, consulte la documentación de Java outputMode. |
| Especifica la frecuencia con la que el conector Spark escribe resultados en el receptor de streaming. Llame a este método en el objeto Para usar el procesamiento continuo, pase Para ver una lista de todas las políticas de procesamiento admitidas, consulte la documentación del activador de Java. |
El siguiente fragmento de código muestra cómo utilizar la configuración anterior para transmitir datos a MongoDB:
<streaming DataFrame>.writeStream() .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append");
Para obtener una lista completa de métodos, consulte la referencia de Java Structured Streaming.
Para escribir datos en MongoDB, llame a la función writeStream en su objeto DataFrame. Esta función devuelve un objeto DataStreamWriter, que puede usar para especificar el formato y otras opciones de configuración para su operación de escritura en streaming.
Debe especificar las siguientes configuraciones para escribir en MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de salida subyacente. Use |
| Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y la colección de MongoDB y el directorio de puntos de control. Para obtener una lista de las opciones de configuración de secuencias de escritura, consulte la guía Opciones de configuración de escritura en streaming. |
| Indica cómo el Spark Connector guarda un DataFrame de transmisión a un sink de transmisión. Para ver una lista de todos los modos de salida compatibles, consulta la documentación de pyspark outputMode. |
| Especifica la frecuencia con la que el conector Spark escribe resultados en el receptor de streaming. Llame a este método en el objeto Para usar el procesamiento continuo, pase a la función un valor de tiempo mediante el parámetro Para ver una lista de todas las políticas de procesamiento admitidas, consulte la documentación del activador de pyspark. |
El siguiente fragmento de código muestra cómo utilizar la configuración anterior para transmitir datos a MongoDB:
<streaming DataFrame>.writeStream \ .format("mongodb") \ .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \ .option("spark.mongodb.database", <database-name>) \ .option("spark.mongodb.collection", <collection-name>) \ .outputMode("append")
Para obtener una lista completa de funciones, consulte la referencia de transmisión estructurada de pyspark.
Para escribir datos en MongoDB, llame al método write en su objeto DataFrame. Este método devuelve un objeto DataStreamWriter, que puede usar para especificar el formato y otras opciones de configuración para su operación de escritura en streaming.
Debe especificar las siguientes configuraciones para escribir en MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de salida subyacente. Use |
| Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y la colección de MongoDB y el directorio de puntos de control. Para obtener una lista de las opciones de configuración de secuencias de escritura, consulte la guía Opciones de configuración de escritura en streaming. |
| Especifica cómo el conector Spark escribe un DataFrame de streaming en un receptor de streaming. Para ver una lista de todos los modos de salida compatibles, consulte la documentación de Scala outputMode. |
| Especifica la frecuencia con la que el conector Spark escribe resultados en el receptor de streaming. Llame a este método en el objeto Para usar el procesamiento continuo, pase Para ver una lista de todas las políticas de procesamiento compatibles, consulte la documentación del activador de Scala. |
El siguiente fragmento de código muestra cómo utilizar la configuración anterior para transmitir datos a MongoDB:
<streaming DataFrame>.writeStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append")
Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de Scala.
Ejemplo
El siguiente ejemplo muestra cómo transmitir datos desde un ArchivoCSV a MongoDB:
Crea un objeto
DataStreamReaderque lee desde el archivo CSV.Para crear un objeto
DataStreamWriter, llame al métodowriteStream()en la transmisiónDataset<Row>que creó conDataStreamReader. Use el métodoformat()para especificarmongodbcomo formato de datos subyacente.Llame al método
start()en el objetoDataStreamWriterpara comenzar la transmisión.
A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("csv") .option("header", "true") .schema("<csv-schema>") .load("<csv-file-name>") // manipulate your streaming data .writeStream() .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
Crea un objeto
DataStreamReaderque lee desde el archivo CSV.Para crear un objeto
DataStreamWriter, llame a la funciónwriteStreamen la transmisiónDataFrameque creó conDataStreamReader. Use la funciónformat()para especificarmongodbcomo formato de datos subyacente.Llama a la función
start()en la instanciaDataStreamWriterpara iniciar el flujo.
A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.
# create a local SparkSession spark = SparkSession.builder \ .appName("writeExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define a streaming query dataStreamWriter = (spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) # manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/pyspark/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") ) # run the query query = dataStreamWriter.start()
Crea un objeto
DataStreamReaderque lee desde el archivo CSV.Para crear un objeto
DataStreamWriter, llame al métodowriteStreamen la transmisiónDataFrameque creó conDataStreamReader. Use el métodoformat()para especificarmongodbcomo formato de datos subyacente.Llame al método
start()en la instanciaDataStreamWriterpara comenzar la transmisión.
A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.
// create a local SparkSession val spark = SparkSession.builder .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define a streaming query val dataStreamWriter = spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) // manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") // run the query val query = dataStreamWriter.start()
Documentación de la API
Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark: