Para agentes de IA: hay un índice de documentación disponible en https://www.mongodb.com/es/docs/llms.txt — versiones en markdown de todas las páginas están disponibles agregando .md a cualquier ruta URL.
Docs Menu

Opciones de configuración de lectura de transmisión

Puedes configurar las siguientes propiedades cuando leas datos desde MongoDB en modo transmisión.

Nota

Si utiliza SparkConf para establecer las configuraciones de lectura del conector, anteponga spark.mongodb.read. a cada propiedad.

Nombre de la propiedad
Descripción

connection.uri

Requerido.
Clave de configuración de la cadena de conexión.

Valor predeterminado: mongodb://localhost:27017/

database

Requerido.
La configuración del nombre de la base de datos.

collection

Requerido.
Configuración del nombre de
la colección. Puede especificar varias colecciones separando los nombres con comas.

Para obtener más información sobre cómo especificar varias colecciones, consulte la sección "Especificar varias colecciones en la collection propiedad ".

comment

Comentario que se añadirá a la operación de lectura. Los comentarios aparecen en la salida del Generador de perfiles de base de datos. Valor

predeterminado: Ninguno.

mode

La estrategia de análisis que se debe utilizar al manejar documentos que no coincidan con el esquema esperado. Esta opción acepta los siguientes valores:

  • ReadConfig.ParseMode.FAILFAST: Lanza una excepción al analizar un documento que no coincide con el esquema.

  • ReadConfig.ParseMode.PERMISSIVE: Establece los campos en null cuando los tipos de datos no coinciden con el esquema. Para almacenar cada documento inválido como una string JSON extendida, combina este valor con la opción columnNameOfCorruptRecord.

  • ReadConfig.ParseMode.DROPMALFORMED: Ignora cualquier documento que no coincida con el esquema.


Por defecto: ReadConfig.ParseMode.FAILFAST

columnNameOfCorruptRecord

Si establece la mode opción ReadConfig.ParseMode.PERMISSIVE en, esta opción especifica el nombre de la nueva columna que almacena el documento no válido como JSON extendido. Si utiliza un esquema explícito, este debe incluir el nombre de la nueva columna. Si utiliza un esquema inferido, el conector de Spark agrega la nueva columna al final del esquema.

Predeterminado: Ninguno

mongoClientFactory

Clave de configuración de MongoClientFactory.
Puede especificar una implementación personalizada, que debe implementar la com.mongodb.spark.sql.connector.connection.MongoClientFactory interfaz.

Valor predeterminado:. com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

aggregation.pipeline

Especifica una canalización de agregación personalizada que se aplicará a la colección antes de enviar los datos a Spark.
El valor debe ser un documento JSON único extendido o una lista de documentos.
Un documento único tiene el siguiente formato:

{"$match": {"closed": false}}

Una lista de documentos se parece a lo siguiente:

[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

Los pipelines de agregación personalizados deben ser compatibles con la estrategia de particionamiento. Por ejemplo, las etapas de agregación como $group no funcionan con ningún particionador que cree más de una partición.

aggregation.allowDiskUse

Especifica si se debe permitir el almacenamiento en disco al ejecutar la agregación.

Valor predeterminado: true

change.stream.

Prefijo de configuración de flujo de cambios. Consulte
la sección Configuración de flujo de cambios para obtener más información sobre los flujos de cambios.

outputExtendedJson

trueCuando, el conector convierte los tipos BSON no compatibles con Spark en cadenas JSON extendidas.false Cuando, el conector utiliza el formato JSON relajado original para los tipos no compatibles.

Valor predeterminado: false

schemaHints

Especifica un esquema parcial de tipos de campo conocidos que se utilizará al inferir el esquema de la colección. Para obtener más información sobre la schemaHints opción, consulte la sección Especificar campos conocidos con sugerencias de esquema.

Valor predeterminado: Ninguno

Puedes configurar las siguientes propiedades al leer un flujo de cambios desde MongoDB:

Nombre de la propiedad
Descripción

change.stream.lookup.full.document

Determina qué valores devuelve tu flujo de cambios en las operaciones de actualización.

La configuración por defecto devuelve las diferencias entre el documento original y el documento actualizado.

La configuración updateLookup también devuelve las diferencias entre el documento original y el documento actualizado, pero también incluye una copia de todo el documento actualizado.

Para más información sobre cómo funciona esta opción de flujo de cambios, consulta la guía del manual del servidor de MongoDB Búsqueda de documento completo para la operación de actualización.

Predeterminado: "por defecto"

change.stream.micro.batch.max.partition.count

Número máximo de particiones en las que el conector de Spark divide cada micro-lote. Los trabajadores de Spark pueden procesar estas particiones en paralelo.

Esta configuración solo se aplica al usar flujos de micro-lotes.

Valor predeterminado: 1

ADVERTENCIA: Especificar un valor mayor a 1 puede alterar el orden en que el Spark Connector procesa los eventos de cambio. Evite esta configuración si el procesamiento fuera de orden pudiera generar inconsistencias de datos en etapas posteriores.

change.stream.publish.full.document.only

Especifica si se debe publicar el documento modificado o el documento completo del flujo de cambios.

Cuando esta configuración false es, debe especificar un esquema. El esquema debe incluir todos los campos que desea leer del flujo de cambios. Puede usar campos opcionales para garantizar que el esquema sea válido para todos los eventos del flujo de cambios.

Cuando esta configuración true es, el conector presenta el siguiente comportamiento:

  • El conector filtra los mensajes que omiten el campo fullDocument y solo publica el valor del campo.

  • Si no especificas un esquema, el conector infiere el esquema del documento de flujo de cambios.

Esta configuración anula la configuración change.stream.lookup.full.document.

Por defecto: false

change.stream.startup.mode

Especifica cómo el conector se inicia cuando no hay ningún offset disponible.

Esta configuración acepta los siguientes valores:

  • latestEl conector comienza el procesamiento de los eventos de cambio, empezando por el evento más reciente. No se procesarán eventos anteriores no procesados.

  • timestampEl conector empieza el procesamiento de eventos de cambio en una hora especificada.

    Para utilizar la opción timestamp, debe especificar un horario utilizando la configuración change.stream.startup.mode.timestamp.start.at.operation.time. Esta configuración acepta marcas de tiempo en los siguientes formatos:

    • Un número entero que representa la cantidad de segundos desde la época Unix

    • Una fecha y hora en formato ISO-8601 con precisión de un segundo

    • Un JSON extendido BsonTimestamp

    Por defecto: latest

change.stream.lookup.full.document.before.change

Determina si se debe incluir la imagen previa de los documentos modificados en la salida del flujo de cambios.

Esta configuración acepta los siguientes valores:

  • default: Esta opción es equivalente al valor off.

  • off: No incluye la imagen previa de los documentos modificados en la salida del flujo de cambios.

  • whenAvailable- Incluye la preimagen de documentos modificados en la salida del flujo de cambios cuando las preimágenes están disponibles.

  • required: Incluye la preimagen de los documentos modificados en la salida del flujo de cambios. Si no se dispone de una preimagen para un documento modificado, el conector arroja un error.

Por defecto: default

Si utiliza SparkConf para especificar cualquiera de las configuraciones anteriores, puede incluirlas en la configuración connection.uri o enumerarlas individualmente.

El siguiente ejemplo de código muestra cómo especificar la base de datos, la colección y la preferencia de lectura como parte de la configuración connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

Para mantener el connection.uri más breve y hacer que la configuración sea más fácil de leer, puedes especificarlos individualmente en su lugar:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection

Importante

Si se especifica una configuración tanto en el connection.uri como en una línea independiente, la configuración connection.uri tiene prioridad. Por ejemplo, en la siguiente configuración, la base de datos de conexión es foobar, porque es el valor en la opción connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

Se pueden especificar varias colecciones en la propiedad de configuración del flujo de cambios collection separando los nombres de las colecciones con una coma. No agregue un espacio entre las colecciones a menos que el espacio forme parte del nombre de la colección.

Especifica varias colecciones como se muestra en el siguiente ejemplo:

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

Si un nombre de colección es ' * ', o si el nombre incluye una coma o una barra invertida (\), debes colocar un carácter de escape como se indica a continuación:

  • Si el nombre de una colección utilizada en la opción de configuración collection contiene una coma, el Spark Connector la tratará como dos colecciones diferentes. Para evitar esto, debes escapar la coma precediéndola con una barra invertida (\). Escape una colección llamada "my,collection" de la siguiente manera:

    "my\,collection"
  • Si el nombre de una colección utilizada en tu opción de configuración de collection es “*”, el Spark Connector lo interpreta como una especificación para escanear todas las colecciones. Para evitar esto, deben escapar el asterisco anteponiéndole una barra invertida (\). Escapa una colección con el nombre "*" de la siguiente manera:

    "\*"
  • Si el nombre de una colección usada en tu opción de configuración collection contiene una barra invertida (\), el Spark Connector trata la barra invertida como un carácter de escape, lo que podría cambiar la forma en que interpreta el valor. Para evitar esto, debes escapar la barra invertida precediéndola con otra barra invertida. Escapa una colección llamada "\collection" de la siguiente manera:

    "\\collection"

    Nota

    Al especificar el nombre de la colección como un literal de string en Java, hay que escapar cada barra invertida con otra más. Por ejemplo, escapa una colección llamada "\collection" de la siguiente manera:

    "\\\\collection"

Puedes transmitir desde todas las colecciones en la base de datos pasando un asterisco (*) como string para el nombre de la colección.

Especifica todas las colecciones como se muestra en el siguiente ejemplo:

...
.option("spark.mongodb.collection", "*")

Si creas una colección mientras transmites desde todas las colecciones, la nueva colección se incluye automáticamente en la transmisión.

Puedes descartar colecciones en cualquier momento mientras transmites desde varias colecciones.

Importante

Inferir el Esquema con Múltiples Colecciones

Si establece la opción change.stream.publish.full.document.only en true, Spark Connector infiere el esquema de un DataFrame utilizando el esquema de los documentos escaneados.

La inferencia de esquemas ocurre al inicio de la transmisión y no toma en cuenta las colecciones que se crean durante la transmisión.

Al hacer transmisión desde varias colecciones y deducir el esquema, el conector toma muestras de cada colección de forma secuencial. La transmisión de un gran número de colecciones puede provocar que la inferencia del esquema tenga un rendimiento notablemente más lento. Este impacto en el rendimiento ocurre solo al inferir el esquema.