以批处理模式从 MongoDB 读取
Overview
要从 MongoDB 读取数据,请对 SparkSession
对象调用read()
方法。此方法会返回一个 DataFrameReader
对象,您可以使用该对象指定批量读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
dataFrame.read.format() | 指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。 |
dataFrame.read.option() |
以下代码示例演示了如何利用先前的配置设置,从 MongoDB 的 people.contacts
中读取数据:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
提示
DataFrame 类型
DataFrame
在Java API中不作为类存在。 使用Dataset<Row>
引用 DataFrame。
要从 MongoDB 读取数据,请调用 SparkSession
对象上的 read
函数。此函数会返回一个 DataFrameReader
对象,您可以使用该对象指定批处理读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
dataFrame.read.format() | 指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。 |
dataFrame.read.option() |
以下代码示例演示了如何利用先前的配置设置,从 MongoDB 的 people.contacts
中读取数据:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
要从 MongoDB 读取数据,请调用 SparkSession
对象上的 read
方法。此方法会返回一个 DataFrameReader
对象,您可以使用该对象指定批处理读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
dataFrame.read.format() | 指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。 |
dataFrame.read.option() |
以下代码示例演示了如何利用先前的配置设置,从 MongoDB 的 people.contacts
中读取数据:
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
提示
DataFrame 类型
一个 DataFrame 由 Row
对象中的一个 Dataset
表示。DataFrame
类型是Dataset[Row]
的别名。
模式推断
当您加载没有模式的数据集或 DataFrame 时,Spark 会对记录进行采样以推断集合的模式。
假设 MongoDB 集合 people.contacts
包含以下文档:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
以下操作从 people.contacts
加载数据并推断 DataFrame 的模式:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
若要查看推断的模式,则在 Dataset<Row>
对象上使用 printSchema()
方法,如以下示例所示:
dataFrame.printSchema();
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
要查看 DataFrame 中的数据,则在 DataFrame
对象上使用 show()
方法,如以下示例所示:
dataFrame.show();
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
当您加载没有模式的数据集或 DataFrame 时,Spark 会对记录进行采样以推断集合的模式。
假设 MongoDB 集合 people.contacts
包含以下文档:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
以下操作从 people.contacts
加载数据并推断 DataFrame 的模式:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
若要查看推断的模式,请在 DataFrame
对象上使用 printSchema()
方法,如以下示例所示:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
要查看 DataFrame 中的数据,请对 DataFrame
对象使用 show()
函数,如下例所示:
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
当您加载没有模式的数据集或 DataFrame 时,Spark 会对记录进行采样以推断集合的模式。
假设 MongoDB 集合 people.contacts
包含以下文档:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
以下操作从 people.contacts
加载数据并推断 DataFrame 的模式:
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
若要查看推断的模式,则在 DataFrame
对象上使用 printSchema()
方法,如以下示例所示:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
要查看 DataFrame 中的数据,则在 DataFrame
对象上使用 show()
方法,如以下示例所示:
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
使用模式提示指定已知字段
您可以通过指定schemaHint
配置选项来指定要在模式推断期间使用的包含已知字段值的模式。 您可以使用以下任何Spark格式指定schemaHint
选项:
类型 | format | |||
---|---|---|---|---|
DDL | <field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE> | |||
SQL DDL | STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE> | |||
JSON |
|
以下示例演示如何使用Spark shell以每种格式指定 schemaHint
选项。 该示例指定了一个名为"value"
的字符串值字段和一个名为"count"
的整数值字段。
import org.apache.spark.sql.types._ val mySchema = StructType(Seq( StructField("value", StringType), StructField("count", IntegerType)) // Generate DDL format mySchema.toDDL // Generate SQL DDL format mySchema.sql // Generate Simple String DDL format mySchema.simpleString // Generate JSON format mySchema.json
您还可以使用 PySpark 以简单string DDL 格式或JSON格式指定 schemaHint
选项,如以下示例所示:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType mySchema = StructType([ StructField('value', StringType(), True), StructField('count', IntegerType(), True)]) # Generate Simple String DDL format mySchema.simpleString() # Generate JSON format mySchema.json()
筛选器
在 DataFrame 或数据集中使用过滤器时,底层的 MongoDB Connector 代码会构建聚合管道,以在将 MongoDB 中的数据发送到 Spark 之前对其进行过滤。这通过仅检索和处理所需数据来提高 Spark 性能。
MongoDB Spark Connector 将以下筛选器转变为聚合管道阶段:
和
EqualNullSafe
EqualTo
大于
GreaterThanOrEqual
登录
IsNull
小于
LessThanOrEqual
not
或
StringContains
StringEndsWith
StringStartsWith
使用 filter()
从 MongoDB 集合中读取数据子集。
考虑包含以下文档的集合 fruit
:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
首先,设置 DataFrame
对象来连接默认的 MongoDB 数据源:
df = spark.read.format("mongodb").load()
以下示例仅包含 qty
字段大于或等于 10
的记录。
df.filter(df['qty'] >= 10).show()
该操作将打印以下输出:
+---+----+------+ |_id| qty| type| +---+----+------+ |2.0|10.0|orange| |3.0|15.0|banana| +---+----+------+
在 DataFrame 或数据集中使用过滤器时,底层的 MongoDB Connector 代码会构建聚合管道,以在将 MongoDB 中的数据发送到 Spark 之前对其进行过滤。这通过仅检索和处理所需数据来提高 Spark 性能。
MongoDB Spark Connector 将以下筛选器转变为聚合管道阶段:
和
EqualNullSafe
EqualTo
大于
GreaterThanOrEqual
登录
IsNull
小于
LessThanOrEqual
not
或
StringContains
StringEndsWith
StringStartsWith
以下示例筛选并输出年龄低于100的字符:
df.filter(df("age") < 100).show()
该操作输出以下内容:
+--------------------+---+-------------+ | _id|age| name| +--------------------+---+-------------+ |[5755d7b4566878c9...| 50|Bilbo Baggins| |[5755d7b4566878c9...| 82| Fíli| |[5755d7b4566878c9...| 77| Kíli| +--------------------+---+-------------+
SQL 查询
在数据集上运行 SQL 查询之前,必须为数据集注册临时视图。
以下操作注册 characters
表,然后进行查询以查找所有大于或等于 100 的字符:
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show()
输出以下内容:
+-------+----+ | name| age| +-------+----+ |Gandalf|1000| | Thorin| 195| | Balin| 178| | Dwalin| 169| | Óin| 167| | Glóin| 158| +-------+----+
在对 DataFrame 运行 SQL 查询之前,您需要注册一个临时表。
以下示例注册一个名为 temp
的临时表,然后使用 SQL 查询 type
字段包含字母 e
的记录:
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
在 pyspark
Shell 中,该操作将打印以下输出:
+------+----+ | type| qty| +------+----+ | apple| 5.0| |orange|10.0| +------+----+
在数据集上运行 SQL 查询之前,必须为数据集注册临时视图。
以下操作注册 characters
表,然后进行查询以查找所有大于或等于 100 的字符:
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
API 文档
要了解有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档: