Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Leer desde MongoDB en moda de transmisión

Cuando se lee un flujo desde una base de datos MongoDB, el Spark Connector de MongoDB admite tanto el procesamiento por micro-lotes como el procesamiento continuo. El procesamiento por micro-lotes, el motor de procesamiento predeterminado, logra una latencia de extremo a extremo tan baja como 100 milisegundos con garantías de tolerancia a fallos exactamente una vez. El procesamiento continuo es una funcionalidad experimental introducida en Spark versión 2.3 que logra latencias de extremo a extremo tan bajas como 1 milisegundo con garantías de al menos una vez.

Para obtener más información sobre el procesamiento continuo, véase la Documentación de Spark.

Nota

El conector lee desde el flujo de cambios de tu implementación de MongoDB. Para generar eventos de cambio en el flujo de cambios, realiza operaciones de actualización en tu base de datos.

Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB.

Para leer datos de MongoDB, llame al readStream() método en tu objeto SparkSession. Este método devuelve un DataStreamReader objeto, que puedes usar para especificar el formato y otras configuraciones de tu operación lectura en transmisión.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

readStream.format()

Especifica el formato de la fuente de datos de entrada subyacente. Utiliza mongodb para leer desde MongoDB.

readStream.option()

Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación.

Para obtener una lista de las opciones de configuración del flujo de lectura, consulte la Guía de Opciones de configuración de lectura transmisión.

readStream.schema()

Especifica el esquema de entrada.

El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() habilita el procesamiento continuo.

import org.apache.spark.sql.streaming.Trigger;
Dataset<Row> streamingDataset = <local SparkSession>.readStream()
.format("mongodb")
.load();
DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append");
StreamingQuery query = dataStreamWriter.start();

Nota

Spark no comienza a realizar transmisión hasta que se llama al método start() en una query de transmisión.

Para obtener una lista completa de métodos, consulte la referencia de Java Structured Streaming.

Para leer datos de MongoDB, llama a la función readStream en tu objeto SparkSession. Esta función devuelve un objeto DataStreamReader, que puedes usar para especificar el formato y otras configuraciones para la lectura en transmisión.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

readStream.format()

Especifica el formato de la fuente de datos de entrada subyacente. Utiliza mongodb para leer desde MongoDB.

readStream.option()

Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación.

Para obtener una lista de opciones de configuración de transmisión de lectura, consulta la guía Opciones de configuración de transmisión de lectura.

readStream.schema()

Especifica el esquema de entrada.

El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro continuous al método trigger() habilita el procesamiento continuo.

streamingDataFrame = (<local SparkSession>.readStream
.format("mongodb")
.load()
)
dataStreamWriter = (streamingDataFrame.writeStream
.trigger(continuous="1 second")
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
)
query = dataStreamWriter.start()

Nota

Spark no comienza a realizar transmisión hasta que se llama al método start() en una query de transmisión.

Para obtener una lista completa de métodos, consulta la referencia de Structured transmisión en pyspark.

Para leer datos de MongoDB, llamar al método readStream en el objeto SparkSession. Este método devuelve un objeto DataStreamReader, que puedes usar para especificar el formato y otros ajustes de configuración para tu operación de lectura por transmisión.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

readStream.format()

Especifica el formato de la fuente de datos de entrada subyacente. Utiliza mongodb para leer desde MongoDB.

readStream.option()

Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación.

Para obtener una lista de opciones de configuración de transmisión de lectura, consulta la guía Opciones de configuración de transmisión de lectura.

readStream.schema()

Especifica el esquema de entrada.

El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() habilita el procesamiento continuo.

import org.apache.spark.sql.streaming.Trigger
val streamingDataFrame = <local SparkSession>.readStream
.format("mongodb")
.load()
val dataStreamWriter = streamingDataFrame.writeStream
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
val query = dataStreamWriter.start()

Nota

Spark no comienza a realizar transmisión hasta que se llama al método start() en una query de transmisión.

Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de Scala.

El siguiente ejemplo muestra cómo transmitir datos desde MongoDB a su consola.

  1. Crea un objeto DataStreamReader que lea desde MongoDB.

  2. Cree un objeto DataStreamWriter llamando al método writeStream() sobre el objeto de transmisión Dataset que creó con un DataStreamReader. Especifica el formato console utilizando el método format().

  3. Llama al método start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que se insertan nuevos datos en MongoDB, MongoDB envía esos datos a la consola según el outputMode que se especifique.

Importante

Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define the schema of the source collection
StructType readSchema = new StructType()
.add("company_symbol", DataTypes.StringType)
.add("company_name", DataTypes.StringType)
.add("price", DataTypes.DoubleType)
.add("tx_time", DataTypes.TimestampType);
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream()
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. Crea un objeto DataStreamReader que lea desde MongoDB.

  2. Cree un objeto DataStreamWriter llamando al método writeStream() en la transmisión DataFrame que creó con un DataStreamReader. Especifique el formato console con el método format().

  3. Llama al método start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que se insertan nuevos datos en MongoDB, MongoDB envía esos datos a la consola según el outputMode que se especifique.

Importante

Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.

# create a local SparkSession
spark = SparkSession.builder \
.appName("readExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define the schema of the source collection
readSchema = (StructType()
.add('company_symbol', StringType())
.add('company_name', StringType())
.add('price', DoubleType())
.add('tx_time', TimestampType())
)
# define a streaming query
dataStreamWriter = (spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option('spark.mongodb.database', <database-name>)
.option('spark.mongodb.collection', <collection-name>)
.schema(readSchema)
.load()
# manipulate your streaming data
.writeStream
.format("console")
.trigger(continuous="1 second")
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. Crea un objeto DataStreamReader que lea desde MongoDB.

  2. Crea un objeto DataStreamWriter llamando al método writeStream() en el objeto de transmisión DataFrame que creaste utilizando el DataStreamReader. Especifique el formato console usando el método format().

  3. Llama al método start() en la instancia DataStreamWriter para iniciar el flujo.

A medida que se insertan nuevos datos en MongoDB, MongoDB envía esos datos a la consola según el outputMode que se especifique.

Importante

Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.

// create a local SparkSession
val spark = SparkSession.builder
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define the schema of the source collection
val readSchema = StructType()
.add("company_symbol", StringType())
.add("company_name", StringType())
.add("price", DoubleType())
.add("tx_time", TimestampType())
// define a streaming query
val dataStreamWriter = spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

Importante

Inferencia del esquema de un flujo de cambios

Si configuras la opción change.stream.publish.full.document.only en true, el Conector de Spark infiere el esquema de un DataFrame usando el esquema de los documentos escaneados. Si configuras la opción en false, debes especificar un esquema.

Para obtener más información sobre esta configuración y ver una lista completa de las opciones de configuración del flujo de cambios, consulta la guía Leer opciones de configuración.

Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark:

Volver

Moda de transmisión

En esta página