Overview
要从 MongoDB 读取数据,请对 SparkSession
对象调用read()
方法。此方法会返回一个 DataFrameReader
对象,您可以使用该对象指定批量读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
| 指定底层输入数据源的格式。使用 |
|
以下代码示例演示了如何利用先前的配置设置,从 MongoDB 的 people.contacts
中读取数据:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
提示
DataFrame 类型
DataFrame
在Java API中不作为类存在。 使用Dataset<Row>
引用 DataFrame。
要从 MongoDB 读取数据,请调用 SparkSession
对象上的 read
函数。此函数会返回一个 DataFrameReader
对象,您可以使用该对象指定批处理读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
| 指定底层输入数据源的格式。使用 |
|
以下代码示例演示了如何利用先前的配置设置,从 MongoDB 的 people.contacts
中读取数据:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
要从 MongoDB 读取数据,请调用 SparkSession
对象上的 read
方法。此方法会返回一个 DataFrameReader
对象,您可以使用该对象指定批处理读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
| 指定底层输入数据源的格式。使用 |
|
以下代码示例演示了如何利用先前的配置设置,从 MongoDB 的 people.contacts
中读取数据:
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
提示
DataFrame 类型
一个 DataFrame 由 Row
对象中的一个 Dataset
表示。DataFrame
类型是Dataset[Row]
的别名。
模式推断
当您加载没有模式的数据集或 DataFrame 时,Spark 会对记录进行采样以推断集合的模式。
假设 MongoDB 集合 people.contacts
包含以下文档:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
以下操作从 people.contacts
加载数据并推断 DataFrame 的模式:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
若要查看推断的模式,则在 Dataset<Row>
对象上使用 printSchema()
方法,如以下示例所示:
dataFrame.printSchema();
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
要查看 DataFrame 中的数据,则在 DataFrame
对象上使用 show()
方法,如以下示例所示:
dataFrame.show();
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
当您加载没有模式的数据集或 DataFrame 时,Spark 会对记录进行采样以推断集合的模式。
假设 MongoDB 集合 people.contacts
包含以下文档:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
以下操作从 people.contacts
加载数据并推断 DataFrame 的模式:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
若要查看推断的模式,请在 DataFrame
对象上使用 printSchema()
方法,如以下示例所示:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
要查看 DataFrame 中的数据,请对 DataFrame
对象使用 show()
函数,如下例所示:
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
当您加载没有模式的数据集或 DataFrame 时,Spark 会对记录进行采样以推断集合的模式。
假设 MongoDB 集合 people.contacts
包含以下文档:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
以下操作从 people.contacts
加载数据并推断 DataFrame 的模式:
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
若要查看推断的模式,则在 DataFrame
对象上使用 printSchema()
方法,如以下示例所示:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
要查看 DataFrame 中的数据,则在 DataFrame
对象上使用 show()
方法,如以下示例所示:
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
筛选器
在 DataFrame 或数据集中使用过滤器时,底层的 MongoDB Connector 代码会构建聚合管道,以在将 MongoDB 中的数据发送到 Spark 之前对其进行过滤。这通过仅检索和处理所需数据来提高 Spark 性能。
MongoDB Spark Connector 将以下筛选器转变为聚合管道阶段:
和
EqualNullSafe
EqualTo
大于
GreaterThanOrEqual
登录
IsNull
小于
LessThanOrEqual
not
或
StringContains
StringEndsWith
StringStartsWith
使用 filter()
从 MongoDB 集合中读取数据子集。
考虑包含以下文档的集合 fruit
:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
首先,设置 DataFrame
对象来连接默认的 MongoDB 数据源:
df = spark.read.format("mongodb").load()
以下示例仅包含 qty
字段大于或等于 10
的记录。
df.filter(df['qty'] >= 10).show()
该操作将打印以下输出:
+---+----+------+ |_id| qty| type| +---+----+------+ |2.0|10.0|orange| |3.0|15.0|banana| +---+----+------+
在 DataFrame 或数据集中使用过滤器时,底层的 MongoDB Connector 代码会构建聚合管道,以在将 MongoDB 中的数据发送到 Spark 之前对其进行过滤。这通过仅检索和处理所需数据来提高 Spark 性能。
MongoDB Spark Connector 将以下筛选器转变为聚合管道阶段:
和
EqualNullSafe
EqualTo
大于
GreaterThanOrEqual
登录
IsNull
小于
LessThanOrEqual
not
或
StringContains
StringEndsWith
StringStartsWith
以下示例筛选并输出年龄低于100的字符:
df.filter(df("age") < 100).show()
该操作输出以下内容:
+--------------------+---+-------------+ | _id|age| name| +--------------------+---+-------------+ |[5755d7b4566878c9...| 50|Bilbo Baggins| |[5755d7b4566878c9...| 82| Fíli| |[5755d7b4566878c9...| 77| Kíli| +--------------------+---+-------------+
SQL 查询
在数据集上运行 SQL 查询之前,必须为数据集注册临时视图。
以下操作注册 characters
表,然后进行查询以查找所有大于或等于 100 的字符:
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show()
输出以下内容:
+-------+----+ | name| age| +-------+----+ |Gandalf|1000| | Thorin| 195| | Balin| 178| | Dwalin| 169| | Óin| 167| | Glóin| 158| +-------+----+
在对 DataFrame 运行 SQL 查询之前,您需要注册一个临时表。
以下示例注册一个名为 temp
的临时表,然后使用 SQL 查询 type
字段包含字母 e
的记录:
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
在 pyspark
Shell 中,该操作将打印以下输出:
+------+----+ | type| qty| +------+----+ | apple| 5.0| |orange|10.0| +------+----+
在数据集上运行 SQL 查询之前,必须为数据集注册临时视图。
以下操作注册 characters
表,然后进行查询以查找所有大于或等于 100 的字符:
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
API 文档
要了解有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档: