Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

Opciones de configuración de lectura en streaming

Puedes configurar las siguientes propiedades cuando leas datos desde MongoDB en modo transmisión.

Nota

Si usas SparkConf para configurar las configuraciones de lectura del conector, anteponer spark.mongodb.read. a cada propiedad.

Nombre de la propiedad
Descripción

connection.uri

Required.
The connection string configuration key.

Default: mongodb://localhost:27017/

database

Required.
The database name configuration.

collection

Required.
The collection name configuration.
You can specify multiple collections by separating the collection names with a comma.

To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property.

comment

The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None

mode

The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST: Lanza una excepción al analizar un documento que no coincide con el esquema.

  • ReadConfig.ParseMode.PERMISSIVE: Establece los campos como null cuando los tipos de datos no coinciden con el esquema. Para almacenar cada documento no válido como una cadena JSON extendida, combine este valor con la opción columnNameOfCorruptRecord.

  • ReadConfig.ParseMode.DROPMALFORMED: Ignora cualquier documento que no coincida con el esquema.


Default: ReadConfig.ParseMode.FAILFAST

columnNameOfCorruptRecord

If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None

mongoClientFactory

MongoClientFactory configuration key.
You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

aggregation.pipeline

Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

Los pipelines de agregación personalizados deben ser compatibles con la estrategia de particionamiento. Por ejemplo, las etapas de agregación como $group no funcionan con ningún particionador que cree más de una partición.

aggregation.allowDiskUse

Specifies whether to allow storage to disk when running the aggregation.

Default: true

change.stream.

Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.

outputExtendedJson

When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false

schemaHints

Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHints option, see the Specify Known Fields with Schema Hints section.

Default: None

Puedes configurar las siguientes propiedades al leer un flujo de cambios desde MongoDB:

Nombre de la propiedad
Descripción

change.stream.lookup.full.document

Determina qué valores devuelve tu flujo de cambios en las operaciones de actualización.

La configuración por defecto devuelve las diferencias entre el documento original y el documento actualizado.

La configuración updateLookup también devuelve las diferencias entre el documento original y el documento actualizado, pero también incluye una copia de todo el documento actualizado.

Para obtener más información sobre cómo funciona esta opción de flujo de cambios, consulte la guía del manual del servidor MongoDB Buscar documento completo para la operación de actualización.

Predeterminado: "predeterminado"

change.stream.micro.batch.max.partition.count

The maximum number of partitions the Spark Connector divides each micro-batch into. Spark workers can process these partitions in parallel.

This setting applies only when using micro-batch streams.

Default: 1

ADVERTENCIA: Especificar un valor mayor que 1 puede alterar el orden en que Spark Connector procesa los eventos de cambio. Evite esta configuración si el procesamiento fuera de orden podría generar inconsistencias en los datos posteriormente.

change.stream.publish.full.document.only

Specifies whether to publish the changed document or the full change stream document.

When this setting is false, you must specify a schema. The schema must include all fields that you want to read from the change stream. You can use optional fields to ensure that the schema is valid for all change-stream events.

When this setting is true, the connector exhibits the following behavior:
  • El conector filtra los mensajes que omiten el campo fullDocument y solo publica el valor del campo.

  • Si no especificas un esquema, el conector infiere el esquema del documento de flujo de cambios.

Esta configuración anula la configuración change.stream.lookup.full.document.

Por defecto: false

change.stream.startup.mode

Specifies how the connector starts up when no offset is available.
This setting accepts the following values:
  • latestEl conector comienza el procesamiento de los eventos de cambio, empezando por el evento más reciente. No se procesarán eventos anteriores no procesados.

  • timestamp:El conector comienza a procesar eventos de cambio en un momento específico.

    Para usar la opción timestamp, debe especificar una hora mediante la configuración change.stream.startup.mode.timestamp.start.at.operation.time. Esta configuración acepta marcas de tiempo en los siguientes formatos:

    Por defecto: latest

change.stream.lookup.full.document.before.change

Determines whether to include the pre-image of modified documents in the change stream output.

This setting accepts the following values:
  • default: Esta opción es equivalente al valor off.

  • off:No incluye la imagen previa de los documentos modificados en la salida del flujo de cambios.

  • whenAvailable:Incluye la imagen previa de los documentos modificados en la salida del flujo de cambios cuando hay imágenes previas disponibles.

  • required: Incluye la preimagen de los documentos modificados en la salida del flujo de cambios. Si no se dispone de una preimagen para un documento modificado, el conector arroja un error.

Por defecto: default

Si utiliza SparkConf para especificar cualquiera de las configuraciones anteriores, puede incluirlas en la connection.uri configuración o enumerarlas individualmente.

El siguiente ejemplo de código muestra cómo especificar la base de datos, la colección y la preferencia de lectura como parte de la configuración connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

Para mantener el connection.uri más breve y hacer que la configuración sea más fácil de leer, puedes especificarlos individualmente en su lugar:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection

Importante

Si se especifica una configuración tanto en el connection.uri como en una línea independiente, la configuración connection.uri tiene prioridad. Por ejemplo, en la siguiente configuración, la base de datos de conexión es foobar, porque es el valor en la opción connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

Se pueden especificar varias colecciones en la propiedad de configuración del flujo de cambios collection separando los nombres de las colecciones con una coma. No agregue un espacio entre las colecciones a menos que el espacio forme parte del nombre de la colección.

Especifica varias colecciones como se muestra en el siguiente ejemplo:

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

Si el nombre de una colección es "*", o si el nombre incluye una coma o una barra invertida (\), debe escapar el carácter de la siguiente manera:

  • Si el nombre de una colección utilizada en la opción de configuración collection contiene una coma, el Conector Spark la trata como dos colecciones diferentes. Para evitarlo, debe escapar la coma precediéndola de una barra invertida (\). Escápela como sigue:

    "my\,collection"
  • Si el nombre de una colección utilizada en tu opción de configuración de collection es “*”, el Spark Connector lo interpreta como una especificación para escanear todas las colecciones. Para evitar esto, deben escapar el asterisco anteponiéndole una barra invertida (\). Escapa una colección con el nombre "*" de la siguiente manera:

    "\*"
  • Si el nombre de una colección usada en tu opción de configuración collection contiene una barra invertida (\), el Spark Connector trata la barra invertida como un carácter de escape, lo que podría cambiar la forma en que interpreta el valor. Para evitar esto, debes escapar la barra invertida precediéndola con otra barra invertida. Escapa una colección llamada "\collection" de la siguiente manera:

    "\\collection"

    Nota

    Al especificar el nombre de la colección como un literal de cadena en Java, debe usar una barra invertida para escapar cada barra invertida. Por ejemplo, escape una colección llamada "\collection" de la siguiente manera:

    "\\\\collection"

Puedes transmitir desde todas las colecciones en la base de datos pasando un asterisco (*) como string para el nombre de la colección.

Especifica todas las colecciones como se muestra en el siguiente ejemplo:

...
.option("spark.mongodb.collection", "*")

Si crea una colección mientras transmite desde todas las colecciones, la nueva colección se incluye automáticamente en la transmisión.

Puedes descartar colecciones en cualquier momento mientras transmites desde varias colecciones.

Importante

Inferir el esquema con múltiples colecciones

Si establece la opción change.stream.publish.full.document.only en true, Spark Connector infiere el esquema de un DataFrame utilizando el esquema de los documentos escaneados.

La inferencia de esquemas ocurre al inicio de la transmisión y no toma en cuenta las colecciones que se crean durante la transmisión.

Al hacer transmisión desde varias colecciones y deducir el esquema, el conector toma muestras de cada colección de forma secuencial. La transmisión de un gran número de colecciones puede provocar que la inferencia del esquema tenga un rendimiento notablemente más lento. Este impacto en el rendimiento ocurre solo al inferir el esquema.

Volver

Lea

En esta página