Menu Docs
Página inicial do Docs
/
Conector do Spark
/ /

Opções de configuração de leitura em lote

Você pode configurar as seguintes propriedades ao ler dados do MongoDB no modo de lote.

Observação

Se você usa o SparkConf para definir as configurações de leitura do conector, insira spark.mongodb.read. como prefixo em cada propriedade.

Nome da propriedade
Descrição

connection.uri

Required.
The connection string configuration key.

Default: mongodb://localhost:27017/

database

Required.
The database name configuration.

collection

Required.
The collection name configuration.

comment

The comment to append to the read operation. Comments appear in the output of the Database Profiler.

Default: None

mode

The parsing strategy to use when handling documents that don't match the expected schema. This option accepts the following values:
  • ReadConfig.ParseMode.FAILFAST: Lança uma exceção ao analisar um documento que não corresponde ao esquema.

  • ReadConfig.ParseMode.PERMISSIVE: Define campos para null quando os tipos de dados não correspondem ao esquema. Para armazenar cada documento inválido como uma string JSON estendida, combine este valor com a opção columnNameOfCorruptRecord .

  • ReadConfig.ParseMode.DROPMALFORMED: ignora qualquer documento que não corresponda ao esquema.


Default: ReadConfig.ParseMode.FAILFAST

columnNameOfCorruptRecord

If you set the mode option to ReadConfig.ParseMode.PERMISSIVE, this option specifies the name of the new column that stores the invalid document as extended JSON. If you're using an explicit schema, it must include the name of the new column. If you're using an inferred schema, the Spark Connector adds the new column to the end of the schema.

Default: None

mongoClientFactory

MongoClientFactory configuration key.
You can specify a custom implementation which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

partitioner

The partitioner full class name.
You can specify a custom implementation that must implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface.
See the Partitioner Configuration section for more information about partitioners.

Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner

partitioner.options.

Partitioner configuration prefix.
See the Partitioner Configuration section for more information about partitioners.

sampleSize

The number of documents to sample from the collection when inferring
the schema.

Default: 1000

sql.inferSchema.mapTypes.enabled

Whether to enable Map types when inferring the schema.
When enabled, large compatible struct types are inferred to a MapType instead.

Default: true

sql.inferSchema.mapTypes.minimum.key.size

Minimum size of a StructType before inferring as a MapType.

Default: 250

aggregation.pipeline

Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark.
The value must be either an extended JSON single document or list of documents.
A single document resembles the following:
{"$match": {"closed": false}}
A list of documents resembles the following:
[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

IMPORTANTE: os pipelines de agregação personalizados devem ser compatíveis com a estratégia do particionador. Por exemplo, estágios de agregação como $group não funcionam com nenhum particionador que crie mais de uma partição.

aggregation.allowDiskUse

Specifies whether to allow storage to disk when running the aggregation.

Default: true

outputExtendedJson

When true, the connector converts BSON types not supported by Spark into extended JSON strings. When false, the connector uses the original relaxed JSON format for unsupported types.

Default: false

schemaHints

Specifies a partial schema of known field types to use when inferring the schema for the collection. To learn more about the schemaHints option, see the Specify Known Fields with Schema Hints section.

Default: None

Particionadores alteram o comportamento de leitura das leituras em lote que usam o Conector Spark. Ao dividir os dados em partições, você pode executar transformações em paralelo.

Esta seção contém informações de configuração para o seguinte particionador:

Observação

Somente leituras em lote

Como o mecanismo de processamento de fluxo de dados produz um único fluxo de dados, os particionadores não afetam as leituras de streaming.

SamplePartitioner é a configuração padrão do particionador. Esta configuração permite especificar um campo de partição, tamanho da partição e número de amostras por partição.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner.

Nome da propriedade
Descrição

partitioner.options.partition.field

O campo a ser usado para o particionamento, que deve ser um campo único.

Padrão: _id

partitioner.options.partition.size

O tamanho (em MB) de cada partição. Tamanhos menores de partição criam mais partições contendo menos documentos.

Padrão: 64

partitioner.options.samples.per.partition

O número de amostras a serem coletadas por partição. O número total de amostras coletadas é:

samples per partition * ( count / number of documents per partition)

Padrão: 10

Exemplo

Para uma coleção com 640 documentos e com um tamanho médio de 0,5 MB por documento, a configuração padrão SamplePartitioner cria 5 partições com 128 documentos por partição.

O Conector Spark faz amostras de 50 documentos (padrão de 10 por cada partição pretendida) e define 5 partições ao selecionar intervalos de campos de partição dos documentos de amostra.

A configuração do ShardedPartitioner automaticamente particiona os dados com base na sua configuração de fragmento.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner.

Importante

Restrições do ShardedPartitioner

  1. No MongoDB Server v6.0 e posterior, a operação de fragmentação cria um grande chunk inicial para cobrir todos os valores de chave de shard, tornando o particionador fragmentado ineficiente. Não recomendamos usar o particionador fragmentado quando conectado ao MongoDB v6.0 e posterior.

  2. O particionador fragmentado não é compatível com chaves de shard com hash.

A configuração do PaginateBySizePartitioner pagina os dados utilizando o tamanho médio do documento para dividir a coleção em partes de tamanho médio.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner.

Nome da propriedade
Descrição

partitioner.options.partition.field

O campo a ser usado para o particionamento, que deve ser um campo único.

Padrão: _id

partitioner.options.partition.size

O tamanho (em MB) de cada partição. Tamanhos de partição menores

criam mais partições contendo menos documentos.

Padrão: 64

A configuração do PaginateIntoPartitionsPartitioner pagina os dados dividindo a contagem de documentos na coleção pelo número máximo de partições permitidas.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner.

Nome da propriedade
Descrição

partitioner.options.partition.field

O campo a ser usado para o particionamento, que deve ser um campo único.

Padrão: _id

partitioner.options.max.number.of.partitions

O número de partições a serem criadas.

Padrão: 64

A configuração do SinglePartitionPartitioner cria uma única partição.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner.

A configuração AutoBucketPartitioner é semelhante à configuração SamplePartitioner , mas utiliza o estágio de agregação $bucketAuto para paginar os dados. Ao usar essa configuração, você pode particionar os dados em um ou vários campos, incluindo campos aninhados.

Para usar esta configuração, defina a opção de configuração partitioner como com.mongodb.spark.sql.connector.read.partitioner.AutoBucketPartitioner.

Nome da propriedade
Descrição

partitioner.options.partition.fieldList

A lista de campos a serem usados para o particionamento. O valor pode ser um único nome de campo ou uma lista de campos separados por vírgula.

Padrão: _id

partitioner.options.partition.chunkSize

O tamanho médio (MB) para cada partição. Tamanhos de partição menores criam mais partições contendo menos documentos. Como essa configuração usa o tamanho médio do documento para determinar o número de documentos por partição, as partições podem não ter o mesmo tamanho.

Padrão: 64

partitioner.options.partition.samplesPerPartition

O número de amostras a serem coletadas por partição.

Padrão: 100

partitioner.options.partition.partitionKeyProjectionField

O nome do campo a ser usado para um campo projetado que contém todos os campos usados para dividir a coleção. Recomendamos alterar o valor dessa propriedade somente se cada documento já contiver o campo __idx .

Padrão: __idx

Se você usa SparkConf para especificar qualquer uma das configurações anteriores, você poderá incluí-las na configuração do connection.uri ou listá-las individualmente.

O exemplo de código abaixo mostra como especificar o banco de dados, coleção e preferência de leitura como parte da configuração do connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

Para manter o connection.uri curto e facilitar a leitura das configurações, você pode especificá-las individualmente:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection
spark.mongodb.read.readPreference.name=primaryPreferred

Importante

Se você especificar uma configuração em connection.uri e em sua própria linha, a configuração connection.uri terá precedência. Por exemplo, na configuração abaixo, o banco de dados de conexão é foobar, porque é o valor na configuração connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

Voltar

Leia

Nesta página