Docs Menu
Docs Home
/ /

Guardar en MongoDB en moda de transmisión.

Para escribir datos en MongoDB, llame al método writeStream() Método en el objeto Dataset<Row>. Este método devuelve un objeto DataStreamWriter, que puede usar para especificar el formato y otras opciones de configuración para la operación de escritura en streaming.

Debe especificar las siguientes configuraciones para escribir en MongoDB:

Configuración
Descripción

writeStream.format()

Especifica el formato de la fuente de datos de salida subyacente. Use mongodb para escribir en MongoDB.

writeStream.option()

Especifica la configuración de la transmisión, incluida la implementación de MongoDB cadena de conexión, base de datos y colección MongoDB y directorio de puntos de control.

Para obtener una lista de las opciones de configuración del flujo de escritura, consulte la Guíade opciones de configuración de escritura en streaming.

writeStream.outputMode()

Especifica cómo se escriben los datos de un DataFrame de streaming en un receptor de streaming. Para ver una lista de todos los modos de salida compatibles, consulte la documentación de Java outputMode.

writeStream.trigger()

Especifica la frecuencia con la que el conector Spark escribe resultados en el receptor de streaming. Llame a este método en el objeto DataStreamWriter que cree a partir del DataStreamReader que configure.

Para usar el procesamiento continuo, pase Trigger.Continuous(<time value>) como argumento, donde <time value> indica la frecuencia con la que desea que el conector Spark realice un punto de control asincrónico. Si pasa cualquier otro método estático de la clase Trigger, o si no llama a writeStream.trigger(), el conector Spark utiliza el procesamiento por microlotes.

Para ver una lista de todas las políticas de procesamiento admitidas, consulte la documentación del activador de Java.

El siguiente fragmento de código muestra cómo utilizar la configuración anterior para transmitir datos a MongoDB:

<streaming DataFrame>.writeStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append");

Para obtener una lista completa de métodos, consulte la referencia de Java Structured Streaming.

Para escribir datos en MongoDB, llame a la función writeStream en su objeto DataFrame. Esta función devuelve un objeto DataStreamWriter, que puede usar para especificar el formato y otras opciones de configuración para su operación de escritura en streaming.

Debe especificar las siguientes configuraciones para escribir en MongoDB:

Configuración
Descripción

writeStream.format()

Especifica el formato de la fuente de datos de salida subyacente. Use mongodb para escribir en MongoDB.

writeStream.option()

Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y la colección de MongoDB y el directorio de puntos de control.

Para obtener una lista de las opciones de configuración de secuencias de escritura, consulte la guía Opciones de configuración de escritura en streaming.

writeStream.outputMode()

Indica cómo el Spark Connector guarda un DataFrame de transmisión a un sink de transmisión. Para ver una lista de todos los modos de salida compatibles, consulta la documentación de pyspark outputMode.

writeStream.trigger()

Especifica la frecuencia con la que el conector Spark escribe resultados en el receptor de streaming. Llame a este método en el objeto DataStreamWriter que cree a partir del DataStreamReader que configure.

Para usar el procesamiento continuo, pase a la función un valor de tiempo mediante el parámetro continuous. Si pasa cualquier otro parámetro con nombre, o si no llama a writeStream.trigger(), el conector Spark utiliza el procesamiento por microlotes.

Para ver una lista de todas las políticas de procesamiento admitidas, consulte la documentación del activador de pyspark.

El siguiente fragmento de código muestra cómo utilizar la configuración anterior para transmitir datos a MongoDB:

<streaming DataFrame>.writeStream \
.format("mongodb") \
.option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
.option("spark.mongodb.database", <database-name>) \
.option("spark.mongodb.collection", <collection-name>) \
.outputMode("append")

Para obtener una lista completa de funciones, consulte la referencia de transmisión estructurada de pyspark.

Para escribir datos en MongoDB, llame al método write en su objeto DataFrame. Este método devuelve un objeto DataStreamWriter, que puede usar para especificar el formato y otras opciones de configuración para su operación de escritura en streaming.

Debe especificar las siguientes configuraciones para escribir en MongoDB:

Configuración
Descripción

writeStream.format()

Especifica el formato de la fuente de datos de salida subyacente. Use mongodb para escribir en MongoDB.

writeStream.option()

Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y la colección de MongoDB y el directorio de puntos de control.

Para obtener una lista de las opciones de configuración de secuencias de escritura, consulte la guía Opciones de configuración de escritura en streaming.

writeStream.outputMode()

Especifica cómo el conector Spark escribe un DataFrame de streaming en un receptor de streaming. Para ver una lista de todos los modos de salida compatibles, consulte la documentación de Scala outputMode.

writeStream.trigger()

Especifica la frecuencia con la que el conector Spark escribe resultados en el receptor de streaming. Llame a este método en el objeto DataStreamWriter que cree a partir del DataStreamReader que configure.

Para usar el procesamiento continuo, pase Trigger.Continuous(<time value>) como argumento, donde <time value> indica la frecuencia con la que desea que el conector Spark realice un punto de control asincrónico. Si pasa cualquier otro método estático de la clase Trigger, o si no llama a writeStream.trigger(), el conector Spark utiliza el procesamiento por microlotes.

Para ver una lista de todas las políticas de procesamiento compatibles, consulte la documentación del activador de Scala.

El siguiente fragmento de código muestra cómo utilizar la configuración anterior para transmitir datos a MongoDB:

<streaming DataFrame>.writeStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")

Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de Scala.

El siguiente ejemplo muestra cómo transmitir datos desde un ArchivoCSV a MongoDB:

  1. Crea un objeto DataStreamReader que lee desde el archivo CSV.

  2. Para crear un objeto DataStreamWriter, llame al método writeStream() en la transmisión Dataset<Row> que creó con DataStreamReader. Use el método format() para especificar mongodb como formato de datos subyacente.

  3. Llame al método start() en el objeto DataStreamWriter para comenzar la transmisión.

A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("csv")
.option("header", "true")
.schema("<csv-schema>")
.load("<csv-file-name>")
// manipulate your streaming data
.writeStream()
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. Crea un objeto DataStreamReader que lee desde el archivo CSV.

  2. Para crear un objeto DataStreamWriter, llame a la función writeStream en la transmisión DataFrame que creó con DataStreamReader. Use la función format() para especificar mongodb como formato de datos subyacente.

  3. Llama a la función start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.

# create a local SparkSession
spark = SparkSession.builder \
.appName("writeExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define a streaming query
dataStreamWriter = (spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
# manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/pyspark/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. Crea un objeto DataStreamReader que lee desde el archivo CSV.

  2. Para crear un objeto DataStreamWriter, llame al método writeStream en la transmisión DataFrame que creó con DataStreamReader. Use el método format() para especificar mongodb como formato de datos subyacente.

  3. Llame al método start() en la instancia DataStreamWriter para comenzar la transmisión.

A medida que el conector lee datos del archivo CSV, agrega esos datos a MongoDB según el outputMode que especifique.

// create a local SparkSession
val spark = SparkSession.builder
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define a streaming query
val dataStreamWriter = spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
// manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark:

Volver

Configuración