Para agentes de IA: um índice de documentação está disponível em https://www.mongodb.com/pt-br/docs/llms.txt — as versões de markdown de todas as páginas estão disponíveis anexando .md a qualquer caminho de URL.
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Menu Docs

Opções de configuração de leitura de streaming

Você pode configurar as seguintes propriedades ao ler dados do MongoDB no modo de streaming.

Observação

Se você usar SparkConf para definir as configurações de leitura do conector, insira spark.mongodb.read. como prefixo em cada propriedade.

Nome da propriedade
Descrição

connection.uri

Obrigatório.
A chave de

configuração da string de conexão . Padrão: mongodb://localhost:27017/

database

Obrigatório.
A configuração do nome do banco de dados.

collection

Obrigatório.
The collection name configuration.
You can specify multiple collections by separating the collection names with a comma.

To learn more about specifying multiple collections, see Specifying Multiple Collections in the collection Property.

comment

O comentário a ser anexado à operação de leitura. Os comentários aparecem na saída do Analisador de banco de dados.

Padrão: nenhum

mode

A estratégia de análise a ser usada ao lidar com documentos que não correspondem ao esquema esperado. Esta opção aceita os seguintes valores:

  • ReadConfig.ParseMode.FAILFAST: Lança uma exceção ao analisar um documento que não corresponde ao esquema.

  • ReadConfig.ParseMode.PERMISSIVE: Define campos para null quando os tipos de dados não correspondem ao esquema. Para armazenar cada documento inválido como uma string JSON estendida, combine este valor com a opção columnNameOfCorruptRecord .

  • ReadConfig.ParseMode.DROPMALFORMED: ignora qualquer documento que não corresponda ao esquema.


Padrão: ReadConfig.ParseMode.FAILFAST

columnNameOfCorruptRecord

Se você definir a mode opção ReadConfig.ParseMode.PERMISSIVE como, essa opção especificará o nome da nova coluna que armazena o documento inválido como JSON estendido. Se você estiver usando um esquema explícito, ele deverá incluir o nome da nova coluna. Se você estiver usando um esquema inferido, o conector Spark adicionará a nova coluna ao final do esquema.

Padrão: nenhum

mongoClientFactory

Chave de configuração MongoClientFactory.
Você pode especificar uma implementação personalizada, que deve implementar a com.mongodb.spark.sql.connector.connection.MongoClientFactory interface.

Padrão: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory

aggregation.pipeline

Especifica um pipeline de agregação personalizado a ser aplicado à coleção antes de enviar dados para o Spark.
O valor deve ser um documento único JSON estendido ou uma lista de documentos.
Um único documento se assemelha ao seguinte:

{"$match": {"closed": false}}

Uma lista de documentos é semelhante ao seguinte:

[{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}]

Pipelines de agregação personalizados devem ser compatíveis com a estratégia do particionador. Por exemplo, estágios de agregação como $group não funcionam com nenhum particionador que cria mais de uma partição.

aggregation.allowDiskUse

Especifica se o armazenamento em disco deve ser permitido ao executar a agregação.

Padrão: true

change.stream.

Change stream configuration prefix.
See the Change Stream Configuration section for more information about change streams.

outputExtendedJson

trueQuando, o conector converte tipos BSON não suportados pelo Spark em strings JSON estendidas.false Quando, o conector usa o formato JSON relaxado original para tipos não suportados.

Padrão: false

schemaHints

Especifica um esquema parcial de tipos de campo conhecidos a serem usados ao inferir o esquema para a coleção. Para saber mais sobre a schemaHints opção, consulte a seção Especificar campos conhecidos com dicas de esquema.

Padrão: nenhum

Você pode configurar as seguintes propriedades ao ler um change stream do MongoDB:

Nome da propriedade
Descrição

change.stream.lookup.full.document

Determina quais valores seu change stream retorna nas operações de atualização.

A configuração padrão retorna as diferenças entre o documento original e o documento atualizado.

A configuração updateLookup também retorna as diferenças entre o documento original e o documento atualizado, mas também inclui uma cópia de todo o documento atualizado.

Para obter mais informações sobre como essa opção de fluxo de alterações funciona, consulte o guia manual do servidor MongoDB Pesquisar documento completo para operação de atualização.

Padrão: "default"

change.stream.micro.batch.max.partition.count

O número máximo de partições em que o conector Spark divide cada microlote. Os trabalhadores do Spark podem processar essas partições em paralelo.

Essa configuração se aplica somente ao usar fluxos de microlote.

Padrão: 1

AVISO: especificar um valor superior a 1 pode alterar a ordem em que o Spark Connector processa eventos de mudança. Evite essa configuração se o processamento fora de ordem puder criar inconsistências de dados downstream.

change.stream.publish.full.document.only

Especifica se deseja publicar o documento alterado ou o documento completo do change stream .

Quando esta configuração false é, você deve especificar um esquema. O esquema deve incluir todos os campos que você deseja ler do fluxo de alterações. Você pode usar campos opcionais para garantir que o esquema seja válido para todos os eventos do change stream.

Quando esta configuração true é, o conector exibe o seguinte comportamento:

  • O connector filtra mensagens que omitem o campo fullDocument e publica somente o valor do campo.

  • Se você não especificar um esquema, o connector inferirá o esquema a partir do documento do fluxo de alterações.

Esta configuração substitui a configuração change.stream.lookup.full.document .

Padrão: false

change.stream.startup.mode

Especifica como o connector é iniciado quando nenhum deslocamento está disponível.

Esta configuração aceita os seguintes valores:

  • latest: o connector começa a processar eventos de mudança começando com o evento mais recente. Ele não processará nenhum evento anterior não processado.

  • timestamp: o connector começa a processar eventos de mudança em um horário especificado.

    Para usar a opção timestamp , você deve especificar um horário usando a configuração change.stream.startup.mode.timestamp.start.at.operation.time . Esta configuração aceita carimbos de data/hora nos seguintes formatos:

    • Um número inteiro representando o número de segundos desde a Unix epoch

    • Uma data e hora no formato ISO-8601 com precisão de um segundo

    • Um JSON estendido BsonTimestamp

    Padrão: latest

change.stream.lookup.full.document.before.change

Determina se incluir a pré-imagem de documentos modificados na saída do fluxo de alteração.

Esta configuração aceita os seguintes valores:

  • default: esta opção é equivalente ao valor off.

  • off: não inclui a pré-imagem de documentos modificados na saída do fluxo de alterações.

  • whenAvailable: inclui a pré-imagem de documentos modificados na saída do fluxo de alterações quando as pré-imagens estão disponíveis.

  • required: inclui a pré-imagem de documentos modificados na saída do fluxo de alterações. Se uma pré-imagem não estiver disponível para um documento modificado, o conector gerará um erro.

Padrão: default

Se você usa SparkConf para especificar qualquer uma das configurações anteriores, você poderá incluí-las na configuração do connection.uri ou listá-las individualmente.

O exemplo de código abaixo mostra como especificar o banco de dados, coleção e preferência de leitura como parte da configuração do connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred

Para manter o connection.uri curto e facilitar a leitura das configurações, você pode especificá-las individualmente:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/
spark.mongodb.read.database=myDB
spark.mongodb.read.collection=myCollection

Importante

Se você especificar uma configuração em connection.uri e em sua própria linha, a configuração connection.uri terá precedência. Por exemplo, na configuração abaixo, o banco de dados de conexão é foobar, porque é o valor na configuração connection.uri:

spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar
spark.mongodb.read.database=bar

Você pode especificar várias collections na propriedade de configuração change stream collection separando os nomes das collections com uma vírgula. Não adicione um espaço entre as coleções, a menos que o espaço faça parte do nome da coleção.

Especifique várias collections, como mostrado no exemplo a seguir:

...
.option("spark.mongodb.collection", "collectionOne,collectionTwo")

Se o nome de uma coleção for "*", ou se o nome incluir uma vírgula ou uma barra invertida (\), você deverá trocar o caractere da seguinte maneira:

  • Se o nome de uma collection usada em sua opção de configuração collection contiver uma vírgula, o Spark Connector a tratará como duas collections diferentes. Para evitar isso, você deve escapar da vírgula precedendo-a com uma barra invertida (\). Escape de uma coleção chamada "minha coleção" da seguinte maneira:

    "my\,collection"
  • Se o nome de uma collection usada em sua opção de configuração collection for "*", o Spark Connector o interpretará como uma especificação para verificar todas as collections. Para evitar isso, você deve escapar do asterisco precedendo-o com uma barra invertida (\). Escape de uma collection chamada "*" da seguinte maneira:

    "\*"
  • Se o nome de uma coleção usada em sua opção de configuração do collection contiver uma barra invertida (\), o Spark Connector tratará a barra invertida como um caractere de escape, o que pode alterar a forma como ele interpreta o valor. Para evitar isso, você deve escapar da barra invertida precedendo-a com outra barra invertida. Escape de uma coleção chamada "\collection" da seguinte maneira:

    "\\collection"

    Observação

    Ao especificar o nome da coleção como uma string literal em Java, você deve escapar ainda mais de cada barra invertida com outra. Por exemplo, escape de uma collection chamada "\collection" da seguinte maneira:

    "\\\\collection"

Você pode transmitir de todas as coleções no banco de dados passando um asterisco() como uma string para o nome da coleção.

Especifique todas as collections como mostrado no exemplo a seguir:

...
.option("spark.mongodb.collection", "*")

Se você criar uma collection durante a transmissão a partir de todas as collections, a nova collection será automaticamente incluída no stream.

Você pode descartar coleções a qualquer momento durante a transmissão de várias coleções.

Importante

Inferindo o esquema com múltiplas coleções

Se você definir a opção change.stream.publish.full.document.only como true, o Spark Connector inferirá o esquema de um DataFrame usando o esquema dos documentos digitalizados.

A inferência de esquema acontece no início da transmissão e não leva em conta as coleções criadas durante a transmissão.

Ao transmitir de várias coleções e inferir o esquema, o connector amostra cada coleção sequencialmente. O streaming de um grande número de collections pode fazer com que a inferência de esquema tenha desempenho notoriamente mais lento. Esse impacto no desempenho ocorre apenas ao inferir o esquema.