Menu Docs
Página inicial do Docs
/
Conector do Spark
/

Ler do MongoDB no modo de lote

Para ler dados do MongoDB, ligue para o método read() em seu objeto SparkSession. Este método retorna um objeto DataFrameReader, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura em lote.

Você deve especificar as seguintes definições de configuração para ler do MongoDB:

Contexto
Descrição

dataFrame.read.format()

Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB.

dataFrame.read.option()

Use o método option para definir as configurações de leitura em lote, inclusive a string de conexão da implantação MongoDB , o banco de dados e a coleção do MongoDB e a configuração do particionador.

Para obter uma lista de opções de configuração de leitura em lote , consulte o guiaOpções de configuração de leitura em lote .

O seguinte exemplo de código mostra como utilizar as definições de configuração anteriores para ler dados do people.contacts no MongoDB:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Dica

Tipo de DataFrame

DataFrame não existe como uma classe na API Java . Utilize o Dataset<Row> para referenciar um DataFrame.

Para ler dados do MongoDB, chame a função read no objeto SparkSession. Esta função retorna um objeto DataFrameReader, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura em lote.

Você deve especificar as seguintes definições de configuração para ler do MongoDB:

Contexto
Descrição

dataFrame.read.format()

Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB.

dataFrame.read.option()

Use o método option para definir as configurações de leitura em lote, incluindo a cadeia de conexão da implantação do MongoDB, o banco de dados e coleção do MongoDB e a configuração do particionador.

Para obter uma lista de opções de configuração de leitura em lote, consulte o guia Opções de configuração de leitura em lote.

O seguinte exemplo de código mostra como utilizar as definições de configuração anteriores para ler dados do people.contacts no MongoDB:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ler dados do MongoDB, chame o método read em seu objeto SparkSession. Este método retorna um objeto DataFrameReader, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura em lote.

Você deve especificar as seguintes definições de configuração para ler do MongoDB:

Contexto
Descrição

dataFrame.read.format()

Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB.

dataFrame.read.option()

Use o método option para definir as configurações de leitura em lote, incluindo a cadeia de conexão da implantação do MongoDB, o banco de dados e coleção do MongoDB e a configuração do particionador.

Para obter uma lista de opções de configuração de leitura em lote, consulte o guia Opções de configuração de leitura em lote.

O seguinte exemplo de código mostra como utilizar as definições de configuração anteriores para ler dados do people.contacts no MongoDB:

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Dica

Tipo de DataFrame

Um DataFrame é representado por um Dataset de Row objetos. O tipo DataFrame é um alias para Dataset[Row].

Ao carregar um conjunto de dados ou DataFrame sem um esquema, o Spark faz uma amostra dos registros para inferir o esquema da coleção.

Suponha que a coleção people.contacts do MongoDB contenha os seguintes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

A seguinte operação carrega dados do people.contacts e infere o esquema do DataFrame:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Para ver o esquema inferido, use o método printSchema() no objeto Dataset<Row>, conforme o exemplo abaixo:

dataFrame.printSchema();
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver os dados no DataFrame, use o método show() em seu objeto DataFrame, conforme o exemplo abaixo:

dataFrame.show();
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

Ao carregar um conjunto de dados ou DataFrame sem um esquema, o Spark faz uma amostra dos registros para inferir o esquema da coleção.

Suponha que a coleção people.contacts do MongoDB contenha os seguintes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

A seguinte operação carrega dados do people.contacts e infere o esquema do DataFrame:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver o esquema inferido, use a função printSchema() no objeto DataFrame, conforme mostrado no exemplo a seguir:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver os dados no DataFrame, use a função show() no objeto DataFrame, conforme mostrado no exemplo a seguir:

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

Ao carregar um conjunto de dados ou DataFrame sem um esquema, o Spark faz uma amostra dos registros para inferir o esquema da coleção.

Suponha que a coleção people.contacts do MongoDB contenha os seguintes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

A seguinte operação carrega dados do people.contacts e infere o esquema do DataFrame:

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver o esquema inferido, use o método printSchema() no objeto DataFrame, conforme o exemplo abaixo:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver os dados no DataFrame, use o método show() em seu objeto DataFrame, conforme o exemplo abaixo:

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

Ao usar filtros com DataFrames ou conjuntos de dados, o código do Conector MongoDB subjacente constrói um pipeline de agregação para filtrar os dados no MongoDB antes de enviá-los ao Spark. Isso melhora o desempenho do Spark ao recuperar e processar somente os dados necessários.

O Conector Spark do MongoDB transforma os seguintes filtros em fases do pipeline de agregação:

  • And

  • EqualNullSafe

  • EqualTo

  • GreaterThan

  • GreaterThanOrEqual

  • In

  • IsNull

  • LessThan

  • LessThanOrEqual

  • Not

  • Or

  • StringContains

  • StringEndsWith

  • StringStartsWith

Use filter() para ler um subconjunto de dados de sua coleção do MongoDB.

Considere uma coleção denominada fruit que contenha os seguintes documentos:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

Primeiro, configure um objeto DataFrame para se conectar com sua fonte de dados padrão do MongoDB:

df = spark.read.format("mongodb").load()

O exemplo abaixo inclui somente registros nos quais o campo qty é maior ou igual a 10.

df.filter(df['qty'] >= 10).show()

A operação imprime a seguinte saída:

+---+----+------+
|_id| qty| type|
+---+----+------+
|2.0|10.0|orange|
|3.0|15.0|banana|
+---+----+------+

Ao usar filtros com DataFrames ou conjuntos de dados, o código do Conector MongoDB subjacente constrói um pipeline de agregação para filtrar os dados no MongoDB antes de enviá-los ao Spark. Isso melhora o desempenho do Spark ao recuperar e processar somente os dados necessários.

O Conector Spark do MongoDB transforma os seguintes filtros em fases do pipeline de agregação:

  • And

  • EqualNullSafe

  • EqualTo

  • GreaterThan

  • GreaterThanOrEqual

  • In

  • IsNull

  • LessThan

  • LessThanOrEqual

  • Not

  • Or

  • StringContains

  • StringEndsWith

  • StringStartsWith

O exemplo a seguir filtra e gera os caracteres com idades inferiores a 100:

df.filter(df("age") < 100).show()

A operação produz o seguinte:

+--------------------+---+-------------+
| _id|age| name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82| Fíli|
|[5755d7b4566878c9...| 77| Kíli|
+--------------------+---+-------------+

Antes de executar consultas SQL em seu conjunto de dados, é necessário registrar uma visualização temporária para o conjunto de dados.

A operação abaixo registra uma tabela characters e, em seguida, a consulta para localizar todos os caracteres com 100 ou mais:

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() gera a saída:

+-------+----+
| name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
| Balin| 178|
| Dwalin| 169|
| Óin| 167|
| Glóin| 158|
+-------+----+

Antes de executar consultas SQL em seu DataFrame, você precisa registrar uma tabela temporária.

O exemplo a seguir registra uma tabela temporária chamada temp e, em seguida, usa o SQL para consultar registros nos quais o campo type contém a letra e:

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

No shell pyspark, a operação imprime a seguinte saída:

+------+----+
| type| qty|
+------+----+
| apple| 5.0|
|orange|10.0|
+------+----+

Antes de executar consultas SQL em seu conjunto de dados, é necessário registrar uma visualização temporária para o conjunto de dados.

A operação abaixo registra uma tabela characters e, em seguida, a consulta para localizar todos os caracteres com 100 ou mais:

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

Para saber mais sobre os tipos usados nestes exemplos, consulte a seguinte documentação do Apache Spark API:

Voltar

Modo de lote

Nesta página