Docs Menu
Docs Home
/ /

Leer desde MongoDB en moda de transmisión

Al leer un flujo de datos de una base de datos MongoDB, el conector Spark de MongoDB admite tanto el procesamiento por microlotes como el procesamiento continuo. El procesamiento por microlotes, el motor de procesamiento predeterminado, alcanza latencias de extremo a extremo de tan solo 100 milisegundos con garantías de tolerancia a fallos de un solo intento. El procesamiento continuo es una función experimental introducida en la versión de Spark 2.3 que alcanza latencias de extremo a extremo de tan solo 1 milisegundos con garantías de tolerancia a fallos de al menos un intento.

Para obtener más información sobre el procesamiento continuo, consulte la Documentación de Spark.

Nota

El conector lee desde el flujo de cambios de tu implementación de MongoDB. Para generar eventos de cambio en el flujo de cambios, realiza operaciones de actualización en tu base de datos.

Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB.

Para leer datos de MongoDB, llame al readStream() Método en el objeto SparkSession. Este método devuelve un objeto DataStreamReader, que puede usar para especificar el formato y otras opciones de configuración para la operación de lectura en streaming.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

readStream.format()

Especifica el formato de la fuente de datos de entrada subyacente. Use mongodb para leer desde MongoDB.

readStream.option()

Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación.

Para obtener una lista de las opciones de configuración del flujo de lectura, consulte la Guíade opciones de configuración de lectura en streaming.

readStream.schema()

Especifica el esquema de entrada.

El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() habilita el procesamiento continuo.

import org.apache.spark.sql.streaming.Trigger;
Dataset<Row> streamingDataset = <local SparkSession>.readStream()
.format("mongodb")
.load();
DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append");
StreamingQuery query = dataStreamWriter.start();

Nota

Spark no comienza a transmitir hasta que llamas al método start() en una consulta de transmisión.

Para obtener una lista completa de métodos, consulte la referencia de Java Structured Streaming.

Para leer datos de MongoDB, llame a la función readStream en su objeto SparkSession. Esta función devuelve un objeto DataStreamReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura en streaming.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

readStream.format()

Especifica el formato de la fuente de datos de entrada subyacente. Use mongodb para leer desde MongoDB.

readStream.option()

Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación.

Para obtener una lista de las opciones de configuración de transmisión de lectura, consulte la guía Opciones de configuración de transmisión de lectura.

readStream.schema()

Especifica el esquema de entrada.

El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro continuous al método trigger() habilita el procesamiento continuo.

streamingDataFrame = (<local SparkSession>.readStream
.format("mongodb")
.load()
)
dataStreamWriter = (streamingDataFrame.writeStream
.trigger(continuous="1 second")
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
)
query = dataStreamWriter.start()

Nota

Spark no comienza a transmitir hasta que llamas al método start() en una consulta de transmisión.

Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de pyspark.

Para leer datos de MongoDB, llame al método readStream en su objeto SparkSession. Este método devuelve un objeto DataStreamReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura en streaming.

Debe especificar las siguientes configuraciones para leer desde MongoDB:

Configuración
Descripción

readStream.format()

Especifica el formato de la fuente de datos de entrada subyacente. Use mongodb para leer desde MongoDB.

readStream.option()

Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación.

Para obtener una lista de las opciones de configuración de transmisión de lectura, consulte la guía Opciones de configuración de transmisión de lectura.

readStream.schema()

Especifica el esquema de entrada.

El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() habilita el procesamiento continuo.

import org.apache.spark.sql.streaming.Trigger
val streamingDataFrame = <local SparkSession>.readStream
.format("mongodb")
.load()
val dataStreamWriter = streamingDataFrame.writeStream
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
val query = dataStreamWriter.start()

Nota

Spark no comienza a transmitir hasta que llamas al método start() en una consulta de transmisión.

Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de Scala.

El siguiente ejemplo muestra cómo transmitir datos desde MongoDB a su consola.

  1. Crea un objeto DataStreamReader que lee desde MongoDB.

  2. Cree un objeto DataStreamWriter llamando al método writeStream() en el objeto de transmisión Dataset que creó con un DataStreamReader. Especifique el formato console con el método format().

  3. Llame al método start() en la instancia DataStreamWriter para comenzar la transmisión.

A medida que se insertan nuevos datos en MongoDB, MongoDB transmite esos datos a su consola según el outputMode que usted especifique.

Importante

Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define the schema of the source collection
StructType readSchema = new StructType()
.add("company_symbol", DataTypes.StringType)
.add("company_name", DataTypes.StringType)
.add("price", DataTypes.DoubleType)
.add("tx_time", DataTypes.TimestampType);
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream()
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. Crea un objeto DataStreamReader que lee desde MongoDB.

  2. Cree un objeto DataStreamWriter llamando al método writeStream() en la transmisión DataFrame que creó con un DataStreamReader. Especifique el formato console con el método format().

  3. Llame al método start() en la instancia DataStreamWriter para comenzar la transmisión.

A medida que se insertan nuevos datos en MongoDB, MongoDB transmite esos datos a su consola según el outputMode que usted especifique.

Importante

Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.

# create a local SparkSession
spark = SparkSession.builder \
.appName("readExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define the schema of the source collection
readSchema = (StructType()
.add('company_symbol', StringType())
.add('company_name', StringType())
.add('price', DoubleType())
.add('tx_time', TimestampType())
)
# define a streaming query
dataStreamWriter = (spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option('spark.mongodb.database', <database-name>)
.option('spark.mongodb.collection', <collection-name>)
.schema(readSchema)
.load()
# manipulate your streaming data
.writeStream
.format("console")
.trigger(continuous="1 second")
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. Crea un objeto DataStreamReader que lee desde MongoDB.

  2. Cree un objeto DataStreamWriter llamando al método writeStream() en el objeto de transmisión DataFrame que creó con DataStreamReader. Especifique el formato console con el método format().

  3. Llame al método start() en la instancia DataStreamWriter para comenzar la transmisión.

A medida que se insertan nuevos datos en MongoDB, MongoDB transmite esos datos a su consola según el outputMode que usted especifique.

Importante

Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.

// create a local SparkSession
val spark = SparkSession.builder
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define the schema of the source collection
val readSchema = StructType()
.add("company_symbol", StringType())
.add("company_name", StringType())
.add("price", DoubleType())
.add("tx_time", TimestampType())
// define a streaming query
val dataStreamWriter = spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

Importante

Inferir el esquema de un flujo de cambios

Si configura la opción change.stream.publish.full.document.only en true, el conector Spark infiere el esquema de un DataFrame utilizando el esquema de los documentos escaneados. Si configura la opción en false, debe especificar un esquema.

Para obtener más información sobre esta configuración y ver una lista completa de las opciones de configuración del flujo de cambios, consulte la guía Opciones de configuración de lectura.

Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark:

Volver

Moda de transmisión

En esta página