Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Guardar en MongoDB en moda de transmisión.

Para guardar datos en MongoDB, llama a la writeStream() método en tu objeto Dataset<Row>. Este método devuelve un objeto DataStreamWriter, que puedes usar para especificar el formato y otras configuraciones para tu operación de guardado en transmisión.

Debes especificar la siguiente configuración para guardar en MongoDB:

Configuración
Descripción

writeStream.format()

Especifica el formato de la fuente de datos de salida subyacente. Use mongodb para guardar en MongoDB.

writeStream.option()

Especifica la configuración del stream, incluyendo la implementación de MongoDB. cadena de conexión, base de datos y colección de MongoDB, y directorio de punto de control.

Para obtener una lista de las opciones de configuración del flujo de guardar, consulte el Guía de opciones de configuración de escritura por transmisión.

writeStream.outputMode()

Especifica cómo los datos de un DataFrame en transmisión se escriben en un destino de transmisión. Para ver una lista de todos los modos de salida compatibles, consulta la documentación del outputMode de Java.

writeStream.trigger()

Especifica con qué frecuencia el Spark Connector escribe resultados en el destino de transmisión. Llame a este método en el DataStreamWriter objeto que cree a partir del DataStreamReader que configure.

Para usar el procesamiento continuo, pase Trigger.Continuous(<time value>) como argumento, donde <time value> indica con qué frecuencia quieres que el Spark Connector haga un punto de control asincrónico. Si pasas cualquier otro método estático de la clase Trigger, o si no llamas a writeStream.trigger(), el Spark Connector utiliza el procesamiento por microgrupos en su lugar.

Para ver una lista de todas las políticas de procesamiento compatibles, consulta la Documentación del activador Java.

El siguiente snippet de código muestra cómo utilizar la configuración anterior para enviar datos por streaming a MongoDB:

<streaming DataFrame>.writeStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append");

Para obtener una lista completa de métodos, consulta la referencia de transmisión estructurada de Java.

Para guardar datos en MongoDB, llama a la función writeStream en tu objeto DataFrame. Esta función devuelve un objeto DataStreamWriter, que puedes usar para especificar el formato y otros ajustes de configuración para tu operación de escritura en transmisión.

Debes especificar la siguiente configuración para guardar en MongoDB:

Configuración
Descripción

writeStream.format()

Especifica el formato de la fuente de datos de salida subyacente. Use mongodb para guardar en MongoDB.

writeStream.option()

Especifica la configuración del flujo, incluido la cadena de conexión de la implementación de MongoDB, la base de datos y colección de MongoDB, y el directorio de puntos de control.

Para obtener una lista de opciones de configuración de transmisión de guardar, consulta la guía Opciones de configuración de guardar de transmisión.

writeStream.outputMode()

Indica cómo el Spark Connector guarda un DataFrame de transmisión a un sink de transmisión. Para ver una lista de todos los modos de salida compatibles, consulta la documentación de pyspark outputMode.

writeStream.trigger()

Especifica con qué frecuencia el Spark Connector escribe resultados en el destino de transmisión. Llame a este método en el DataStreamWriter objeto que cree a partir del DataStreamReader que configure.

Para utilizar el procesamiento continuo, debe pasarle a la función un valor de tiempo usando el parámetro continuous. Si se pasa cualquier otro parámetro con nombre, o si no se llama a writeStream.trigger(), el Spark Connector utiliza el procesamiento por micro-lotes en su lugar.

Para consultar la lista de todas las políticas de procesamiento admitidas, consulta la documentación de activador de pyspark.

El siguiente snippet de código muestra cómo utilizar la configuración anterior para enviar datos por streaming a MongoDB:

<streaming DataFrame>.writeStream \
.format("mongodb") \
.option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
.option("spark.mongodb.database", <database-name>) \
.option("spark.mongodb.collection", <collection-name>) \
.outputMode("append")

Para obtener una lista completa de las funciones, consulta la referencia de Structured transmisión de pyspark.

Para guardar datos en MongoDB, llame al método write en su objeto DataFrame. Este método devuelve un objeto DataStreamWriter, que puedes usar para especificar el formato y otras configuraciones para tu operación de guardado en transmisión.

Debes especificar la siguiente configuración para guardar en MongoDB:

Configuración
Descripción

writeStream.format()

Especifica el formato de la fuente de datos de salida subyacente. Use mongodb para guardar en MongoDB.

writeStream.option()

Especifica la configuración del flujo, incluido la cadena de conexión de la implementación de MongoDB, la base de datos y colección de MongoDB, y el directorio de puntos de control.

Para obtener una lista de opciones de configuración de transmisión de guardar, consulta la guía Opciones de configuración de guardar de transmisión.

writeStream.outputMode()

Especifica cómo el Spark Connector guarda un DataFrame de transmisión en un sink de transmisión. Para ver una lista de todos los modos de salida admitidos, consulta la documentación del modo de salida de Scala.

writeStream.trigger()

Especifica con qué frecuencia el Spark Connector escribe resultados en el destino de transmisión. Llame a este método en el DataStreamWriter objeto que cree a partir del DataStreamReader que configure.

Para usar el procesamiento continuo, pase Trigger.Continuous(<time value>) como argumento, donde <time value> indica con qué frecuencia quieres que el Spark Connector haga un punto de control asincrónico. Si pasas cualquier otro método estático de la clase Trigger, o si no llamas a writeStream.trigger(), el Spark Connector utiliza el procesamiento por microgrupos en su lugar.

Para ver una lista de todas las políticas de procesamiento admitidas, consulta la documentación del activador de Scala.

El siguiente snippet de código muestra cómo utilizar la configuración anterior para enviar datos por streaming a MongoDB:

<streaming DataFrame>.writeStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")

Para obtener una lista completa de métodos, consulta la Reference de transmisión Estructurada de Scala.

El siguiente ejemplo muestra cómo transmitir datos desde un CSV archivo a MongoDB:

  1. Crea un objeto DataStreamReader que lea desde el archivo CSV.

  2. Para crear un DataStreamWriter objeto, llama al método writeStream() en la transmisión Dataset<Row> que creaste con el DataStreamReader. Utiliza el método format() para especificar mongodb como el formato de datos subyacente.

  3. Llama al método start() en el objeto DataStreamWriter para comenzar el stream.

A medida que el conector lee datos del archivo CSV, los agrega a MongoDB según el outputMode que especifique.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("csv")
.option("header", "true")
.schema("<csv-schema>")
.load("<csv-file-name>")
// manipulate your streaming data
.writeStream()
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. Crea un objeto DataStreamReader que lea desde el archivo CSV.

  2. Para crear un objeto DataStreamWriter, llama a la función writeStream en la transmisión DataFrame que creaste con el DataStreamReader. Usa la función format() para especificar mongodb como el formato de datos subyacente.

  3. Llama a la función start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que el conector lee datos del archivo CSV, los agrega a MongoDB según el outputMode que especifique.

# create a local SparkSession
spark = SparkSession.builder \
.appName("writeExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define a streaming query
dataStreamWriter = (spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
# manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/pyspark/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. Crea un objeto DataStreamReader que lea desde el archivo CSV.

  2. Para crear un DataStreamWriter objeto, llama al método writeStream en la transmisión DataFrame que creaste con el DataStreamReader. Utiliza el método format() para especificar mongodb como el formato de datos subyacente.

  3. Llama al método start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que el conector lee datos del archivo CSV, los agrega a MongoDB según el outputMode que especifique.

// create a local SparkSession
val spark = SparkSession.builder
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define a streaming query
val dataStreamWriter = spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
// manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

Para obtener más información sobre los tipos utilizados en estos ejemplos, consulta la siguiente documentación de la API de Apache Spark:

Volver

Configuración