Overview
从 MongoDB 数据库读取流时,MongoDB Spark Connector 支持微批处理和连续处理。 微批处理是默认的处理引擎,可实现低至 100 毫秒的端到端延迟,并具有一次性容错保证。连续处理是 Spark 2.3 版中引入的一项实验性功能,可在保证至少一次的情况下实现低至 1 毫秒的端到端延迟。
要学习;了解有关连续处理的详情,请参阅 Spark文档。
注意
连接器从 MongoDB 部署的变更流中读取数据。如需在变更流上生成更改事件,请对数据库上执行更新操作。
如需了解有关变更流的更多信息,请参阅 MongoDB 手册中的“变更流”部分。
要从MongoDB读取数据,请对SparkSession对象调用 readStream()方法。 此方法会返回一个DataStreamReader对象,您可以使用该对象指定流媒体读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置  | 说明  | 
|---|---|
  | 指定底层输入数据源的格式。使用   | 
  | |
  | 指定输入模式。  | 
以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir一次。将Trigger.Continuous参数传递给trigger()方法可启用连续处理。
import org.apache.spark.sql.streaming.Trigger; Dataset<Row> streamingDataset = <local SparkSession>.readStream()   .format("mongodb")   .load(); DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()   .trigger(Trigger.Continuous("1 second"))   .format("memory")   .option("checkpointLocation", "/tmp/checkpointDir")   .outputMode("append"); StreamingQuery query = dataStreamWriter.start(); 
注意
在您对流媒体查询调用start()方法之前,Spark 不会开始流媒体。
有关方法的完整列表,请参阅Java结构化流参考。
要从 MongoDB 读取数据,请对SparkSession对象调用readStream函数。 此函数返回一个DataStreamReader对象,您可以使用该对象指定流式读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置  | 说明  | 
|---|---|
  | 指定底层输入数据源的格式。使用   | 
  | |
  | 指定输入模式。  | 
以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir一次。将continuous参数传递给trigger()方法可启用连续处理。
streamingDataFrame = (<local SparkSession>.readStream   .format("mongodb")   .load() ) dataStreamWriter = (streamingDataFrame.writeStream   .trigger(continuous="1 second")   .format("memory")   .option("checkpointLocation", "/tmp/checkpointDir")   .outputMode("append") ) query = dataStreamWriter.start() 
注意
在您对流媒体查询调用start()方法之前,Spark 不会开始流媒体。
有关方法的完整列表,请参阅pyspark 结构化流参考。
要从 MongoDB 读取数据,请对SparkSession对象调用readStream方法。 此方法会返回一个DataStreamReader对象,您可以使用该对象指定流媒体读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置  | 说明  | 
|---|---|
  | 指定底层输入数据源的格式。使用   | 
  | |
  | 指定输入模式。  | 
以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir一次。将Trigger.Continuous参数传递给trigger()方法可启用连续处理。
import org.apache.spark.sql.streaming.Trigger val streamingDataFrame = <local SparkSession>.readStream   .format("mongodb")   .load() val dataStreamWriter = streamingDataFrame.writeStream   .trigger(Trigger.Continuous("1 second"))   .format("memory")   .option("checkpointLocation", "/tmp/checkpointDir")   .outputMode("append") val query = dataStreamWriter.start() 
注意
在您对流媒体查询调用start()方法之前,Spark 不会开始流媒体。
有关方法的完整列表,请参阅Scala结构化流参考。
例子
以下示例展示了如何将数据从 MongoDB 流式传输到控制台。
创建一个从 MongoDB 读取数据的
DataStreamReader对象。通过在使用
DataStreamReader创建的流式Dataset对象上调用writeStream()方法来创建DataStreamWriter对象。 使用format()方法指定格式console。在
DataStreamWriter实例上调用start()方法以开始流。
当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode将数据流式传输到控制台。
重要
避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。
// create a local SparkSession SparkSession spark = SparkSession.builder()   .appName("readExample")   .master("spark://spark-master:<port>")   .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")   .getOrCreate(); // define the schema of the source collection StructType readSchema = new StructType()   .add("company_symbol", DataTypes.StringType)   .add("company_name", DataTypes.StringType)   .add("price", DataTypes.DoubleType)   .add("tx_time", DataTypes.TimestampType); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream()   .format("mongodb")   .option("spark.mongodb.connection.uri", "<mongodb-connection-string>")   .option("spark.mongodb.database", "<database-name>")   .option("spark.mongodb.collection", "<collection-name>")   .schema(readSchema)   .load()   // manipulate your streaming data   .writeStream()   .format("console")   .trigger(Trigger.Continuous("1 second"))   .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start(); 
创建一个从 MongoDB 读取数据的
DataStreamReader对象。通过在您使用
DataStreamReader创建的流式DataFrame上调用writeStream()方法来创建DataStreamWriter对象。 使用format()方法指定格式console。在
DataStreamWriter实例上调用start()方法以开始流。
当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode将数据流式传输到控制台。
重要
避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。
# create a local SparkSession spark = SparkSession.builder \   .appName("readExample") \   .master("spark://spark-master:<port>") \   .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \   .getOrCreate() # define the schema of the source collection readSchema = (StructType()   .add('company_symbol', StringType())   .add('company_name', StringType())   .add('price', DoubleType())   .add('tx_time', TimestampType()) ) # define a streaming query dataStreamWriter = (spark.readStream   .format("mongodb")   .option("spark.mongodb.connection.uri", <mongodb-connection-string>)   .option('spark.mongodb.database', <database-name>)   .option('spark.mongodb.collection', <collection-name>)   .schema(readSchema)   .load()   # manipulate your streaming data   .writeStream   .format("console")   .trigger(continuous="1 second")   .outputMode("append") ) # run the query query = dataStreamWriter.start() 
创建一个从 MongoDB 读取数据的
DataStreamReader对象。通过在使用
DataStreamReader创建的流式DataFrame对象上调用writeStream()方法来创建DataStreamWriter对象。 使用format()方法指定格式console。在
DataStreamWriter实例上调用start()方法以开始流。
当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode将数据流式传输到控制台。
重要
避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。
// create a local SparkSession val spark = SparkSession.builder   .appName("readExample")   .master("spark://spark-master:<port>")   .config("spark.jars", "<mongo-spark-connector-JAR-file-name>")   .getOrCreate() // define the schema of the source collection val readSchema = StructType()   .add("company_symbol", StringType())   .add("company_name", StringType())   .add("price", DoubleType())   .add("tx_time", TimestampType()) // define a streaming query val dataStreamWriter = spark.readStream   .format("mongodb")   .option("spark.mongodb.connection.uri", <mongodb-connection-string>)   .option("spark.mongodb.database", <database-name>)   .option("spark.mongodb.collection", <collection-name>)   .schema(readSchema)   .load()   // manipulate your streaming data   .writeStream   .format("console")   .trigger(Trigger.Continuous("1 second"))   .outputMode("append") // run the query val query = dataStreamWriter.start() 
重要
推断变更流的模式
如果将change.stream.publish.full.document.only选项设置为true ,Spark Connector 将使用扫描文档的模式推断DataFrame的模式。 如果将该选项设置为false ,则必须指定模式。
有关此设置的更多信息,并查看change stream配置选项的完整列表,请参阅读取配置选项指南。
API 文档
要了解有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档: