Overview
Puedes configurar las siguientes propiedades cuando leas datos desde MongoDB en modo transmisión.
Nota
Si utilizas SparkConf Para establecer las configuraciones de lectura del conector, agregue el prefijo spark.mongodb.read. a cada propiedad.
Nombre de la propiedad | Descripción | ||
|---|---|---|---|
| Required. The connection string configuration key. Default: mongodb://localhost:27017/ | ||
| Required. The database name configuration. | ||
| Required. The collection name configuration. You can specify multiple collections by separating the collection names
with a comma. To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property. | ||
| The comment to append to the read operation. Comments appear in the
output of the Database Profiler. Default: None | ||
| The parsing strategy to use when handling documents that don't match the
expected schema. This option accepts the following values:
Default: ReadConfig.ParseMode.FAILFAST | ||
| If you set the mode option to ReadConfig.ParseMode.PERMISSIVE,
this option specifies the name of the new column that stores the invalid
document as extended JSON. If you're using an explicit schema, it must
include the name of the new column. If you're
using an inferred schema, the Spark Connector adds the new column to the
end of the schema.Default: None | ||
| MongoClientFactory configuration key. You can specify a custom implementation, which must implement the
com.mongodb.spark.sql.connector.connection.MongoClientFactory
interface.Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | ||
| Specifies a custom aggregation pipeline to apply to the collection
before sending data to Spark. The value must be either an extended JSON single document or list
of documents. A single document resembles the following: A list of documents resembles the following: Los pipelines de agregación personalizados deben ser compatibles con la estrategia de particionamiento. Por ejemplo, las etapas de agregación como | ||
| Specifies whether to allow storage to disk when running the
aggregation. Default: true | ||
| Change stream configuration prefix. See the
Change Stream Configuration section for more
information about change streams. | ||
| When true, the connector converts BSON types not supported by Spark into
extended JSON strings.
When false, the connector uses the original relaxed JSON format for
unsupported types.Default: false | ||
| Specifies a partial schema of known field types to use when inferring
the schema for the collection. To learn more about the schemaHints
option, see the Specify Known Fields with Schema Hints section.Default: None |
Cambiar la configuración del flujo
Puede configurar las siguientes propiedades al leer un flujo de cambios desde MongoDB:
Nombre de la propiedad | Descripción |
|---|---|
| Determina qué valores devuelve su flujo de cambios en las operaciones de actualización. La configuración predeterminada devuelve las diferencias entre el documento original y el documento actualizado. La configuración Para obtener más información sobre cómo funciona esta opción de flujo de cambios, consulte la guía del manual del servidor MongoDB Buscar documento completo para la operación de actualización. Predeterminado: "predeterminado" |
| The maximum number of partitions the Spark Connector divides each
micro-batch into. Spark workers can process these partitions in parallel. This setting applies only when using micro-batch streams. Default: 1ADVERTENCIA: Especificar un valor mayor que |
| Specifies whether to publish the changed document or the full
change stream document. When this setting is false, you must specify a schema. The schema
must include all fields that you want to read from the change stream. You can
use optional fields to ensure that the schema is valid for all change-stream
events.When this setting is true, the connector exhibits the following behavior:
Esta configuración anula la configuración Por defecto: |
| Specifies how the connector starts up when no offset is available. This setting accepts the following values:
|
| Determines whether to include the pre-image of modified documents
in the change stream output. This setting accepts the following values:
Por defecto: |
Especificación de propiedades en connection.uri
Si utiliza SparkConf para especificar cualquiera de las configuraciones anteriores, puede incluirlas en la connection.uri configuración o enumerarlas individualmente.
El siguiente ejemplo de código muestra cómo especificar la base de datos, la colección y la preferencia de lectura como parte de la configuración connection.uri:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred
Para mantener el connection.uri más breve y hacer que la configuración sea más fácil de leer, puedes especificarlos individualmente en su lugar:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ spark.mongodb.read.database=myDB spark.mongodb.read.collection=myCollection
Importante
Si especifica una configuración tanto en la línea connection.uri como en la línea correspondiente, la configuración connection.uri tiene prioridad. Por ejemplo, en la siguiente configuración, la base de datos de conexión es foobar, porque es el valor de la configuración connection.uri:
spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar spark.mongodb.read.database=bar
Especificación de múltiples colecciones en la collection propiedad
Puede especificar varias colecciones en la propiedad de configuración del flujo de cambios collection separando los nombres de las colecciones con una coma. No deje espacios entre las colecciones a menos que formen parte del nombre de la colección.
Especifique varias colecciones como se muestra en el siguiente ejemplo:
... .option("spark.mongodb.collection", "collectionOne,collectionTwo")
Si el nombre de una colección es "*", o si el nombre incluye una coma o una barra invertida (\), debe escapar el carácter de la siguiente manera:
Si el nombre de una colección utilizada en la opción de configuración
collectioncontiene una coma, el Conector Spark la trata como dos colecciones diferentes. Para evitarlo, debe escapar la coma precediéndola de una barra invertida (\). Escápela como sigue:"my\,collection" Si el nombre de una colección utilizada en la opción de configuración
collectiones "*", el Conector Spark lo interpreta como una especificación para escanear todas las colecciones. Para evitar esto, debe evitar el asterisco precediéndolo de una barra invertida (\). Para una colección llamada "*", siga estos pasos:"\*" Si el nombre de una colección utilizada en la opción de configuración
collectioncontiene una barra invertida (\), el Conector Spark la trata como un carácter de escape, lo que podría cambiar la interpretación del valor. Para evitarlo, debe escapar la barra invertida precediéndola con otra barra invertida. Escápese una colección llamada "\collection" de la siguiente manera:"\\collection" Nota
Al especificar el nombre de la colección como un literal de cadena en Java, debe usar una barra invertida para escapar cada barra invertida. Por ejemplo, escape una colección llamada "\collection" de la siguiente manera:
"\\\\collection"
Puede transmitir desde todas las colecciones de la base de datos pasando un asterisco (*) como cadena para el nombre de la colección.
Especifique todas las colecciones como se muestra en el siguiente ejemplo:
... .option("spark.mongodb.collection", "*")
Si crea una colección mientras transmite desde todas las colecciones, la nueva colección se incluye automáticamente en la transmisión.
Puedes eliminar colecciones en cualquier momento mientras transmites desde varias colecciones.
Importante
Inferir el esquema con múltiples colecciones
Si configura la opción change.stream.publish.full.document.only en true, el conector Spark infiere el esquema de un DataFrame utilizando el esquema de los documentos escaneados.
La inferencia del esquema ocurre al comienzo de la transmisión y no tiene en cuenta las colecciones que se crean durante la transmisión.
Al transmitir desde varias colecciones e inferir el esquema, el conector muestrea cada colección secuencialmente. Transmitir desde un gran número de colecciones puede ralentizar considerablemente la inferencia del esquema. Este impacto en el rendimiento solo se produce al inferir el esquema.