Overview
Cuando se lee un flujo desde una base de datos MongoDB, el Spark Connector de MongoDB admite tanto el procesamiento por micro-lotes como el procesamiento continuo. El procesamiento por micro-lotes, el motor de procesamiento predeterminado, logra una latencia de extremo a extremo tan baja como 100 milisegundos con garantías de tolerancia a fallos exactamente una vez. El procesamiento continuo es una funcionalidad experimental introducida en Spark versión 2.3 que logra latencias de extremo a extremo tan bajas como 1 milisegundo con garantías de al menos una vez.
Para obtener más información sobre el procesamiento continuo, véase la Documentación de Spark.
Nota
El conector lee desde el flujo de cambios de tu implementación de MongoDB. Para generar eventos de cambio en el flujo de cambios, realiza operaciones de actualización en tu base de datos.
Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB.
Para leer datos de MongoDB, llame al readStream() método en tu objeto SparkSession. Este método devuelve un DataStreamReader objeto, que puedes usar para especificar el formato y otras configuraciones de tu operación lectura en transmisión.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Utiliza |
| Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación. Para obtener una lista de las opciones de configuración del flujo de lectura, consulte la Guía de Opciones de configuración de lectura transmisión. |
| Especifica el esquema de entrada. |
El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() habilita el procesamiento continuo.
import org.apache.spark.sql.streaming.Trigger; Dataset<Row> streamingDataset = <local SparkSession>.readStream() .format("mongodb") .load(); DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream() .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append"); StreamingQuery query = dataStreamWriter.start();
Nota
Spark no comienza a realizar transmisión hasta que se llama al método start() en una query de transmisión.
Para obtener una lista completa de métodos, consulte la referencia de Java Structured Streaming.
Para leer datos de MongoDB, llama a la función readStream en tu objeto SparkSession. Esta función devuelve un objeto DataStreamReader, que puedes usar para especificar el formato y otras configuraciones para la lectura en transmisión.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Utiliza |
| Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación. Para obtener una lista de opciones de configuración de transmisión de lectura, consulta la guía Opciones de configuración de transmisión de lectura. |
| Especifica el esquema de entrada. |
El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro continuous al método trigger() habilita el procesamiento continuo.
streamingDataFrame = (<local SparkSession>.readStream .format("mongodb") .load() ) dataStreamWriter = (streamingDataFrame.writeStream .trigger(continuous="1 second") .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") ) query = dataStreamWriter.start()
Nota
Spark no comienza a realizar transmisión hasta que se llama al método start() en una query de transmisión.
Para obtener una lista completa de métodos, consulta la referencia de Structured transmisión en pyspark.
Para leer datos de MongoDB, llamar al método readStream en el objeto SparkSession. Este método devuelve un objeto DataStreamReader, que puedes usar para especificar el formato y otros ajustes de configuración para tu operación de lectura por transmisión.
Debe especificar las siguientes configuraciones para leer desde MongoDB:
Configuración | Descripción |
|---|---|
| Especifica el formato de la fuente de datos de entrada subyacente. Utiliza |
| Especifica la configuración de transmisión, incluida la cadena de conexión de implementación de MongoDB, la base de datos y colección de MongoDB y las etapas de la canalización de agregación. Para obtener una lista de opciones de configuración de transmisión de lectura, consulta la guía Opciones de configuración de transmisión de lectura. |
| Especifica el esquema de entrada. |
El siguiente fragmento de código muestra cómo usar la configuración anterior para procesar continuamente los datos transmitidos desde MongoDB. El conector añade todos los datos nuevos a los existentes y escribe puntos de control asincrónicamente en /tmp/checkpointDir una vez por segundo. Pasar el parámetro Trigger.Continuous al método trigger() habilita el procesamiento continuo.
import org.apache.spark.sql.streaming.Trigger val streamingDataFrame = <local SparkSession>.readStream .format("mongodb") .load() val dataStreamWriter = streamingDataFrame.writeStream .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") val query = dataStreamWriter.start()
Nota
Spark no comienza a realizar transmisión hasta que se llama al método start() en una query de transmisión.
Para obtener una lista completa de métodos, consulte la referencia de transmisión estructurada de Scala.
Ejemplo
El siguiente ejemplo muestra cómo transmitir datos desde MongoDB a su consola.
Crea un objeto
DataStreamReaderque lea desde MongoDB.Cree un objeto
DataStreamWriterllamando al métodowriteStream()sobre el objeto de transmisiónDatasetque creó con unDataStreamReader. Especifica el formatoconsoleutilizando el métodoformat().Llama al método
start()en la instanciaDataStreamWriterpara iniciar el flujo.
A medida que se insertan nuevos datos en MongoDB, MongoDB envía esos datos a la consola según el outputMode que se especifique.
Importante
Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define the schema of the source collection StructType readSchema = new StructType() .add("company_symbol", DataTypes.StringType) .add("company_name", DataTypes.StringType) .add("price", DataTypes.DoubleType) .add("tx_time", DataTypes.TimestampType); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("mongodb") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .schema(readSchema) .load() // manipulate your streaming data .writeStream() .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
Crea un objeto
DataStreamReaderque lea desde MongoDB.Cree un objeto
DataStreamWriterllamando al métodowriteStream()en la transmisiónDataFrameque creó con unDataStreamReader. Especifique el formatoconsolecon el métodoformat().Llama al método
start()en la instanciaDataStreamWriterpara iniciar el flujo.
A medida que se insertan nuevos datos en MongoDB, MongoDB envía esos datos a la consola según el outputMode que se especifique.
Importante
Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.
# create a local SparkSession spark = SparkSession.builder \ .appName("readExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define the schema of the source collection readSchema = (StructType() .add('company_symbol', StringType()) .add('company_name', StringType()) .add('price', DoubleType()) .add('tx_time', TimestampType()) ) # define a streaming query dataStreamWriter = (spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option('spark.mongodb.database', <database-name>) .option('spark.mongodb.collection', <collection-name>) .schema(readSchema) .load() # manipulate your streaming data .writeStream .format("console") .trigger(continuous="1 second") .outputMode("append") ) # run the query query = dataStreamWriter.start()
Crea un objeto
DataStreamReaderque lea desde MongoDB.Crea un objeto
DataStreamWriterllamando al métodowriteStream()en el objeto de transmisiónDataFrameque creaste utilizando elDataStreamReader. Especifique el formatoconsoleusando el métodoformat().Llama al método
start()en la instanciaDataStreamWriterpara iniciar el flujo.
A medida que se insertan nuevos datos en MongoDB, MongoDB envía esos datos a la consola según el outputMode que se especifique.
Importante
Evita transmitir grandes conjuntos de datos a tu consola. La transmisión a tu consola requiere mucha memoria y está destinada sólo para pruebas.
// create a local SparkSession val spark = SparkSession.builder .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define the schema of the source collection val readSchema = StructType() .add("company_symbol", StringType()) .add("company_name", StringType()) .add("price", DoubleType()) .add("tx_time", TimestampType()) // define a streaming query val dataStreamWriter = spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .schema(readSchema) .load() // manipulate your streaming data .writeStream .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append") // run the query val query = dataStreamWriter.start()
Importante
Inferencia del esquema de un flujo de cambios
Si configuras la opción change.stream.publish.full.document.only en true, el Conector de Spark infiere el esquema de un DataFrame usando el esquema de los documentos escaneados. Si configuras la opción en false, debes especificar un esquema.
Para obtener más información sobre esta configuración y ver una lista completa de las opciones de configuración del flujo de cambios, consulta la guía Leer opciones de configuración.
Documentación de la API
Para obtener más información sobre los tipos utilizados en estos ejemplos, consulte la siguiente documentación de la API de Apache Spark: