Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Guardar en MongoDB en moda de transmisión.

Para guardar datos en MongoDB, llama a la writeStream() método en tu objeto Dataset<Row>. Este método devuelve un objeto DataStreamWriter, que puedes usar para especificar el formato y otras configuraciones para tu operación de guardado en transmisión.

Debe especificar las siguientes configuraciones para escribir en MongoDB:

Configuración
Descripción

writeStream.format()

Especifica el formato de la fuente de datos de salida subyacente. Use mongodb para guardar en MongoDB.

writeStream.option()

Especifica la configuración del stream, incluyendo la implementación de MongoDB. cadena de conexión, base de datos y colección MongoDB y directorio de puntos de control.

Para obtener una lista de las opciones de configuración del flujo de guardar, consulte el Guía de opciones de configuración de escritura por transmisión.

writeStream.outputMode()

Especifica cómo los datos de un DataFrame en transmisión se escriben en un destino de transmisión. Para ver una lista de todos los modos de salida compatibles, consulta la documentación del outputMode de Java.

writeStream.trigger()

Especifica con qué frecuencia el Spark Connector escribe resultados en el destino de transmisión. Llame a este método en el DataStreamWriter objeto que cree a partir del DataStreamReader que configure.

Para usar el procesamiento continuo, pase Trigger.Continuous(<time value>) como argumento, donde <time value> indica con qué frecuencia quieres que el Spark Connector haga un punto de control asincrónico. Si pasas cualquier otro método estático de la clase Trigger, o si no llamas a writeStream.trigger(), el Spark Connector utiliza el procesamiento por microgrupos en su lugar.

Para ver una lista de todas las políticas de procesamiento admitidas, consulte la documentación del activador de Java.

El siguiente snippet de código muestra cómo utilizar la configuración anterior para enviar datos por streaming a MongoDB:

<streaming DataFrame>.writeStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append");

Para obtener una lista completa de métodos, consulte la referencia de Java Structured Streaming.

Para escribir datos en MongoDB, llame a la función writeStream en su objeto DataFrame. Esta función devuelve un objeto DataStreamWriter, que puede usar para especificar el formato y otras opciones de configuración para su operación de escritura en streaming.

Debe especificar las siguientes configuraciones para escribir en MongoDB:

Configuración
Descripción

writeStream.format()

Especifica el formato de la fuente de datos de salida subyacente. Use mongodb para guardar en MongoDB.

writeStream.option()

Especifica la configuración del flujo, incluido la cadena de conexión de la implementación de MongoDB, la base de datos y colección de MongoDB, y el directorio de puntos de control.

Para obtener una lista de opciones de configuración de transmisión de guardar, consulta la guía Opciones de configuración de guardar de transmisión.

writeStream.outputMode()

Indica cómo el Spark Connector guarda un DataFrame de transmisión a un sink de transmisión. Para ver una lista de todos los modos de salida compatibles, consulta la documentación de pyspark outputMode.

writeStream.trigger()

Especifica con qué frecuencia el Spark Connector escribe resultados en el destino de transmisión. Llame a este método en el DataStreamWriter objeto que cree a partir del DataStreamReader que configure.

Para usar el procesamiento continuo, pase a la función un valor de tiempo mediante el parámetro continuous. Si pasa cualquier otro parámetro con nombre, o si no llama a writeStream.trigger(), el conector Spark utiliza el procesamiento por microlotes.

Para consultar la lista de todas las políticas de procesamiento admitidas, consulta la documentación de activador de pyspark.

El siguiente snippet de código muestra cómo utilizar la configuración anterior para enviar datos por streaming a MongoDB:

<streaming DataFrame>.writeStream \
.format("mongodb") \
.option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
.option("spark.mongodb.database", <database-name>) \
.option("spark.mongodb.collection", <collection-name>) \
.outputMode("append")

Para obtener una lista completa de las funciones, consulta la referencia de Structured transmisión de pyspark.

Para guardar datos en MongoDB, llame al método write en su objeto DataFrame. Este método devuelve un objeto DataStreamWriter, que puedes usar para especificar el formato y otras configuraciones para tu operación de guardado en transmisión.

Debe especificar las siguientes configuraciones para escribir en MongoDB:

Configuración
Descripción

writeStream.format()

Especifica el formato de la fuente de datos de salida subyacente. Use mongodb para guardar en MongoDB.

writeStream.option()

Especifica la configuración del flujo, incluido la cadena de conexión de la implementación de MongoDB, la base de datos y colección de MongoDB, y el directorio de puntos de control.

Para obtener una lista de opciones de configuración de transmisión de guardar, consulta la guía Opciones de configuración de guardar de transmisión.

writeStream.outputMode()

Especifica cómo el Spark Connector guarda un DataFrame de transmisión en un sink de transmisión. Para ver una lista de todos los modos de salida admitidos, consulta la documentación del modo de salida de Scala.

writeStream.trigger()

Especifica con qué frecuencia el Spark Connector escribe resultados en el destino de transmisión. Llame a este método en el DataStreamWriter objeto que cree a partir del DataStreamReader que configure.

Para usar el procesamiento continuo, pase Trigger.Continuous(<time value>) como argumento, donde <time value> indica con qué frecuencia quieres que el Spark Connector haga un punto de control asincrónico. Si pasas cualquier otro método estático de la clase Trigger, o si no llamas a writeStream.trigger(), el Spark Connector utiliza el procesamiento por microgrupos en su lugar.

Para ver una lista de todas las políticas de procesamiento admitidas, consulta la documentación del activador de Scala.

El siguiente snippet de código muestra cómo utilizar la configuración anterior para enviar datos por streaming a MongoDB:

<streaming DataFrame>.writeStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")

Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de Scala.

El siguiente ejemplo muestra cómo transmitir datos desde un CSV archivo a MongoDB:

  1. Crea un objeto DataStreamReader que lea desde el archivo CSV.

  2. Para crear un objeto DataStreamWriter, llame al método writeStream() en la transmisión Dataset<Row> que creó con DataStreamReader. Use el método format() para especificar mongodb como formato de datos subyacente.

  3. Llame al método start() en el objeto DataStreamWriter para comenzar la transmisión.

A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("csv")
.option("header", "true")
.schema("<csv-schema>")
.load("<csv-file-name>")
// manipulate your streaming data
.writeStream()
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. Crea un objeto DataStreamReader que lea desde el archivo CSV.

  2. Para crear un objeto DataStreamWriter, llama a la función writeStream en la transmisión DataFrame que creaste con el DataStreamReader. Usa la función format() para especificar mongodb como el formato de datos subyacente.

  3. Llama a la función start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.

# create a local SparkSession
spark = SparkSession.builder \
.appName("writeExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define a streaming query
dataStreamWriter = (spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
# manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/pyspark/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. Crea un objeto DataStreamReader que lea desde el archivo CSV.

  2. Para crear un objeto DataStreamWriter, llame al método writeStream en la transmisión DataFrame que creó con DataStreamReader. Use el método format() para especificar mongodb como formato de datos subyacente.

  3. Llama al método start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.

// create a local SparkSession
val spark = SparkSession.builder
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define a streaming query
val dataStreamWriter = spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
// manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark:

Volver

Configuración