Overview
以流式传输模式从 MongoDB 读取数据时,可以配置以下属性。
注意
如果您使用 SparkConf 设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.。
属性名称 | 说明 | ||
|---|---|---|---|
| 必需。 | ||
| 必需。 | ||
| 必需。 | ||
| |||
| 处理与预期模式不匹配的文档时使用的解析策略。 此选项接受以下值:
| ||
| 如果将 | ||
| MongoClientFactory 配置键。您可以指定自定义实施,该实现必须实现 接口。默认值: | ||
| 指定在将数据发送到Spark之前应用集合的自定义聚合管道。该值必须是扩展的JSON单个文档或文档列表。单个文档类似于以下内容: 文档列表类似于以下内容: 自定义聚合管道必须与分区器策略兼容。例如,诸如 | ||
| 指定运行聚合时是否允许存储到磁盘。默认值: | ||
| |||
| 当为 | ||
|
变更流配置
从 MongoDB 读取变更流时,您可以配置以下属性:
属性名称 | 说明 |
|---|---|
| 确定变更流在更新操作中返回的值。 默认设置返回原始文档和更新文档之间的差异。
有关此变更流选项如何工作的更多信息,请参阅 MongoDB 服务器手册指南“查找更新操作的完整文档”。 默认值: "default" |
| Spark Connector将每个微批处理划分为的最大分区数。 Spark Workers 可以并行进程这些分区。此设置仅在使用微批处理流时适用。默认值: 警告:指定大于 |
| 指定发布变更后的文档还是完整的变更流文档。当此设置为
此设置会覆盖 默认: |
| 指定connector在没有可用偏移时的启动方式。 此设置可接受以下值:
|
| 确定是否在变更流输出中包含已修改文档的前像。此设置接受以下值:
默认: |
指定属性,在 connection.uri
如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri 设置中,也可以单独列出。
以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri 设置的一部分:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
为了缩短 connection.uri 并使设置更易于阅读,您可以改为单独指定它们:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection
重要
如果您在 connection.uri 及其自己的行中指定设置,则 connection.uri 设置优先。例如,在以下配置中,连接数据库为 foobar,因为它是 connection.uri 设置中的值:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar
在 collection 属性中指定多个集合
您可以在collection变更流 配置属性中指定多个集合,用逗号分隔集合名称。 不要在集合之间添加空格,除非空格是集合名称的一部分。
指定多个集合,如下例所示:
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
如果集合名称为 "*",或者名称包含逗号或反斜杠 (\),则必须按如下方式对字符进行转义:
如果
collection配置选项中使用的集合名称包含逗号,则 Spark Connector 会将其视为两个不同的集合。 为避免这种情况,必须在逗号前面加上反斜杠 (\) 来对逗号进行转义。 对名为 "my,collection" 的集合进行转义,如下所示:"my\,collection" 如果
collection配置选项中使用的集合名称为“*”,Spark Connector 会将其解释为扫描所有集合的规范。 为避免这种情况,必须在星号前加上反斜杠 (\) 对星号进行转义。 对名为 "*" 的集合进行转义,如下所示:"\*" 如果
collection配置选项中使用的集合名称包含反斜杠 (\),则 Spark Connector 会将反斜杠视为转义字符,这可能会改变它对该值的解释方式。 为避免这种情况,必须在反斜杠前面加上另一个反斜杠来对其进行转义。 对名为 "\collection" 的集合进行转义,如下所示:"\\collection" 注意
string在 中将集合名称指定为Java 文字时,必须使用另一个反斜杠进一步转义每个反斜杠。例如,对名为 "\collection" 的集合进行转义,如下所示:
"\\\\collection"
您可以通过将星号 (*) 作为集合名称的string传递,从数据库中的所有集合进行流式传输。
指定所有集合,如下例所示:
... .option("spark.mongodb.collection", "*")
如果您在从所有集合进行流式传输时创建集合,则新集合会自动包含在流中。
在从多个集合进行流式传输时,您可以随时删除集合。
重要
推断具有多个集合的模式
如果将change.stream.publish.full.document.only选项设置为true ,Spark Connector 将使用扫描文档的模式推断DataFrame的模式。
模式推断发生在流式传输开始时,并且不考虑流式传输期间创建的集合。
从多个集合进行流式传输并推断模式时, connector会按顺序对每个集合进行采样。 来自大量集合的流式传输可能会导致模式推断的性能明显降低。 这种性能影响仅在推断模式时才会发生。