Para guardar datos en MongoDB, llama a la writeStream() método en tu objeto Dataset<Row>. Este método devuelve un objeto DataStreamWriter, que puedes usar para especificar el formato y otras configuraciones para tu operación de guardado en transmisión.
Debe especificar las siguientes configuraciones para escribir en MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de salida subyacente. Use |
| Especifica la configuración del stream, incluyendo la implementación de MongoDB. cadena de conexión, base de datos y colección MongoDB y directorio de puntos de control. Para obtener una lista de las opciones de configuración del flujo de guardar, consulte el Guía de opciones de configuración de escritura por transmisión. |
| Especifica cómo los datos de un DataFrame en transmisión se escriben en un destino de transmisión. Para ver una lista de todos los modos de salida compatibles, consulta la documentación del outputMode de Java. |
| Especifica con qué frecuencia el Spark Connector escribe resultados en el destino de transmisión. Llame a este método en el Para usar el procesamiento continuo, pase Para ver una lista de todas las políticas de procesamiento admitidas, consulte la documentación del activador de Java. |
El siguiente snippet de código muestra cómo utilizar la configuración anterior para enviar datos por streaming a MongoDB:
<streaming DataFrame>.writeStream() .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append");
Para obtener una lista completa de métodos, consulte la referencia de Java Structured Streaming.
Para escribir datos en MongoDB, llame a la función writeStream en su objeto DataFrame. Esta función devuelve un objeto DataStreamWriter, que puede usar para especificar el formato y otras opciones de configuración para su operación de escritura en streaming.
Debe especificar las siguientes configuraciones para escribir en MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de salida subyacente. Use |
| Especifica la configuración del flujo, incluido la cadena de conexión de la implementación de MongoDB, la base de datos y colección de MongoDB, y el directorio de puntos de control. Para obtener una lista de opciones de configuración de transmisión de guardar, consulta la guía Opciones de configuración de guardar de transmisión. |
| Indica cómo el Spark Connector guarda un DataFrame de transmisión a un sink de transmisión. Para ver una lista de todos los modos de salida compatibles, consulta la documentación de pyspark outputMode. |
| Especifica con qué frecuencia el Spark Connector escribe resultados en el destino de transmisión. Llame a este método en el Para usar el procesamiento continuo, pase a la función un valor de tiempo mediante el parámetro Para consultar la lista de todas las políticas de procesamiento admitidas, consulta la documentación de activador de pyspark. |
El siguiente snippet de código muestra cómo utilizar la configuración anterior para enviar datos por streaming a MongoDB:
<streaming DataFrame>.writeStream \ .format("mongodb") \ .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \ .option("spark.mongodb.database", <database-name>) \ .option("spark.mongodb.collection", <collection-name>) \ .outputMode("append")
Para obtener una lista completa de las funciones, consulta la referencia de Structured transmisión de pyspark.
Para guardar datos en MongoDB, llame al método write en su objeto DataFrame. Este método devuelve un objeto DataStreamWriter, que puedes usar para especificar el formato y otras configuraciones para tu operación de guardado en transmisión.
Debe especificar las siguientes configuraciones para escribir en MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de salida subyacente. Use |
| Especifica la configuración del flujo, incluido la cadena de conexión de la implementación de MongoDB, la base de datos y colección de MongoDB, y el directorio de puntos de control. Para obtener una lista de opciones de configuración de transmisión de guardar, consulta la guía Opciones de configuración de guardar de transmisión. |
| Especifica cómo el Spark Connector guarda un DataFrame de transmisión en un sink de transmisión. Para ver una lista de todos los modos de salida admitidos, consulta la documentación del modo de salida de Scala. |
| Especifica con qué frecuencia el Spark Connector escribe resultados en el destino de transmisión. Llame a este método en el Para usar el procesamiento continuo, pase Para ver una lista de todas las políticas de procesamiento admitidas, consulta la documentación del activador de Scala. |
El siguiente snippet de código muestra cómo utilizar la configuración anterior para enviar datos por streaming a MongoDB:
<streaming DataFrame>.writeStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append")
Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de Scala.
Ejemplo
El siguiente ejemplo muestra cómo transmitir datos desde un CSV archivo a MongoDB:
Crea un objeto
DataStreamReaderque lea desde el archivo CSV.Para crear un objeto
DataStreamWriter, llame al métodowriteStream()en la transmisiónDataset<Row>que creó conDataStreamReader. Use el métodoformat()para especificarmongodbcomo formato de datos subyacente.Llame al método
start()en el objetoDataStreamWriterpara comenzar la transmisión.
A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("csv") .option("header", "true") .schema("<csv-schema>") .load("<csv-file-name>") // manipulate your streaming data .writeStream() .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
Crea un objeto
DataStreamReaderque lea desde el archivo CSV.Para crear un objeto
DataStreamWriter, llama a la funciónwriteStreamen la transmisiónDataFrameque creaste con elDataStreamReader. Usa la funciónformat()para especificarmongodbcomo el formato de datos subyacente.Llama a la función
start()en la instanciaDataStreamWriterpara iniciar el flujo.
A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.
# create a local SparkSession spark = SparkSession.builder \ .appName("writeExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define a streaming query dataStreamWriter = (spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) # manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/pyspark/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") ) # run the query query = dataStreamWriter.start()
Crea un objeto
DataStreamReaderque lea desde el archivo CSV.Para crear un objeto
DataStreamWriter, llame al métodowriteStreamen la transmisiónDataFrameque creó conDataStreamReader. Use el métodoformat()para especificarmongodbcomo formato de datos subyacente.Llama al método
start()en la instanciaDataStreamWriterpara iniciar el flujo.
A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.
// create a local SparkSession val spark = SparkSession.builder .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define a streaming query val dataStreamWriter = spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) // manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") // run the query val query = dataStreamWriter.start()
Documentación de la API
Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark: