Overview
以流式传输模式从 MongoDB 读取数据时,可以配置以下属性。
注意
如果您使用 SparkConf
设置连接器的读取配置,请为每个属性添加前缀 spark.mongodb.read.
。
属性名称 | 说明 | ||
---|---|---|---|
| Required. The connection string configuration key. Default: mongodb://localhost:27017/ | ||
| Required. The database name configuration. | ||
| Required. The collection name configuration. You can specify multiple collections by separating the collection names
with a comma. To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property. | ||
| The comment to append to the read operation. Comments appear in the
output of the Database Profiler. Default: None | ||
| MongoClientFactory configuration key. You can specify a custom implementation, which must implement the
com.mongodb.spark.sql.connector.connection.MongoClientFactory
interface.Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
| Specifies a custom aggregation pipeline to apply to the collection
before sending data to Spark. The value must be either an extended JSON single document or list
of documents. A single document resembles the following:
A list of documents resembles the following:
自定义聚合管道必须与分区器策略兼容。例如,诸如 | ||
| Specifies whether to allow storage to disk when running the
aggregation. Default: true | ||
| Change stream configuration prefix. See the
Change Stream Configuration section for more
information about change streams. | ||
| When true , the connector converts BSON types not supported by Spark into
extended JSON strings.
When false , the connector uses the original relaxed JSON format for
unsupported types.Default: false |
变更流配置
从 MongoDB 读取变更流时,您可以配置以下属性:
属性名称 | 说明 |
---|---|
| 确定变更流在更新操作中返回的值。 默认设置返回原始文档和更新文档之间的差异。
有关此变更流选项如何工作的更多信息,请参阅 MongoDB 服务器手册指南“查找更新操作的完整文档”。 默认值: "default" |
| The maximum number of partitions the Spark Connector divides each
micro-batch into. Spark workers can process these partitions in parallel. This setting applies only when using micro-batch streams. Default: 1 警告:指定大于 |
| Specifies whether to publish the changed document or the full
change stream document. When this setting is false , you must specify a schema. The schema
must include all fields that you want to read from the change stream. You can
use optional fields to ensure that the schema is valid for all change-stream
events.When this setting is true , the connector exhibits the following behavior:
此设置会覆盖 默认: |
| Specifies how the connector starts up when no offset is available. This setting accepts the following values:
|
指定属性,在 connection.uri
如果使用 SparkConf 指定了之前的任何设置,可以将其包含在 connection.uri
设置中,也可以单独列出。
以下代码示例显示如何将数据库、集合和读取偏好指定为 connection.uri
设置的一部分:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
为了缩短 connection.uri
并使设置更易于阅读,您可以改为单独指定它们:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection spark.mongodb.read.readPreference.name=primaryPreferred
重要
如果您在 connection.uri
及其自己的行中指定设置,则 connection.uri
设置优先。例如,在以下配置中,连接数据库为 foobar
,因为它是 connection.uri
设置中的值:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar
在 属性中指定多个集合collection
您可以在collection
变更流 配置属性中指定多个集合,用逗号分隔集合名称。 不要在集合之间添加空格,除非空格是集合名称的一部分。
指定多个集合,如下例所示:
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
如果集合名称为 "*",或者名称包含逗号或反斜杠 (\),则必须按如下方式对字符进行转义:
如果
collection
配置选项中使用的集合名称包含逗号,则 Spark Connector 会将其视为两个不同的集合。 为避免这种情况,必须在逗号前面加上反斜杠 (\) 来对逗号进行转义。 对名为 "my,collection" 的集合进行转义,如下所示:"my\,collection" 如果
collection
配置选项中使用的集合名称为“*”,Spark Connector 会将其解释为扫描所有集合的规范。 为避免这种情况,必须在星号前加上反斜杠 (\) 对星号进行转义。 对名为 "*" 的集合进行转义,如下所示:"\*" 如果
collection
配置选项中使用的集合名称包含反斜杠 (\),则 Spark Connector 会将反斜杠视为转义字符,这可能会改变它对该值的解释方式。 为避免这种情况,必须在反斜杠前面加上另一个反斜杠来对其进行转义。 对名为 "\collection" 的集合进行转义,如下所示:"\\collection" 注意
string在 中将集合名称指定为Java 文字时,必须使用另一个反斜杠进一步转义每个反斜杠。例如,对名为 "\collection" 的集合进行转义,如下所示:
"\\\\collection"
您可以通过将星号 (*) 作为集合名称的string传递,从数据库中的所有集合进行流式传输。
指定所有集合,如下例所示:
... .option("spark.mongodb.collection", "*")
如果您在从所有集合进行流式传输时创建集合,则新集合会自动包含在流中。
在从多个集合进行流式传输时,您可以随时删除集合。
重要
推断具有多个集合的模式
如果将change.stream.publish.full.document.only
选项设置为true
,Spark Connector 将使用扫描文档的模式推断DataFrame
的模式。
模式推断发生在流式传输开始时,并且不考虑流式传输期间创建的集合。
从多个集合进行流式传输并推断模式时, connector会按顺序对每个集合进行采样。 来自大量集合的流式传输可能会导致模式推断的性能明显降低。 这种性能影响仅在推断模式时才会发生。