对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Docs 菜单

批量读取配置选项

以批处理模式从 MongoDB 读取数据时,可以配置以下属性。

注意

如果您使用 SparkConf 设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.

属性名称
说明

connection.uri

必需。
连接字符串配置键。默认值:

mongodb://localhost:27017/

database

必需。
数据库名称配置。

collection

必需。
集合名称配置。

comment

要附加到读取操作的注释。注释显示在数据库分析器的输出中。默认值:无

mode

处理与预期模式不匹配的文档时使用的解析策略。 此选项接受以下值:

  • ReadConfig.ParseMode.FAILFAST:在解析与模式不匹配的文档时引发异常。

  • ReadConfig.ParseMode.PERMISSIVE:当数据类型与模式不匹配时,将字段设置为null 。 要将每个无效文档存储为扩展JSON string ,请将此值与 columnNameOfCorruptRecord 选项结合使用。

  • ReadConfig.ParseMode.DROPMALFORMED:忽略任何与模式不匹配的文档。


默认: ReadConfig.ParseMode.FAILFAST

columnNameOfCorruptRecord

如果将mode 选项设立为ReadConfig.ParseMode.PERMISSIVE ,则此选项会指定将无效文档存储为扩展JSON 的新列的名称。如果使用的是显式模式,则必须包含新列的名称。如果您使用的是推断模式, Spark Connector会将新列添加到模式的末尾。默认值:无

mongoClientFactory

MongoClientFactory 配置键。您可以指定必须实现 接口的自定义实施。默认值:
com.mongodb.spark.sql.connector.connection.MongoClientFactory

com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

partitioner

分区器完整类名。

您可以指定必须实现 com.mongodb.spark.sql.connector.read.partitioner.Partitioner 接口的自定义实现。
有关分区器的更多信息,请参阅 分区器配置 部分。

默认值: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner

partitioner.options.

分区器配置前缀。
有关分区器的更多信息,请参阅分区器配置部分。

sampleSize


推断模式时从集合中示例的文档数。默认值:

1000

sql.inferSchema.mapTypes.enabled

在推断模式时是否启用Map 类型。启用后,大型兼容结构类型将被推断为
MapType

。默认值:true

sql.inferSchema.mapTypes.minimum.key.size

StructType在推断为MapType 之前, 的最小大小。默认值:

250

aggregation.pipeline

指定在将数据发送到Spark之前应用集合的自定义聚合管道。该值必须是扩展的JSON单个文档或文档列表。单个文档类似于以下内容:

{"$match": {"closed": false}}

文档列表类似于以下内容:

[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

重要提示:自定义聚合管道必须与分区器策略兼容。示例,$group 等聚合阶段不适用于任何创建多个分区的分区器。

aggregation.allowDiskUse

指定运行聚合时是否允许存储到磁盘。默认值:

true

outputExtendedJson

当为true 时,Connector会将Spark不支持的BSON 类型转换为扩展JSON字符串。当为false 时,Connector对不支持的类型使用原始的宽松JSON格式。默认值:

false

schemaHints

指定在推断集合的模式时使用的已知字段类型的部分模式。要学习;了解有关 选项的更多信息,请参阅使用模式提示指定已知字段部分。默认值:无schemaHints

分区器更改使用 Spark Connector 的批量读取的读取行为。通过将数据划分为多个分区,您可以并行运行转换。

本部分包含以下分区器的配置信息:

注意

仅批量读取

由于数据流处理引擎生成单个数据流,因此分区器不会影响流式读取。

AutoBucketPartitioner 是默认分区器配置。它对数据进行采样以生成分区,并使用$bucketAuto聚合阶段进行分页。通过使用此配置,您可以跨单个或多个字段(包括嵌套字段)对数据进行分区。

注意

复合键

AutoBucketPartitioner 配置需要MongoDB Server 7.0 或更高版本才能支持复合键。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner

属性名称
说明

partitioner.options.fieldList

用于分区的字段列表。 该值可以是单个字段名称,也可以是逗号分隔字段的列表。

默认: _id

partitioner.options.chunkSize

每个分区的平均大小 (MB)。 较小的分区会创建更多包含较少文档的分区。 由于此配置使用平均文档大小来确定每个分区的文档数量,因此分区的大小可能不同。

默认: 64

partitioner.options.samplesPerPartition

每个分区要采集的样本数。

默认: 100

partitioner.options.partitionKeyProjectionField

用于投影字段的字段名称,该投影字段包含用于对集合进行分区的所有字段。 建议仅当每个文档已包含__idx字段时才更改此属性的值。

默认: __idx

SamplePartitioner 配置与 AutoBucketPartitioner 配置类似,但不使用 $bucketAuto聚合阶段。此配置允许您指定分区字段、分区大小和每个分区的样本数。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner

属性名称
说明

partitioner.options.partition.field

用于分区的字段,必须是唯一的字段。

默认: _id

partitioner.options.partition.size

每个分区的大小(以 MB 为单位)。分区大小越小,创建的分区越多,包含的文档越少。

默认: 64

partitioner.options.samples.per.partition

每个分区要采集的样本数。采集的样本总数为:

samples per partition * ( count / number of documents per partition)

默认: 10

例子

对于包含 640 个文档且平均文档大小为 0.5 MB 的集合,默认 SamplePartitioner 配置会创建 5 个分区,每个分区包含 128 个文档。

Spark Connector 对 50 个文档进行采样(每个预期分区默认为 10 个),并通过从采样文档中选择分区字段范围来定义 5 个分区。

ShardedPartitioner 配置会根据您的分片配置自动对数据进行分区。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner

重要

ShardedPartitioner 限制

  1. 在 MongoDB Server v 6.0及更高版本中,分片操作会创建一个较大的初始数据段来覆盖所有分片键值,从而导致分片分区器效率低下。 我们不建议在连接到 MongoDB v 6.0及更高版本时使用分片分区器。

  2. 分片分区器与哈希分片键不兼容。

PaginateBySizePartitioner 配置通过使用平均文档大小将集合拆分为平均大小的数据块来对数据进行分页。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner

属性名称
说明

partitioner.options.partition.field

用于分区的字段,必须是唯一的字段。

默认: _id

partitioner.options.partition.size

每个分区的大小(以 MB 为单位)。分区大小越小,

创建的分区越多,包含的文档越少。

默认: 64

PaginateIntoPartitionsPartitioner 配置通过将集合中的文档计数除以允许的最大分区数来对数据进行分页。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner

属性名称
说明

partitioner.options.partition.field

用于分区的字段,必须是唯一的字段。

默认: _id

partitioner.options.max.number.of.partitions

要创建的分区数量。

默认: 64

SinglePartitionPartitioner 配置创建单个分区。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri 设置中,也可以单独列出。

以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri 设置的一部分:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

为了缩短 connection.uri 并使设置更易于阅读,您可以改为单独指定它们:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection

重要

如果您在 connection.uri 及其自己的行中指定设置,则 connection.uri 设置优先。例如,在以下配置中,连接数据库为 foobar,因为它是 connection.uri 设置中的值:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar