Overview
以批处理模式从 MongoDB 读取数据时,可以配置以下属性。
注意
如果您使用 SparkConf 设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.。
属性名称 | 说明 | ||
|---|---|---|---|
| 必需。 | ||
| 必需。 | ||
| 必需。 | ||
| |||
| 处理与预期模式不匹配的文档时使用的解析策略。 此选项接受以下值:
| ||
| 如果将 | ||
| MongoClientFactory 配置键。您可以指定必须实现 接口的自定义实施。默认值: | ||
| 分区器完整类名。 您可以指定必须实现 | ||
| 分区器配置前缀。 | ||
|
| ||
| 在推断模式时是否启用Map 类型。启用后,大型兼容结构类型将被推断为 | ||
|
| ||
| 指定在将数据发送到Spark之前应用集合的自定义聚合管道。该值必须是扩展的JSON单个文档或文档列表。单个文档类似于以下内容: 文档列表类似于以下内容: 重要提示:自定义聚合管道必须与分区器策略兼容。示例, | ||
| 指定运行聚合时是否允许存储到磁盘。默认值: | ||
| 当为 | ||
|
分区器配置
分区器更改使用 Spark Connector 的批量读取的读取行为。通过将数据划分为多个分区,您可以并行运行转换。
本部分包含以下分区器的配置信息:
注意
仅批量读取
由于数据流处理引擎生成单个数据流,因此分区器不会影响流式读取。
AutoBucketPartitioner 配置(默认)
AutoBucketPartitioner 是默认分区器配置。它对数据进行采样以生成分区,并使用$bucketAuto聚合阶段进行分页。通过使用此配置,您可以跨单个或多个字段(包括嵌套字段)对数据进行分区。
注意
复合键
AutoBucketPartitioner 配置需要MongoDB Server 7.0 或更高版本才能支持复合键。
要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner。
属性名称 | 说明 |
|---|---|
| 用于分区的字段列表。 该值可以是单个字段名称,也可以是逗号分隔字段的列表。 默认: |
| 每个分区的平均大小 (MB)。 较小的分区会创建更多包含较少文档的分区。 由于此配置使用平均文档大小来确定每个分区的文档数量,因此分区的大小可能不同。 默认: |
| 每个分区要采集的样本数。 默认: |
| 用于投影字段的字段名称,该投影字段包含用于对集合进行分区的所有字段。 建议仅当每个文档已包含 默认: |
SamplePartitioner 配置
SamplePartitioner 配置与 AutoBucketPartitioner 配置类似,但不使用 $bucketAuto聚合阶段。此配置允许您指定分区字段、分区大小和每个分区的样本数。
要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner。
属性名称 | 说明 | |
|---|---|---|
| 用于分区的字段,必须是唯一的字段。 默认: | |
| 每个分区的大小(以 MB 为单位)。分区大小越小,创建的分区越多,包含的文档越少。 默认: | |
| 每个分区要采集的样本数。采集的样本总数为: 默认: |
例子
对于包含 640 个文档且平均文档大小为 0.5 MB 的集合,默认 SamplePartitioner 配置会创建 5 个分区,每个分区包含 128 个文档。
Spark Connector 对 50 个文档进行采样(每个预期分区默认为 10 个),并通过从采样文档中选择分区字段范围来定义 5 个分区。
ShardedPartitioner 配置
ShardedPartitioner 配置会根据您的分片配置自动对数据进行分区。
要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner。
重要
ShardedPartitioner 限制
在 MongoDB Server v 6.0及更高版本中,分片操作会创建一个较大的初始数据段来覆盖所有分片键值,从而导致分片分区器效率低下。 我们不建议在连接到 MongoDB v 6.0及更高版本时使用分片分区器。
分片分区器与哈希分片键不兼容。
PaginateBySizePartitioner 配置
PaginateBySizePartitioner 配置通过使用平均文档大小将集合拆分为平均大小的数据块来对数据进行分页。
要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner。
属性名称 | 说明 |
|---|---|
| 用于分区的字段,必须是唯一的字段。 默认: |
| 每个分区的大小(以 MB 为单位)。分区大小越小, 创建的分区越多,包含的文档越少。 默认: |
PaginateIntoPartitionsPartitioner Configuration
PaginateIntoPartitionsPartitioner 配置通过将集合中的文档计数除以允许的最大分区数来对数据进行分页。
要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner。
属性名称 | 说明 |
|---|---|
| 用于分区的字段,必须是唯一的字段。 默认: |
| 要创建的分区数量。 默认: |
SinglePartitionPartitioner 配置
SinglePartitionPartitioner 配置创建单个分区。
要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner。
在 connection.uri 中指定属性
如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri 设置中,也可以单独列出。
以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri 设置的一部分:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
为了缩短 connection.uri 并使设置更易于阅读,您可以改为单独指定它们:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection
重要
如果您在 connection.uri 及其自己的行中指定设置,则 connection.uri 设置优先。例如,在以下配置中,连接数据库为 foobar,因为它是 connection.uri 设置中的值:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar