Overview
MongoDB からデータを読み取るには、 SparkSessionオブジェクトで read()メソッドを呼び出します。 このメソッドはDataFrameReaderオブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
|---|---|
| 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 |
|
バッチ読み取り構成オプションのリストについては、 バッチ読み取り構成オプションのガイドを参照してください。 |
次のコード例は、前の構成設定を使用して MongoDB のpeople.contactsからデータを読み取る方法を示しています。
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
Tip
DataFrame 型
DataFrame は、Java API のクラスとして存在しません。 DataFrame を参照するには、 Dataset<Row>を使用します。
MongoDB からデータを読み取るには、 SparkSessionオブジェクトでread関数を呼び出します。 この関数はDataFrameReaderオブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
|---|---|
| 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 |
|
バッチ読み取り構成オプションのリストについては、バッチ読み取り構成オプションのガイドを参照してください。 |
次のコード例は、前の構成設定を使用して MongoDB のpeople.contactsからデータを読み取る方法を示しています。
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
MongoDB からデータを読み取るには、 SparkSessionオブジェクトでreadメソッドを呼び出します。 このメソッドはDataFrameReaderオブジェクトを返します。このオブジェクトを使用して、バッチ読み取り操作の形式やその他の構成設定を指定できます。
MongoDB から読み取るには、次の構成設定を指定する必要があります。
設定 | 説明 |
|---|---|
| 基礎となる入力データソースの形式を指定します。 MongoDB から読み取るには、 |
|
バッチ読み取り構成オプションのリストについては、バッチ読み取り構成オプションのガイドを参照してください。 |
次のコード例は、前の構成設定を使用して MongoDB のpeople.contactsからデータを読み取る方法を示しています。
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Tip
DataFrame 型
Data Frame は、 RowオブジェクトのDatasetによって表されます。 DataFrame型はDataset[Row]のエイリアスです。
スキーマ推論
スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。
MongoDB コレクションpeople.contactsに次のドキュメントが含まれているとします。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
次の操作は、 people.contactsからデータを読み込み、DataFrame のスキーマを推論します。
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
推論されたスキーマを確認するには、次の例に示すように、 Dataset<Row>オブジェクトでprintSchema()メソッドを使用します。
dataFrame.printSchema();
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
DataFrame のデータを表示するには、次の例に示すように、 DataFrameオブジェクトでshow()メソッドを使用します。
dataFrame.show();
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。
MongoDB コレクションpeople.contactsに次のドキュメントが含まれているとします。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
次の操作は、 people.contactsからデータを読み込み、DataFrame のスキーマを推論します。
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
推論されたスキーマを確認するには、次の例に示すように、 DataFrameオブジェクトでprintSchema()関数を使用します。
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
DataFrame のデータを表示するには、次の例に示すように、 DataFrameオブジェクトでshow()関数を使用します。
dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
スキーマなしで Dataset または DataFrame をロードすると、Spark はレコードをサンプリングしてコレクションのスキーマを推測します。
MongoDB コレクションpeople.contactsに次のドキュメントが含まれているとします。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
次の操作は、 people.contactsからデータを読み込み、DataFrame のスキーマを推論します。
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
推論されたスキーマを確認するには、次の例に示すように、 DataFrameオブジェクトでprintSchema()メソッドを使用します。
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
DataFrame のデータを表示するには、次の例に示すように、 DataFrameオブジェクトでshow()メソッドを使用します。
dataFrame.show()
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
スキーマ ヒントで既知のフィールドを指定する
schemaHints構成オプションを指定することで、スキーマ推論中に使用する既知のフィールド値を含むスキーマを指定できます。 次の Spark 形式のいずれかでschemaHintsオプションを指定できます。
タイプ | 形式 | |||
|---|---|---|---|---|
DDL |
| |||
SQL DDL |
| |||
JSON | |
次の例は、Spark shell を使用して、各形式でschemaHintsオプションを指定する方法を示しています。 この例では、 "value"という名前の string 値のフィールドと、 "count"という名前の整数値のフィールドを指定しています。
import org.apache.spark.sql.types._ val mySchema = StructType(Seq( StructField("value", StringType), StructField("count", IntegerType)) // Generate DDL format mySchema.toDDL // Generate SQL DDL format mySchema.sql // Generate Simple String DDL format mySchema.simpleString // Generate JSON format mySchema.json
次の例に示すように、PySpark を使用して Simple string DDL 形式またはJSON形式で schemaHints オプションを指定することもできます。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType mySchema = StructType([ StructField('value', StringType(), True), StructField('count', IntegerType(), True)]) # Generate Simple String DDL format mySchema.simpleString() # Generate JSON format mySchema.json()
フィルター
DataFrames または Datasets でフィルターを使用する場合、基礎の MongoDB Connector コードは集計パイプラインを構築し、MongoDB のデータを Spark に送信する前にフィルタリングします。 これにより、必要なデータのみを取得して処理することで、Spark のパフォーマンスが向上します。
MongoDB Spark Connector は、次のフィルターを集計パイプライン ステージに変換します。
および
EqualNullセーフ
EqualTo
greaterThan
greaterThanOrEqual
で
IsNull
未満
LegsThanOrEqual
ではない
または
StringContains
StringEndsWith
StringStartsWith
Java集計式 を使用してデータをフィルタリングできます。
次のドキュメントを含むfruitという名前のコレクションについて考えてみます。
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
まず、デフォルトのMongoDBデータソースに接続するための DataFrame を作成します。
Dataset<Row> df = spark.read() .format("mongodb") .option("database", "food") .option("collection", "fruit") .load();
次の例では、qtyフィールドの値が 10 以上であるレコードのみを検索します。
df.filter(df.col("qty").gte(10))
この操作では、以下の結果が出力されます。
{ "_id" : 2, "qty" : 10.0, "type" : "orange" } { "_id" : 3, "qty" : 15.0, "type" : "banana" }
DataFrames または Datasets でフィルターを使用する場合、基礎の MongoDB Connector コードは集計パイプラインを構築し、MongoDB のデータを Spark に送信する前にフィルタリングします。 これにより、必要なデータのみを取得して処理することで、Spark のパフォーマンスが向上します。
MongoDB Spark Connector は、次のフィルターを集計パイプライン ステージに変換します。
および
EqualNullセーフ
EqualTo
greaterThan
greaterThanOrEqual
で
IsNull
未満
LegsThanOrEqual
ではない
または
StringContains
StringEndsWith
StringStartsWith
MongoDB コレクションからデータのサブセットを読み取るには、 filter()を使用します。
次のドキュメントを含むfruitという名前のコレクションについて考えてみます。
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
まず、デフォルトの MongoDB データソースに接続するためのDataFrameオブジェクトを設定します。
df = spark.read.format("mongodb").load()
次の例には、 qtyフィールドが10以上であるレコードのみが含まれます。
df.filter(df['qty'] >= 10).show()
この操作では、次の出力が印刷されます。
{ "_id" : 2, "qty" : 10.0, "type" : "orange" } { "_id" : 3, "qty" : 15.0, "type" : "banana" }
DataFrames または Datasets でフィルターを使用する場合、基礎の MongoDB Connector コードは集計パイプラインを構築し、MongoDB のデータを Spark に送信する前にフィルタリングします。 これにより、必要なデータのみを取得して処理することで、Spark のパフォーマンスが向上します。
MongoDB Spark Connector は、次のフィルターを集計パイプライン ステージに変換します。
および
EqualNullセーフ
EqualTo
greaterThan
greaterThanOrEqual
で
IsNull
未満
LegsThanOrEqual
ではない
または
StringContains
StringEndsWith
StringStartsWith
次の例では、期限が100の下の文字をフィルタリングして出力します。
df.filter(df("age") < 100).show()
この操作では、以下の結果が出力されます。
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
SQL クエリ
データセットで SQL クエリを実行する前に、データセットの一時ビューを登録する必要があります。
次の操作では、 charactersテーブルを登録し、クエリを実行して 100 以上の文字をすべて検索します。
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show() は、以下を出力します。
{ "name" : "Gandalf", "age" : 1000 } { "name" : "Thorin", "age" : 195 } { "name" : "Balin", "age" : 178 } { "name" : "Dwalin", "age" : 169 } { "name" : "Óin", "age" : 167 } { "name" : "Glóin", "age" : 158 }
DataFrame に対して SQL クエリを実行する前に、一時テーブルを登録する必要があります。
次の例では、 tempという一時テーブルを登録し、SQL を使用してtypeフィールドに文字eが含まれるレコードをクエリします。
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
pyspark shell では、この操作によって次の出力が印刷されます。
{ "type" : "apple", "qty" : 5.0 } { "type" : "orange", "qty" : 10.0 }
データセットで SQL クエリを実行する前に、データセットの一時ビューを登録する必要があります。
次の操作では、 charactersテーブルを登録し、クエリを実行して 100 以上の文字をすべて検索します。
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
centenarians.show() は、以下を出力します。
{ "name" : "Gandalf", "age" : 1000 } { "name" : "Thorin", "age" : 195 } { "name" : "Balin", "age" : 178 } { "name" : "Dwalin", "age" : 169 } { "name" : "Óin", "age" : 167 } { "name" : "Glóin", "age" : 158 }
API ドキュメント
これらの例で使用されている型の詳細については、次の Apache Spark API ドキュメントを参照してください。