Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Leer desde MongoDB en moda de transmisión

Cuando se lee un flujo desde una base de datos MongoDB, el Spark Connector de MongoDB admite tanto el procesamiento por micro-lotes como el procesamiento continuo. El procesamiento por micro-lotes, el motor de procesamiento predeterminado, logra una latencia de extremo a extremo tan baja como 100 milisegundos con garantías de tolerancia a fallos exactamente una vez. El procesamiento continuo es una funcionalidad experimental introducida en Spark versión 2.3 que logra latencias de extremo a extremo tan bajas como 1 milisegundo con garantías de al menos una vez.

Para obtener más información sobre el procesamiento continuo, véase la Documentación de Spark.

Nota

El conector lee desde el flujo de cambios de tu implementación de MongoDB. Para generar eventos de cambio en el flujo de cambios, realiza operaciones de actualización en tu base de datos.

Para obtener más información sobre los Change Streams, consulte Change Streams en el manual de MongoDB.

Para leer datos de MongoDB, ejecuta la readStream() método en tu objeto SparkSession. Este método devuelve un DataStreamReader objeto, que puedes usar para especificar el formato y otras configuraciones de tu operación lectura en transmisión.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

readStream.format()

Especifica el formato de la fuente de datos de entrada subyacente. Utiliza mongodb para leer desde MongoDB.

readStream.option()

Especifica la configuración de flujo, incluyendo la cadena de conexión de la implementación de MongoDB, la base de datos y colección de MongoDB, y las etapas del pipeline de agregación.

Para ver una lista de opciones de configuración de lectura de transmisión, consulte el Guía de Opciones de configuración de lectura transmisión.

readStream.schema()

Especifica el esquema de entrada.

El siguiente snippet muestra cómo utilizar la configuración anterior para procesar continuamente datos transmitidos desde MongoDB. El conector agrega todos los datos nuevos a los datos existentes y guarda de forma asíncrona los puntos de control en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() permite el procesamiento continuo.

import org.apache.spark.sql.streaming.Trigger;
Dataset<Row> streamingDataset = <local SparkSession>.readStream()
.format("mongodb")
.load();
DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append");
StreamingQuery query = dataStreamWriter.start();

Nota

Spark no comienza a realizar transmisión hasta que se llama al método start() en una query de transmisión.

Para obtener una lista completa de métodos, consulta la referencia de transmisión estructurada de Java.

Para leer datos de MongoDB, llama a la función readStream en tu objeto SparkSession. Esta función devuelve un objeto DataStreamReader, que puedes usar para especificar el formato y otras configuraciones para la lectura en transmisión.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

readStream.format()

Especifica el formato de la fuente de datos de entrada subyacente. Utiliza mongodb para leer desde MongoDB.

readStream.option()

Especifica la configuración de flujo, incluyendo la cadena de conexión de la implementación de MongoDB, la base de datos y colección de MongoDB, y las etapas del pipeline de agregación.

Para obtener una lista de opciones de configuración de transmisión de lectura, consulta la guía Opciones de configuración de transmisión de lectura.

readStream.schema()

Especifica el esquema de entrada.

El siguiente snippet muestra cómo utilizar la configuración anterior para procesar continuamente datos transmitidos desde MongoDB. El conector agrega todos los datos nuevos a los datos existentes y guarda de forma asíncrona los puntos de control en /tmp/checkpointDir una vez por segundo. Pasar el parámetro continuous al método trigger() permite el procesamiento continuo.

streamingDataFrame = (<local SparkSession>.readStream
.format("mongodb")
.load()
)
dataStreamWriter = (streamingDataFrame.writeStream
.trigger(continuous="1 second")
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
)
query = dataStreamWriter.start()

Nota

Spark no comienza a realizar transmisión hasta que se llama al método start() en una query de transmisión.

Para obtener una lista completa de métodos, consulta la referencia de Structured transmisión en pyspark.

Para leer datos de MongoDB, llamar al método readStream en el objeto SparkSession. Este método devuelve un objeto DataStreamReader, que puedes usar para especificar el formato y otros ajustes de configuración para tu operación de lectura por transmisión.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

readStream.format()

Especifica el formato de la fuente de datos de entrada subyacente. Utiliza mongodb para leer desde MongoDB.

readStream.option()

Especifica la configuración de flujo, incluyendo la cadena de conexión de la implementación de MongoDB, la base de datos y colección de MongoDB, y las etapas del pipeline de agregación.

Para obtener una lista de opciones de configuración de transmisión de lectura, consulta la guía Opciones de configuración de transmisión de lectura.

readStream.schema()

Especifica el esquema de entrada.

El siguiente snippet muestra cómo utilizar la configuración anterior para procesar continuamente datos transmitidos desde MongoDB. El conector agrega todos los datos nuevos a los datos existentes y guarda de forma asíncrona los puntos de control en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() permite el procesamiento continuo.

import org.apache.spark.sql.streaming.Trigger
val streamingDataFrame = <local SparkSession>.readStream
.format("mongodb")
.load()
val dataStreamWriter = streamingDataFrame.writeStream
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
val query = dataStreamWriter.start()

Nota

Spark no comienza a realizar transmisión hasta que se llama al método start() en una query de transmisión.

Para obtener una lista completa de métodos, consulta la Reference de transmisión Estructurada de Scala.

El próximo ejemplo muestra cómo hacer streaming de datos desde MongoDB a tu consola.

  1. Crea un objeto DataStreamReader que lea desde MongoDB.

  2. Cree un objeto DataStreamWriter llamando al método writeStream() sobre el objeto de transmisión Dataset que creó con un DataStreamReader. Especifica el formato console utilizando el método format().

  3. Llama al método start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que se insertan nuevos datos en MongoDB, MongoDB envía esos datos a la consola según el outputMode que se especifique.

Importante

Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define the schema of the source collection
StructType readSchema = new StructType()
.add("company_symbol", DataTypes.StringType)
.add("company_name", DataTypes.StringType)
.add("price", DataTypes.DoubleType)
.add("tx_time", DataTypes.TimestampType);
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream()
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. Crea un objeto DataStreamReader que lea desde MongoDB.

  2. Crea un DataStreamWriter objeto llamando al método writeStream() en la transmisión DataFrame que has creado con un DataStreamReader. Especifica el formato console mediante el uso del método format().

  3. Llama al método start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que se insertan nuevos datos en MongoDB, MongoDB envía esos datos a la consola según el outputMode que se especifique.

Importante

Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.

# create a local SparkSession
spark = SparkSession.builder \
.appName("readExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define the schema of the source collection
readSchema = (StructType()
.add('company_symbol', StringType())
.add('company_name', StringType())
.add('price', DoubleType())
.add('tx_time', TimestampType())
)
# define a streaming query
dataStreamWriter = (spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option('spark.mongodb.database', <database-name>)
.option('spark.mongodb.collection', <collection-name>)
.schema(readSchema)
.load()
# manipulate your streaming data
.writeStream
.format("console")
.trigger(continuous="1 second")
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. Crea un objeto DataStreamReader que lea desde MongoDB.

  2. Crea un objeto DataStreamWriter llamando al método writeStream() en el objeto de transmisión DataFrame que creaste utilizando el DataStreamReader. Especifique el formato console usando el método format().

  3. Llama al método start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que se insertan nuevos datos en MongoDB, MongoDB envía esos datos a la consola según el outputMode que se especifique.

Importante

Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.

// create a local SparkSession
val spark = SparkSession.builder
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define the schema of the source collection
val readSchema = StructType()
.add("company_symbol", StringType())
.add("company_name", StringType())
.add("price", DoubleType())
.add("tx_time", TimestampType())
// define a streaming query
val dataStreamWriter = spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

Importante

Inferencia del esquema de un flujo de cambios

Si configuras la opción change.stream.publish.full.document.only en true, el Conector de Spark infiere el esquema de un DataFrame usando el esquema de los documentos escaneados. Si configuras la opción en false, debes especificar un esquema.

Para obtener más información sobre esta configuración y ver una lista completa de las opciones de configuración del flujo de cambios, consulta la guía Leer opciones de configuración.

Para obtener más información sobre los tipos utilizados en estos ejemplos, consulta la siguiente documentación de la API de Apache Spark:

Volver

Moda de transmisión

En esta página