Overview
このガイドでは、 Java Reactive Streams ドライバーで集計パイプラインライン ステージを構築する静的ファクトリー メソッドを提供する Aggregatesクラスの使用方法を学習できます。
集計について詳しくは、 集計ガイドをご覧ください。
このページの例では、次のクラスのメソッドのインポートを前提としています。
AggregatesFiltersProjectionsSortsAccumulators
次のコードは、前述のクラスのメソッドをインポートする方法を示しています。
import static java.util.Arrays.asList; import static com.mongodb.client.model.Accumulators.*; import static com.mongodb.client.model.Aggregates.*; import static com.mongodb.client.model.Filters.*; import static com.mongodb.client.model.Projections.*; import static com.mongodb.client.model.Sorts.*; import static com.mongodb.client.model.search.SearchPath.fieldPath;
これらのメソッドを使用してパイプライン ステージを構築し、それらをリストとして集計で指定します。
Bson matchStage = match(eq("some_field", "some_criteria")); Bson sortByCountStage = sortByCount("some_field"); Publisher<Document> results = collection.aggregate(asList(matchStage, sortByCountStage)); Flux.from(results).collectList().block();
一致
match()メソッドを使用して、指定されたクエリフィルターに対して受信ドキュメントを照合する $matchパイプラインステージを作成し、一致しないドキュメントをフィルタリングで除外します。フィルターは、Filtersクラスまたはその他の Bsonインスタンスを使用して作成できます。
次の例では、 titleフィールドが「シャードのシャーディング」に等しいすべてのドキュメントに一致するパイプライン ステージを作成します。
match(eq("title", "The Shawshank Redemption"));
プロジェクト
project()メソッドを使用して、指定されたドキュメントフィールドをプロジェクト$プロジェクトパイプラインステージを作成します。集計での フィールドプロジェクションは、クエリのフィールドプロジェクションと同じルールに従います。プロジェクションクラスまたはその他の Bsonインスタンスを使用してプロジェクションを構築できます。
次の例では、 _idフィールドを除外するものの、 titleフィールドとplotフィールドを含むパイプライン ステージを作成します。
project(fields(include("title", "plot"), excludeId()));
計算フィールドのプロジェクション
$project ステージは、既存のフィールドまたは計算フィールドを出力できます。
次の例では、 ratedフィールドをratingという新しいフィールドにプロジェクションし、フィールドの名前を実質的に変更するパイプライン ステージを作成します。
project(fields(computed("rating", "$rated"), excludeId()));
ドキュメント
documents()メソッドを使用して、入力値からリテラル ドキュメントを返す$documentsパイプライン ステージを作成します。
重要
集計パイプラインで$documentsステージを使用する場合、パイプラインの最初のステージである必要があります。
次の例では、 titleフィールドを持つサンプル ドキュメントを作成するパイプライン ステージを作成します。
documents(asList( new Document("title", "The Shawshank Redemption"), new Document("title", "Back to the Future"), new Document("title", "Jurassic Park")));
重要
documents()メソッドを使用して集計パイプラインに入力を提供する場合は、コレクションではなくデータベースでaggregate()メソッドを呼び出す必要があります。
サンプル
sample()メソッドを使用して、入力からドキュメントをランダムに選択するための$sampleパイプライン ステージを作成します。
次の例では、5 つのドキュメントをランダムに選択するパイプライン ステージを作成します。
sample(5);
Sort
sort()メソッドを使用して、指定された条件で並べ替えるための $sortパイプラインステージを作成します。ソートクラスまたはその他のBson インスタンスを使用してソート条件を構築できます。
次の例では、yearフィールドの値に基づいて降順でソートするパイプラインステージを作成しています。
sort(orderBy(descending("year"), ascending("title")));
スキップ
skip()メソッドを使用して$skipパイプライン ステージを作成し、ドキュメントを次のステージに渡す前に指定された数のドキュメントをスキップします。
次の例では、最初の5ドキュメントをスキップするパイプライン ステージを作成しています。
skip(5);
Limit
次のステージに渡されるドキュメントの数を制限するには、 $limitパイプライン ステージを使用します。
次の例では、ドキュメント数を10に制限するパイプライン ステージを作成しています。
limit(10);
ルックアップ
lookup()メソッドを使用して$lookupパイプライン ステージを作成し、2 つのコレクション間で結合と非相関サブクエリを実行します。
左外部結合
次の例では、 コレクションと コレクション間で左外部結合を実行するパイプライン ステージを作成します。moviescomments
moviesの_idフィールドをcommentsのmovie_idフィールドに結合しますjoined_commentsフィールドに結果を出力します。
lookup("comments", "_id", "movie_id", "joined_comments");
完全結合と非相関サブクエリ
次の例では、 moviesコレクションに対して自己結合を実行して、各映画とジャンルを共有し、より高い imdb.rating を持つ映画を検索するパイプラインステージを作成します。
List<Variable<Object>> variables = asList( new Variable<>("genre", new Document("$arrayElemAt", asList("$genres", 0))), new Variable<>("rating", "$imdb.rating") ); List<Bson> pipeline = asList( match(expr(new Document("$and", asList( new Document("$in", asList("$$genre", "$genres")), new Document("$gt", asList("$imdb.rating", "$$rating")) )))), project(fields(include("title", "imdb.rating"), excludeId()))); Bson similarHigherRatedLookup = lookup("movies", variables, pipeline, "similar_higher_rated");
グループ
group()メソッドを使用して$groupパイプライン ステージを作成し、指定された式でドキュメントをグループ化し、個別のグループごとにドキュメントを出力します。
Tip
ドライバーには、サポートされているアキュムレータごとに静的ファクトリー メソッドを持つアキュムレータクラスが含まれています。
次の例では、 customerIdフィールドの値でドキュメントをグループ化するパイプライン ステージを作成します。 各グループは、 quantityフィールドの値の合計と平均をtotalQuantityフィールドとaverageQuantityフィールドに累積します。
group("$customerId", sum("totalQuantity", "$quantity"), avg("averageQuantity", "$quantity"));
アキュムレータ演算子の詳細については、サーバー マニュアルの「 アキュムレータ 」ページを参照してください。
Ticket-N アキュムレータ
選択可能アキュムレータ は、特定の順序指定された上位要素と最下位要素を返す集計アキュムレーション演算子です。 次のいずれかのビルダを使用して、集計アキュムレーション演算子を作成します。
Tip
MongoDBデプロイ(Atlas クラスターを含む)でMongoDB Server 5.2 以降が実行中いる場合にのみ、これらの listen するアキュムレータを使用して集計操作を実行できます。
でアキュムレータ集計パイプラインステージについては、サーバー マニュアルの「 アキュムレータ 」のページから参照してください。
MinN
minN()ビルダーは$minNアキュムレータを作成します。これはグループのn最小値を含むドキュメントのデータを返します。
Tip
$minNと$bottomNのアキュムレータも同様のタスクを実行できます。 それぞれの推奨使用量については、「 $minN と $bottomN アキュムレータの比較」を参照してください。
次の例では、 minN()メソッドを使用して、 yearでグループ化された映画の最小の 3 つのimdb.rating値を返す方法を示しています。
group( "$year", minN( "lowest_three_ratings", new BsonString("$imdb.rating"), 3 ));
詳細については、 minN() APIドキュメントを参照してください。
MaxN
maxN()アキュムレータは、グループの最大値nを含むドキュメントのデータを返します。
次の例では、 maxN()メソッドを使用して、 yearでグループ化された映画の上位 2 つのimdb.rating値を返す方法を示します。
group( "$year", maxN( "highest_two_ratings", new BsonString("$imdb.rating"), 2 ));
詳細については、 maxN() APIドキュメントを参照してください。
FirstN
firstN()アキュムレータは、指定されたソート順序の各グループの最初のnドキュメントのデータを返します。
Tip
$firstNと$topNのアキュムレータも同様のタスクを実行できます。 それぞれの推奨使用量については、「 $firstN と $topN アキュムレータの比較」を参照してください。
次の例では、 firstN()メソッドを使用して、 ステージになった順序に基づいて最初の 4 つの映画のtitle値をyearでグループ化して返す方法を示しています。
group( "$year", firstN( "first_four_movies", new BsonString("$title"), 4 ));
詳細については、 firstN() APIドキュメントを参照してください。
LastN
lastN()アキュムレータは、指定されたソート順序の各グループ内の最後のnドキュメントのデータを返します。
次の例では、lastN() メソッドを使用して、ステージに入る順序に基づいて、最後の 3 つの映画のtitle 値を表示する方法を示します。year
group( "$year", lastN( "last_three_movies", new BsonString("$title"), 3 ));
詳細については、 lastN() APIドキュメント を参照してください。
top
top()アキュムレータは、指定ソート順に基づいてグループ内の最初のドキュメントのデータを返します。
top()次の例では、title imdb.ratingimdb.ratingメソッドを使用して、 でグループ化された に基づいて最高評価の映画の とyear の値を返す方法を示します。
group( "$year", top( "top_rated_movie", descending("imdb.rating"), asList(new BsonString("$title"), new BsonString("$imdb.rating")) ));
詳細については、top() APIドキュメントを参照してください。
TopN
topN()アキュムレータは、指定されたフィールドの最大のn値を含むドキュメントのデータを返します。
Tip
$firstNと$topNのアキュムレータも同様のタスクを実行できます。 それぞれの推奨使用量については、「 $firstN と $topN アキュムレータの比較」を参照してください。
次の例では、 topN()メソッドを使用して、 yearでグループ化されたruntime値に基づいて、最も長い 3 つの映画のtitleとruntimeの値を返す方法を示します。
group( "$year", topN( "longest_three_movies", descending("runtime"), asList(new BsonString("$title"), new BsonString("$runtime")), 3 ));
詳細については、topN() APIドキュメントを参照してください。
下部
bottom()アキュムレータは、指定されたソート順序に基づいてグループ内の最後のドキュメントのデータを返します。
次の例では、 bottom()メソッドを使用して、 yearでグループ化されたruntime値に基づいて最も短い映画のtitleとruntimeの値を返す方法を示します。
group( "$year", bottom( "shortest_movies", descending("runtime"), asList(new BsonString("$title"), new BsonString("$runtime")) ));
詳細については、bottom() APIドキュメントを参照してください。
bottomN
bottomN()アキュムレータは、指定されたフィールドの最小のn値を含むドキュメントのデータを返します。
Tip
$minNと$bottomNのアキュムレータも同様のタスクを実行できます。 それぞれの推奨使用量については、「 $minN と $bottomN アキュムレータの比較」を参照してください。
次の例では、 bottomN()メソッドを使用して、 yearでグループ化されたimdb.rating値に基づき、評価が最も低い 2 つの映画のtitleとimdb.ratingの値を返す方法を示しています。
group( "$year", bottomN( "lowest_rated_two_movies", descending("imdb.rating"), asList(new BsonString("$title"), new BsonString("$imdb.rating")), 2 ));
詳細については、bottomN() APIドキュメントを参照してください。
Unwind
unwind()メソッドを使用して$unwindパイプライン ステージを作成し、入力ドキュメントから配列フィールドを分解し、配列要素ごとに出力ドキュメントを作成します。
次の例では、 sizes配列内の各要素のドキュメントを作成します。
unwind("$sizes");
次の例では、配列フィールドの欠落値または null 値を持つドキュメント、または配列が空のドキュメントを保持します。
unwind("$sizes", new UnwindOptions().preserveNullAndEmptyArrays(true));
次の例には、"position"というフィールドに配列インデックスが含まれています。
unwind("$sizes", new UnwindOptions().includeArrayIndex("position"));
アウト
out()メソッドを使用して、すべてのドキュメントを同じデータベース内の指定されたコレクションに書込む$outパイプライン ステージを作成します。
重要
$outステージは、すべての集計パイプラインの 最後のステージ である必要があります。
次の例では、パイプラインの結果をauthorsコレクションに書き込みます。
out("authors");
merge
merge()メソッドを使用して、すべてのドキュメントを指定されたコレクションにマージする$mergeパイプライン ステージを作成します。
重要
$mergeステージは、すべての集計パイプラインの 最後のステージ である必要があります。
次の例では、デフォルトの オプションを使用してパイプラインをauthorsコレクションにマージします。
merge("authors");
次の例では、 dateとcustomerIdの両方が一致する場合はドキュメントを置き換え、それ以外の場合はドキュメントを挿入するオプションを使用して、パイプラインをreportingデータベースのcustomersコレクションにマージします。
merge(new MongoNamespace("reporting", "customers"), new MergeOptions().uniqueIdentifier(asList("date", "customerId")) .whenMatched(MergeOptions.WhenMatched.REPLACE) .whenNotMatched(MergeOptions.WhenNotMatched.INSERT));
GraphLookup
graphLookup()メソッドを使用して、指定されたコレクションに対して再帰検索を実行し、1 つのドキュメント内の指定されたフィールドを別のドキュメントの指定されたフィールドと照合する$graphLookupパイプライン ステージを作成します。
次の例では、 contactsコレクション内のユーザーのソーシャル ネットワーク グラフを計算し、 friendsフィールドの値をnameフィールドに再帰的に照合します。
graphLookup("contacts", "$friends", "friends", "name", "socialNetwork");
GraphLookupOptionsを使用すると、必要に応じて再帰する深度と、深度フィールドの名前を指定できます。 この例では、 $graphLookupは最大 2 回再帰し、すべてのドキュメントの再帰深度情報を持つdegreesというフィールドを作成します。
graphLookup("contacts", "$friends", "friends", "name", "socialNetwork", new GraphLookupOptions().maxDepth(2).depthField("degrees"));
GraphLookupOptionsを使用すると、MongoDB が検索にドキュメントを含めるために一致する必要があるフィルターを指定できます。 この例では、 hobbiesフィールドに「golf」が含まれるリンクのみが含まれます。
graphLookup("contacts", "$friends", "friends", "name", "socialNetwork", new GraphLookupOptions().maxDepth(1).restrictSearchWithMatch(eq("hobbies", "golf")));
カウントによる並べ替え
sortByCount()メソッドを使用して、特定の式でドキュメントをグループ化し、これらのグループをカウントで降順にソートする$sortByCountパイプライン ステージを作成します。
Tip
$sortByCountステージは、 $sumアキュムレータとそれに続く$sortステージを持つ$groupステージと同一です。
[ { "$group": { "_id": <expression to group on>, "count": { "$sum": 1 } } }, { "$sort": { "count": -1 } } ]
次の例では、ドキュメントをフィールドxの切り捨てられた値でグループ化し、個別の値ごとにカウントを計算します。
sortByCount(new Document("$floor", "$x"));
ReplaceRoot
replaceRoot()メソッドを使用して、各入力ドキュメントを指定されたドキュメントで置き換える$replaceRootパイプライン ステージを作成します。
次の例では、各入力ドキュメントをspanish_translationフィールドにあるネストされたドキュメントに置き換えます。
replaceRoot("$spanish_translation");
AddFields
addFields()メソッドを使用して、ドキュメントに新しいフィールドを追加する$addFieldsパイプライン ステージを作成します。
Tip
フィールドの包含または除外をプロジェクトしない場合は、 $addFieldsを使用します。
次の例では、入力ドキュメントにaとbの 2 つの新しいフィールドを追加します。
addFields(new Field("a", 1), new Field("b", 2));
数
count()メソッドを使用して、ステージに入るドキュメントの数をカウントし、その値を指定されたフィールド名に割り当てる $countパイプラインステージを作成します。フィールド名を指定しない場合、count() はデフォルトでフィールド名を「count」に設定します。
Tip
$count ステージは次のmongoshステージと同等です。
{ "$group":{ "_id": 0, "count": { "$sum" : 1 } } }
次の例では、「total」というフィールドに受信したドキュメントの数を出力するパイプライン ステージを作成します。
count("total");
バケット
bucket()メソッドを使用して、事前定義された境界値の周囲のデータのバケットを自動化する$bucketパイプライン ステージを作成します。
次の例では、受信したドキュメントをscreenSizeフィールドの値に基づいてグループ化するパイプライン ステージを作成します。このステージは下限を含み、上限を含まないものです。
bucket("$screenSize", asList(0, 24, 32, 50, 70, 200));
指定された境界外の値のデフォルト バケットを指定し、追加のアキュムレータを指定するには、 BucketOptionsクラスを使用します。
次の例では、 screenSizeフィールドの値に基づいて受信ドキュメントをグループ化し、各バケットに含まれるドキュメントの数をカウントし、 screenSizeの値をmatchesというフィールドにプッシュして、次を取得するパイプライン ステージを作成します: 「70」を超える任意の画面サイズを、平均して大きな画面サイズ用の「mongostat」と呼ばれるバケットに格納します。
Tip
ドライバーには、サポートされているアキュムレータごとに静的ファクトリー メソッドを持つアキュムレータクラスが含まれています。
bucket("$screenSize", asList(0, 24, 32, 50, 70), new BucketOptions().defaultBucket("monster").output(sum("count", 1), push("matches", "$screenSize")));
BucketAuto
bucketAuto()メソッドを使用して$bucketAutoパイプライン ステージを作成します。このステージは、指定された数のバケットにドキュメントを均等に分散するために各バケットの境界を自動的に決定します。
次の例では、 priceフィールドの値を使用して、ドキュメントを作成し、10 バケットに均等に分散するパイプラインステージを作成します。
bucketAuto("$price", 10);
BucketAutoOptionsクラスを使用して、境界値を設定するための推奨数値ベースのスキームを指定し、追加のアキュムレータを指定します。
次の例では、priceフィールドの値を使用して 10 バケットにドキュメントを作成し、均等に分散するパイプラインステージを作成し、バケット境界を 2 の累乗(2、4、 8、16、...)。また、各バケット内のドキュメント数をカウントし、avgPrice という新しいフィールドでそれらの平均 price を計算します。
Tip
ドライバーには、サポートされているアキュムレータごとに静的ファクトリー メソッドを持つアキュムレータクラスが含まれています。
bucketAuto("$price", 10, new BucketAutoOptions().granularity(BucketGranularity.POWERSOF2) .output(sum("count", 1), avg("avgPrice", "$price")));
Facet
facet()メソッドを使用して$facetパイプライン ステージを作成し、並列パイプラインの定義を可能にします。
次の例では、2 つの並列集計を実行するパイプライン ステージを作成しています。
最初の集計では、受信したドキュメントを
attributes.screen_sizeフィールドに従って 5 つのグループに分散します。2 番目の集計では、すべてのメーカーをカウントし、上位5に限定したその数を返します。
facet(new Facet("Screen Sizes", bucketAuto("$attributes.screen_size", 5, new BucketAutoOptions().output(sum("count", 1)))), new Facet("Manufacturer", sortByCount("$attributes.manufacturer"), limit(5)));
SetWindowFields
setWindowFields()メソッドを使用して$setWindowFieldsパイプライン ステージを作成し、ウィンドウ演算子がコレクション内の指定された範囲のドキュメントに対して操作を実行できるようにします。
次の例では、 フィールドと フィールドに表示されるより詳細な測定値から、各地域の過去 1 か月の累積降量と平均温度を計算するパイプライン ステージを作成します。rainfalltemperature
Window pastMonth = Windows.timeRange(-1, MongoTimeUnit.MONTH, Windows.Bound.CURRENT); setWindowFields("$localityId", ascending("measurementDateTime"), WindowOutputFields.sum("monthlyRainfall", "$rainfall", pastMonth), WindowOutputFields.avg("monthlyAvgTemp", "$temperature", pastMonth));
密度
densify()メソッドを使用して、指定された間隔にわたるドキュメントのシーケンスを生成する$densifyパイプライン ステージを作成します。
Tip
$densify集計ステージにはMongoDB Server v5.1 以降が必要です。
Atlas サンプル 気象 データセットから取得され、1 時間間隔で配置された同様のpositionフィールドの測定値を含む次のドキュメントを検討してください。
Document{{ _id=5553a..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:00:00 EST 1984, ... }} Document{{ _id=5553b..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 09:00:00 EST 1984, ... }}
これらのドキュメントに対して次のアクションを実行するパイプライン ステージを作成する必要があるとします。
ts値がまだ存在しないドキュメントを 15 分ごとに追加します。ドキュメントを
positionフィールドでグループ化します。
これらのアクションを実行するdensify()集計ステージ ビルダへの呼び出しは次のようになります。
densify( "ts", DensifyRange.partitionRangeWithStep(15, MongoTimeUnit.MINUTE), DensifyOptions.densifyOptions().partitionByFields("position.coordinates"));
次の出力では、既存のドキュメント間で 15 分ごとのts値を含む、 集計ステージによって生成されたドキュメントが強調表示されています。
Document{{ _id=5553a..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:00:00 EST 1984, ... }} Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:15:00 EST 1984 }} Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:30:00 EST 1984 }} Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:45:00 EST 1984 }} Document{{ _id=5553b..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 09:00:00 EST 1984, ... }}
詳細については、densifyパッケージAPIドキュメントを参照してください。
Fill
fill()メソッドを使用して、 nullと欠落しているフィールド値を入力する$fillパイプライン ステージを作成します。
Tip
$fill集計ステージにはMongoDB Server v5.3 以降が必要です。
1 時間ごとの温度と温度の測定値を含む次のドキュメントを検討してください。
Document{{_id=6308a..., hour=1, temperature=23C, air_pressure=29.74}} Document{{_id=6308b..., hour=2, temperature=23.5C}} Document{{_id=6308c..., hour=3, temperature=null, air_pressure=29.76}}
次のように、ドキュメントに欠落している温度と温度のデータ ポイントを入力する必要があるとします。
線形補間 を使用して値を計算し、
air_pressureフィールドに時間 "2" を入力します。欠落している
temperature値を「23.6C」に設定します 時間の「3」。
これらのアクションを実行するfill()集計ステージ ビルダへの呼び出しは次のようになります。
fill( FillOptions.fillOptions().sortBy(ascending("hour")), FillOutputField.value("temperature", "23.6C"), FillOutputField.linear("air_pressure") );
次の出力では、 集計ステージによって入力されるフィールドを含むドキュメントが強調表示されています。
Document{{_id=6308a..., hour=1, temperature=23C, air_pressure=29.74}} Document{{_id=6308b..., hour=2, temperature=23.5C, air_pressure=29.75}} Document{{_id=6308c..., hour=3, temperature=23.6C, air_pressure=29.76}}
詳細については、fillパッケージAPIドキュメントを参照してください。
MongoDB Search
search()メソッドを使用して、1 つ以上のフィールドの全文検索を指定する$searchパイプライン ステージを作成します。
注意
Atlas および Community Edition のバージョン要件
$search 集計パイプライン演算子は、 MongoDB v4.2 以降を実行中MongoDB Atlasクラスター、またはMongoDB v8.2 以降を実行中MongoDB Community Editionクラスターでホストされているコレクションでのみ使用できます。コレクションはMongoDB 検索インデックスでカバーされている必要があります。必要な設定とこの演算子の機能の詳細については、MongoDB 検索するのドキュメントを参照してください。
次の例では、 titleフィールドで「feature」という単語を含むテキストを検索するパイプライン ステージを作成します。
Bson textSearch = search( SearchOperator.text( fieldPath("title"), "Future"));
MongoDB 検索する メタデータ
searchMeta() メソッドを使用して、MongoDB 検索クエリの結果のメタデータ部分のみを返す $searchMetaパイプラインステージを作成します。
注意
Atlas および Community Edition のバージョン要件
この集計パイプライン演算子は、MongoDB Atlasクラスターでv4.4.11 以降を実行中、またはMongoDB Community EditionクラスターでMongoDB v8.2 以降を実行中のみ使用できます。利用可能なバージョンの詳細なリストについては、$searchMeta に関するMongoDB Atlas のドキュメント を参照してください。
次の例では、 MongoDB Search集計ステージの countメタデータを示しています。
searchMeta( SearchOperator.near(2010, 1, fieldPath("year")));