对于 AI 代理:可在 https://www.mongodb.com/zh-cn/docs/llms.txt 获取文档索引—通过在任何 URL 路径后添加 .md 可获取所有页面的 Markdown 版本。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs 菜单

流式读取配置选项

以流式传输模式从 MongoDB 读取数据时,可以配置以下属性。

注意

如果您使用 SparkConf 设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.

属性名称
说明

connection.uri

必需。
连接字符串配置键。默认值:

mongodb://localhost:27017/

database

必需。
数据库名称配置。

collection

必需。
集合名称配置。您可以指定多个集合,用逗号分隔集合名称。要学习;了解有关指定多个集合的更多信息,请参阅在


collection属性中指定多个集合。

comment

要附加到读取操作的注释。注释显示在数据库分析器的输出中。默认值:无

mode

处理与预期模式不匹配的文档时使用的解析策略。 此选项接受以下值:

  • ReadConfig.ParseMode.FAILFAST:在解析与模式不匹配的文档时引发异常。

  • ReadConfig.ParseMode.PERMISSIVE:当数据类型与模式不匹配时,将字段设置为null 。 要将每个无效文档存储为扩展JSON string ,请将此值与 columnNameOfCorruptRecord 选项结合使用。

  • ReadConfig.ParseMode.DROPMALFORMED:忽略任何与模式不匹配的文档。


默认: ReadConfig.ParseMode.FAILFAST

columnNameOfCorruptRecord

如果将mode 选项设立为ReadConfig.ParseMode.PERMISSIVE ,则此选项会指定将无效文档存储为扩展JSON 的新列的名称。如果使用的是显式模式,则必须包含新列的名称。如果您使用的是推断模式, Spark Connector会将新列添加到模式的末尾。默认值:无

mongoClientFactory

MongoClientFactory 配置键。您可以指定自定义实施,该实现必须实现 接口。默认值:
com.mongodb.spark.sql.connector.connection.MongoClientFactory

com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

aggregation.pipeline

指定在将数据发送到Spark之前应用集合的自定义聚合管道。该值必须是扩展的JSON单个文档或文档列表。单个文档类似于以下内容:

{"$match": {"closed": false}}

文档列表类似于以下内容:

[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

自定义聚合管道必须与分区器策略兼容。例如,诸如 $group 之类的聚合阶段不适用于创建多个分区的任何分区器。

aggregation.allowDiskUse

指定运行聚合时是否允许存储到磁盘。默认值:

true

change.stream.

更改流配置前缀。有关变更流的更多信息,请参阅变更流配置部分。

outputExtendedJson

当为true 时,Connector会将Spark不支持的BSON 类型转换为扩展JSON字符串。当为false 时,Connector对不支持的类型使用原始的宽松JSON格式。默认值:

false

schemaHints

指定在推断集合的模式时使用的已知字段类型的部分模式。要学习;了解有关 选项的更多信息,请参阅使用模式提示指定已知字段部分。默认值:无schemaHints

从 MongoDB 读取变更流时,您可以配置以下属性:

属性名称
说明

change.stream.lookup.full.document

确定变更流在更新操作中返回的值。

默认设置返回原始文档和更新文档之间的差异。

updateLookup设置还返回原始文档和更新文档之间的差异,但它也包括整个更新文档的副本。

有关此变更流选项如何工作的更多信息,请参阅 MongoDB 服务器手册指南“查找更新操作的完整文档”。

默认值: "default"

change.stream.micro.batch.max.partition.count

Spark Connector将每个微批处理划分为的最大分区数。 Spark Workers 可以并行进程这些分区。此设置仅在使用微批处理流时适用。默认值:



1

警告:指定大于1的值可能会改变Spark Connector处理更改事件的顺序。 如果无序处理可能会导致下游数据不一致,请避免此设置。

change.stream.publish.full.document.only

指定发布变更后的文档还是完整的变更流文档。当此设置为

false时,您必须指定模式。该模式必须包括要从变更流中读取的所有字段。您可以使用可选字段来确保模式对所有变更流事件都有效。当此设置为

true时,Connector会表现出以下行为:

  • connector会过滤掉省略 fullDocument 字段的消息,并仅发布该字段的值。

  • 如果不指定模式, connector将从变更流文档推断模式。

此设置会覆盖change.stream.lookup.full.document设置。

默认false

change.stream.startup.mode

指定connector在没有可用偏移时的启动方式。

此设置可接受以下值:

  • latest: connector从最新事件开始处理变更事件。 它不会处理任何早期未处理的事件。

  • timestamp: connector在指定时间开始处理变更事件。

    要使用timestamp选项,必须使用change.stream.startup.mode.timestamp.start.at.operation.time设置指定时间。 此设置接受以下格式的时间戳:

    默认latest

change.stream.lookup.full.document.before.change

确定是否在变更流输出中包含已修改文档的前像。此设置接受以下值:

  • default:此选项相当于 off 值。

  • off:在变更流输出中不包括已修改文档的前像。

  • whenAvailable:当前像可用时,在变更流输出中包含已修改文档的前像。

  • required:在变更流输出中包含已修改文档的前像。如果修改后的文档没有可用的前像,则Connector会引发错误。

默认: default

如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri 设置中,也可以单独列出。

以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri 设置的一部分:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

为了缩短 connection.uri 并使设置更易于阅读,您可以改为单独指定它们:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection

重要

如果您在 connection.uri 及其自己的行中指定设置,则 connection.uri 设置优先。例如,在以下配置中,连接数据库为 foobar,因为它是 connection.uri 设置中的值:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

您可以在collection变更流 配置属性中指定多个集合,用逗号分隔集合名称。 不要在集合之间添加空格,除非空格是集合名称的一部分。

指定多个集合,如下例所示:

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

如果集合名称为 "*",或者名称包含逗号或反斜杠 (\),则必须按如下方式对字符进行转义:

  • 如果collection配置选项中使用的集合名称包含逗号,则 Spark Connector 会将其视为两个不同的集合。 为避免这种情况,必须在逗号前面加上反斜杠 (\) 来对逗号进行转义。 对名为 "my,collection" 的集合进行转义,如下所示:

    "my\,collection"
  • 如果collection配置选项中使用的集合名称为“*”,Spark Connector 会将其解释为扫描所有集合的规范。 为避免这种情况,必须在星号前加上反斜杠 (\) 对星号进行转义。 对名为 "*" 的集合进行转义,如下所示:

    "\*"
  • 如果collection配置选项中使用的集合名称包含反斜杠 (\),则 Spark Connector 会将反斜杠视为转义字符,这可能会改变它对该值的解释方式。 为避免这种情况,必须在反斜杠前面加上另一个反斜杠来对其进行转义。 对名为 "\collection" 的集合进行转义,如下所示:

    "\\collection"

    注意

    string在 中将集合名称指定为Java 文字时,必须使用另一个反斜杠进一步转义每个反斜杠。例如,对名为 "\collection" 的集合进行转义,如下所示:

    "\\\\collection"

您可以通过将星号 (*) 作为集合名称的string传递,从数据库中的所有集合进行流式传输。

指定所有集合,如下例所示:

...
.option("spark.mongodb.collection", "*")

如果您在从所有集合进行流式传输时创建集合,则新集合会自动包含在流中。

在从多个集合进行流式传输时,您可以随时删除集合。

重要

推断具有多个集合的模式

如果将change.stream.publish.full.document.only选项设置为true ,Spark Connector 将使用扫描文档的模式推断DataFrame的模式。

模式推断发生在流式传输开始时,并且不考虑流式传输期间创建的集合。

从多个集合进行流式传输并推断模式时, connector会按顺序对每个集合进行采样。 来自大量集合的流式传输可能会导致模式推断的性能明显降低。 这种性能影响仅在推断模式时才会发生。