Overview
从 MongoDB 数据库读取流时,MongoDB Spark Connector 支持微批处理和连续处理。 微批处理是默认的处理引擎,可实现低至 100 毫秒的端到端延迟,并具有一次性容错保证。连续处理是 Spark 2.3 版中引入的一项实验性功能,可在保证至少一次的情况下实现低至 1 毫秒的端到端延迟。
要学习;了解有关连续处理的详情,请参阅 Spark文档。
注意
连接器从 MongoDB 部署的变更流中读取数据。如需在变更流上生成更改事件,请对数据库上执行更新操作。
如需了解有关变更流的更多信息,请参阅 MongoDB 手册中的“变更流”部分。
要从MongoDB读取数据,请对SparkSession
对象调用 readStream()
方法。 此方法会返回一个DataStreamReader
对象,您可以使用该对象指定流媒体读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
| 指定底层输入数据源的格式。使用 |
| |
| 指定输入模式。 |
以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir
一次。将Trigger.Continuous
参数传递给trigger()
方法可启用连续处理。
import org.apache.spark.sql.streaming.Trigger; Dataset<Row> streamingDataset = <local SparkSession>.readStream() .format("mongodb") .load(); DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream() .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append"); StreamingQuery query = dataStreamWriter.start();
注意
在您对流媒体查询调用start()
方法之前,Spark 不会开始流媒体。
要从 MongoDB 读取数据,请对SparkSession
对象调用readStream
函数。 此函数返回一个DataStreamReader
对象,您可以使用该对象指定流式读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
| 指定底层输入数据源的格式。使用 |
| |
| 指定输入模式。 |
以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir
一次。将continuous
参数传递给trigger()
方法可启用连续处理。
streamingDataFrame = (<local SparkSession>.readStream .format("mongodb") .load() ) dataStreamWriter = (streamingDataFrame.writeStream .trigger(continuous="1 second") .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") ) query = dataStreamWriter.start()
注意
在您对流媒体查询调用start()
方法之前,Spark 不会开始流媒体。
有关方法的完整列表,请参阅 pyspark 结构化流参考。
要从 MongoDB 读取数据,请对SparkSession
对象调用readStream
方法。 此方法会返回一个DataStreamReader
对象,您可以使用该对象指定流媒体读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
| 指定底层输入数据源的格式。使用 |
| |
| 指定输入模式。 |
以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir
一次。将Trigger.Continuous
参数传递给trigger()
方法可启用连续处理。
import org.apache.spark.sql.streaming.Trigger val streamingDataFrame = <local SparkSession>.readStream .format("mongodb") .load() val dataStreamWriter = streamingDataFrame.writeStream .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") val query = dataStreamWriter.start()
注意
在您对流媒体查询调用start()
方法之前,Spark 不会开始流媒体。
例子
以下示例展示了如何将数据从 MongoDB 流式传输到控制台。
创建一个从 MongoDB 读取数据的
DataStreamReader
对象。通过在使用
DataStreamReader
创建的流式Dataset
对象上调用writeStream()
方法来创建DataStreamWriter
对象。 使用format()
方法指定格式console
。在
DataStreamWriter
实例上调用start()
方法以开始流。
当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode
将数据流式传输到控制台。
重要
避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define the schema of the source collection StructType readSchema = new StructType() .add("company_symbol", DataTypes.StringType) .add("company_name", DataTypes.StringType) .add("price", DataTypes.DoubleType) .add("tx_time", DataTypes.TimestampType); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("mongodb") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .schema(readSchema) .load() // manipulate your streaming data .writeStream() .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
创建一个从 MongoDB 读取数据的
DataStreamReader
对象。通过在您使用
DataStreamReader
创建的流式DataFrame
上调用writeStream()
方法来创建DataStreamWriter
对象。 使用format()
方法指定格式console
。在
DataStreamWriter
实例上调用start()
方法以开始流。
当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode
将数据流式传输到控制台。
重要
避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。
# create a local SparkSession spark = SparkSession.builder \ .appName("readExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define the schema of the source collection readSchema = (StructType() .add('company_symbol', StringType()) .add('company_name', StringType()) .add('price', DoubleType()) .add('tx_time', TimestampType()) ) # define a streaming query dataStreamWriter = (spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option('spark.mongodb.database', <database-name>) .option('spark.mongodb.collection', <collection-name>) .schema(readSchema) .load() # manipulate your streaming data .writeStream .format("console") .trigger(continuous="1 second") .outputMode("append") ) # run the query query = dataStreamWriter.start()
创建一个从 MongoDB 读取数据的
DataStreamReader
对象。通过在使用
DataStreamReader
创建的流式DataFrame
对象上调用writeStream()
方法来创建DataStreamWriter
对象。 使用format()
方法指定格式console
。在
DataStreamWriter
实例上调用start()
方法以开始流。
当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode
将数据流式传输到控制台。
重要
避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。
// create a local SparkSession val spark = SparkSession.builder .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define the schema of the source collection val readSchema = StructType() .add("company_symbol", StringType()) .add("company_name", StringType()) .add("price", DoubleType()) .add("tx_time", TimestampType()) // define a streaming query val dataStreamWriter = spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .schema(readSchema) .load() // manipulate your streaming data .writeStream .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append") // run the query val query = dataStreamWriter.start()
重要
推断变更流的模式
如果将change.stream.publish.full.document.only
选项设置为true
,Spark Connector 将使用扫描文档的模式推断DataFrame
的模式。 如果将该选项设置为false
,则必须指定模式。
有关此设置的更多信息,并查看change stream配置选项的完整列表,请参阅读取配置选项指南。
API 文档
要了解有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档: