Docs Menu
Docs Home
/ /
Lea
/ / /

Opciones de configuración de lectura en streaming

Puedes configurar las siguientes propiedades cuando leas datos desde MongoDB en modo transmisión.

Nota

Si utilizas SparkConf Para establecer las configuraciones de lectura del conector, agregue el prefijo spark.mongodb.read. a cada propiedad.

Nombre de la propiedad
Descripción

connection.uri

Required.
The connection string configuration key.

Default: mongodb://localhost:27017/

database

Required.
The database name configuration.

collection

Required.
The collection name configuration.

comment

The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None

mongoClientFactory

MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

aggregation.pipeline

Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

Los pipelines de agregación personalizados deben ser compatibles con la estrategia de particionamiento. Por ejemplo, las etapas de agregación como $group no funcionan con ningún particionador que cree más de una partición.

aggregation.allowDiskUse

Specifies whether to allow storage to disk when running the aggregation.

Default: true

change.stream.

Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.

outputExtendedJson

When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false

Puede configurar las siguientes propiedades al leer un flujo de cambios desde MongoDB:

Nombre de la propiedad
Descripción

change.stream.lookup.full.document

Determina qué valores devuelve su flujo de cambios en las operaciones de actualización.

La configuración predeterminada devuelve las diferencias entre el documento original y el documento actualizado.

La configuración updateLookup también devuelve las diferencias entre el documento original y el documento actualizado, pero también incluye una copia de todo el documento actualizado.

Para obtener más información sobre cómo funciona esta opción de flujo de cambios, consulte la guía del manual del servidor MongoDB Buscar documento completo para la operación de actualización.

Predeterminado: "predeterminado"

change.stream.micro.batch.max.partition.count

The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

ADVERTENCIA: Especificar un valor mayor que 1 puede alterar el orden en que Spark Connector procesa los eventos de cambio. Evite esta configuración si el procesamiento fuera de orden podría generar inconsistencias en los datos posteriormente.

change.stream.publish.full.document.only

Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • El conector filtra los mensajes que omiten el campo fullDocument y publica solo el valor del campo.

  • Si no especifica un esquema, el conector infiere el esquema del documento de flujo de cambios en lugar de hacerlo de la colección subyacente.

Esta configuración anula la configuración change.stream.lookup.full.document.

Por defecto: false

change.stream.startup.mode

Specifies how the connector starts up when no offset is available.
This setting accepts the following values:
  • latestEl conector comienza a procesar eventos de cambio a partir del evento más reciente. No procesará ningún evento anterior sin procesar.

  • timestamp:El conector comienza a procesar eventos de cambio en un momento específico.

    Para usar la opción timestamp, debe especificar una hora mediante la configuración change.stream.startup.mode.timestamp.start.at.operation.time. Esta configuración acepta marcas de tiempo en los siguientes formatos:

    Por defecto: latest

Si utiliza SparkConf para especificar cualquiera de las configuraciones anteriores, puede incluirlas en la connection.uri configuración o enumerarlas individualmente.

El siguiente ejemplo de código muestra cómo especificar la base de datos, la colección y la preferencia de lectura como parte de la configuración connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

Para mantener el connection.uri más breve y hacer que la configuración sea más fácil de leer, puedes especificarlos individualmente en su lugar:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

Importante

Si especifica una configuración tanto en la línea connection.uri como en la línea correspondiente, la configuración connection.uri tiene prioridad. Por ejemplo, en la siguiente configuración, la base de datos de conexión es foobar, porque es el valor de la configuración connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

Volver

Lea

En esta página