Overview
Al leer un flujo de datos de una base de datos MongoDB, el conector Spark de MongoDB admite tanto el procesamiento por microlotes como el procesamiento continuo. El procesamiento por microlotes, el motor de procesamiento predeterminado, alcanza latencias de extremo a extremo de tan solo 100 milisegundos con garantías de tolerancia a fallos de un solo intento. El procesamiento continuo es una función experimental introducida en la versión de Spark 2.3 que alcanza latencias de extremo a extremo de tan solo 1 milisegundos con garantías de tolerancia a fallos de al menos un intento.
Para obtener más información sobre el procesamiento continuo, consulte la Documentación de Spark.
Nota
El conector lee desde el flujo de cambios de tu implementación de MongoDB. Para generar eventos de cambio en el flujo de cambios, realiza operaciones de actualización en tu base de datos.
Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB.
Para leer datos de MongoDB, llame al readStream() Método en el objeto SparkSession. Este método devuelve un objeto DataStreamReader, que puede usar para especificar el formato y otras opciones de configuración para la operación de lectura en streaming.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Use |
| Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación. Para obtener una lista de las opciones de configuración del flujo de lectura, consulte la Guíade opciones de configuración de lectura en streaming. |
| Especifica el esquema de entrada. |
El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() habilita el procesamiento continuo.
import org.apache.spark.sql.streaming.Trigger; Dataset<Row> streamingDataset = <local SparkSession>.readStream() .format("mongodb") .load(); DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream() .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append"); StreamingQuery query = dataStreamWriter.start();
Nota
Spark no comienza a transmitir hasta que llamas al método start() en una consulta de transmisión.
Para obtener una lista completa de métodos, consulte la referencia de Java Structured Streaming.
Para leer datos de MongoDB, llame a la función readStream en su objeto SparkSession. Esta función devuelve un objeto DataStreamReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura en streaming.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Use |
| Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación. Para obtener una lista de las opciones de configuración de transmisión de lectura, consulte la guía Opciones de configuración de transmisión de lectura. |
| Especifica el esquema de entrada. |
El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro continuous al método trigger() habilita el procesamiento continuo.
streamingDataFrame = (<local SparkSession>.readStream .format("mongodb") .load() ) dataStreamWriter = (streamingDataFrame.writeStream .trigger(continuous="1 second") .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") ) query = dataStreamWriter.start()
Nota
Spark no comienza a transmitir hasta que llamas al método start() en una consulta de transmisión.
Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de pyspark.
Para leer datos de MongoDB, llame al método readStream en su objeto SparkSession. Este método devuelve un objeto DataStreamReader, que puede usar para especificar el formato y otras opciones de configuración para su operación de lectura en streaming.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Use |
| Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación. Para obtener una lista de las opciones de configuración de transmisión de lectura, consulte la guía Opciones de configuración de transmisión de lectura. |
| Especifica el esquema de entrada. |
El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() habilita el procesamiento continuo.
import org.apache.spark.sql.streaming.Trigger val streamingDataFrame = <local SparkSession>.readStream .format("mongodb") .load() val dataStreamWriter = streamingDataFrame.writeStream .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") val query = dataStreamWriter.start()
Nota
Spark no comienza a transmitir hasta que llamas al método start() en una consulta de transmisión.
Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de Scala.
Ejemplo
El siguiente ejemplo muestra cómo transmitir datos desde MongoDB a su consola.
Crea un objeto
DataStreamReaderque lee desde MongoDB.Cree un objeto
DataStreamWriterllamando al métodowriteStream()en el objeto de transmisiónDatasetque creó con unDataStreamReader. Especifique el formatoconsolecon el métodoformat().Llame al método
start()en la instanciaDataStreamWriterpara comenzar la transmisión.
A medida que se insertan nuevos datos en MongoDB, MongoDB transmite esos datos a su consola según el outputMode que usted especifique.
Importante
Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define the schema of the source collection StructType readSchema = new StructType() .add("company_symbol", DataTypes.StringType) .add("company_name", DataTypes.StringType) .add("price", DataTypes.DoubleType) .add("tx_time", DataTypes.TimestampType); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("mongodb") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .schema(readSchema) .load() // manipulate your streaming data .writeStream() .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
Crea un objeto
DataStreamReaderque lee desde MongoDB.Cree un objeto
DataStreamWriterllamando al métodowriteStream()en la transmisiónDataFrameque creó con unDataStreamReader. Especifique el formatoconsolecon el métodoformat().Llame al método
start()en la instanciaDataStreamWriterpara comenzar la transmisión.
A medida que se insertan nuevos datos en MongoDB, MongoDB transmite esos datos a su consola según el outputMode que usted especifique.
Importante
Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.
# create a local SparkSession spark = SparkSession.builder \ .appName("readExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define the schema of the source collection readSchema = (StructType() .add('company_symbol', StringType()) .add('company_name', StringType()) .add('price', DoubleType()) .add('tx_time', TimestampType()) ) # define a streaming query dataStreamWriter = (spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option('spark.mongodb.database', <database-name>) .option('spark.mongodb.collection', <collection-name>) .schema(readSchema) .load() # manipulate your streaming data .writeStream .format("console") .trigger(continuous="1 second") .outputMode("append") ) # run the query query = dataStreamWriter.start()
Crea un objeto
DataStreamReaderque lee desde MongoDB.Cree un objeto
DataStreamWriterllamando al métodowriteStream()en el objeto de transmisiónDataFrameque creó conDataStreamReader. Especifique el formatoconsolecon el métodoformat().Llame al método
start()en la instanciaDataStreamWriterpara comenzar la transmisión.
A medida que se insertan nuevos datos en MongoDB, MongoDB transmite esos datos a su consola según el outputMode que usted especifique.
Importante
Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.
// create a local SparkSession val spark = SparkSession.builder .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define the schema of the source collection val readSchema = StructType() .add("company_symbol", StringType()) .add("company_name", StringType()) .add("price", DoubleType()) .add("tx_time", TimestampType()) // define a streaming query val dataStreamWriter = spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .schema(readSchema) .load() // manipulate your streaming data .writeStream .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append") // run the query val query = dataStreamWriter.start()
Importante
Inferir el esquema de un flujo de cambios
Si configura la opción change.stream.publish.full.document.only en true, el conector Spark infiere el esquema de un DataFrame utilizando el esquema de los documentos escaneados. Si configura la opción en false, debe especificar un esquema.
Para obtener más información sobre esta configuración y ver una lista completa de las opciones de configuración del flujo de cambios, consulte la guía Opciones de configuración de lectura.
Documentación de la API
Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark: