Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs 菜单
Docs 主页
/
MongoDB Spark Connector
/ /

批量读取配置选项

以批处理模式从 MongoDB 读取数据时,可以配置以下属性。

注意

如果您使用 SparkConf 设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.

属性名称
说明

connection.uri

Required.
The connection string configuration key.

Default: mongodb://localhost:27017/

database

Required.
The database name configuration.

collection

Required.
The collection name configuration.

comment

The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None

mongoClientFactory

MongoClientFactory configuration key.
You can specify a custom implementation which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

partitioner

The partitioner full class name.
You can specify a custom implementation that must implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface.
See the Partitioner Configuration section for more information about partitioners.

Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner

partitioner.options.

Partitioner configuration prefix.
See the Partitioner Configuration section for more information about partitioners.

sampleSize

The number of documents to sample from the collection when inferring
the schema.

Default: 1000

sql.inferSchema.mapTypes.enabled

Whether to enable Map types when inferring the schema.
When enabled, large compatible struct types are inferred to a MapType instead.

Default: true

sql.inferSchema.mapTypes.minimum.key.size

Minimum size of a StructType before inferring as a MapType.

Default: 250

aggregation.pipeline

Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

重要提示:自定义聚合管道必须与分区器策略兼容。示例,$group 等聚合阶段不适用于任何创建多个分区的分区器。

aggregation.allowDiskUse

Specifies whether to allow storage to disk when running the aggregation.

Default: true

outputExtendedJson

When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false

分区器更改使用 Spark Connector 的批量读取的读取行为。通过将数据划分为多个分区,您可以并行运行转换。

本部分包含以下分区器的配置信息:

注意

仅批量读取

由于数据流处理引擎生成单个数据流,因此分区器不会影响流式读取。

SamplePartitioner 是默认的分区器配置。此配置允许您指定分区字段、分区大小和每个分区的样本数。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner

属性名称
说明

partitioner.options.partition.field

用于分区的字段,必须是唯一的字段。

默认: _id

partitioner.options.partition.size

每个分区的大小(以 MB 为单位)。分区大小越小,创建的分区越多,包含的文档越少。

默认: 64

partitioner.options.samples.per.partition

每个分区要采集的样本数。采集的样本总数为:

samples per partition * ( count / number of documents per partition)

默认: 10

例子

对于包含 640 个文档且平均文档大小为 0.5 MB 的集合,默认 SamplePartitioner 配置会创建 5 个分区,每个分区包含 128 个文档。

Spark Connector 对 50 个文档进行采样(每个预期分区默认为 10 个),并通过从采样文档中选择分区字段范围来定义 5 个分区。

ShardedPartitioner 配置会根据您的分片配置自动对数据进行分区。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner

重要

ShardedPartitioner 限制

  1. 在 MongoDB Server v 6.0及更高版本中,分片操作会创建一个较大的初始数据段来覆盖所有分片键值,从而导致分片分区器效率低下。 我们不建议在连接到 MongoDB v 6.0及更高版本时使用分片分区器。

  2. 分片分区器与哈希分片键不兼容。

PaginateBySizePartitioner 配置通过使用平均文档大小将集合拆分为平均大小的数据块来对数据进行分页。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner

属性名称
说明

partitioner.options.partition.field

用于分区的字段,必须是唯一的字段。

默认: _id

partitioner.options.partition.size

每个分区的大小(以 MB 为单位)。分区大小越小,

创建的分区越多,包含的文档越少。

默认: 64

PaginateIntoPartitionsPartitioner 配置通过将集合中的文档计数除以允许的最大分区数来对数据进行分页。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner

属性名称
说明

partitioner.options.partition.field

用于分区的字段,必须是唯一的字段。

默认: _id

partitioner.options.max.number.of.partitions

要创建的分区数量。

默认: 64

SinglePartitionPartitioner 配置创建单个分区。

要使用此配置,请将 partitioner 配置选项设置为 com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner

如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri 设置中,也可以单独列出。

以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri 设置的一部分:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

为了缩短 connection.uri 并使设置更易于阅读,您可以改为单独指定它们:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

重要

如果您在 connection.uri 及其自己的行中指定设置,则 connection.uri 设置优先。例如,在以下配置中,连接数据库为 foobar,因为它是 connection.uri 设置中的值:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

后退

读取

在此页面上