Overview
このガイドでは、 MongoDB Kotlinドライバーで集計パイプラインライン ステージを構築する静的ファクトリー メソッドを提供する Aggregatesクラスの使用方法を説明します。
集計の詳細については、 集計ガイド をご覧ください。
このページの例では、次のクラスのメソッドのインポートを前提としています。
Aggregates
Filters
Projections
Sorts
Accumulators
import com.mongodb.client.model.Aggregates import com.mongodb.client.model.Filters import com.mongodb.client.model.Projections import com.mongodb.client.model.Sorts import com.mongodb.client.model.Accumulators
これらのメソッドを使用してパイプライン ステージを構築し、それらをリストとして集計で指定します。
val matchStage = Aggregates.match(Filters.eq("someField", "someCriteria")) val sortByCountStage = Aggregates.sortByCount("\$someField") val results = collection.aggregate( listOf(matchStage, sortByCountStage)).toList()
このガイドの多くのAggregation
例では、 Atlas sample_mflix.movies データセットを使用します。 このコレクション内のドキュメントは、 Kotlin ドライバーで使用する次のMovie
データ クラスによってモデル化されます。
data class Movie( val title: String, val year: Int, val genres: List<String>, val rated: String, val plot: String, val runtime: Int, val imdb: IMDB ){ data class IMDB( val rating: Double ) }
一致
match()
メソッドを使用して、指定されたクエリフィルターと受信ドキュメントを照合する$matchパイプライン ステージを作成し、一致しないドキュメントをフィルタリングで除外します。
Tip
フィルターは、 Bson
を実装する任意のクラスのインスタンスにすることができますが、フィルタークラスの使用と組み合わせると便利です。 クラス。
movies
のtitle
Aggregates.match(Filters.eq(Movie::title.name, "The Shawshank Redemption"))
プロジェクト
project()
メソッドを使用して、指定されたドキュメント フィールドをプロジェクションする$projectパイプライン ステージを作成します。 集計でのフィールド プロジェクションは、クエリの フィールド プロジェクションと同じルールに従います。
Tip
プロジェクションはBson
を実装する任意のクラスのインスタンスでもかまいませんが、 の使用と組み合わせると便利です。
次の例では、 title
フィールドとplot
フィールドを含み、 _id
フィールドを除外するパイプライン ステージを作成します。
Aggregates.project( Projections.fields( Projections.include(Movie::title.name, Movie::plot.name), Projections.excludeId()) )
計算フィールドのプロジェクション
$project
ステージは計算フィールドもプロジェクションできます。
次の例では、 rated
フィールドをrating
という新しいフィールドにプロジェクションし、フィールドの名前を実質的に変更するパイプライン ステージを作成します。
Aggregates.project( Projections.fields( Projections.computed("rating", "\$${Movie::rated.name}"), Projections.excludeId() ) )
ドキュメント
documents()
メソッドを使用して、入力値からリテラル ドキュメントを返す$documentsパイプライン ステージを作成します。
重要
集計パイプラインで$documents
ステージを使用する場合、パイプラインの最初のステージである必要があります。
次の例では、 title
フィールドを持つmovies
コレクションにサンプル ドキュメントを作成するパイプライン ステージを作成します。
Aggregates.documents( listOf( Document(Movie::title.name, "Steel Magnolias"), Document(Movie::title.name, "Back to the Future"), Document(Movie::title.name, "Jurassic Park") ) )
重要
documents()
メソッドを使用して集計パイプラインに入力を提供する場合は、コレクションではなくデータベースでaggregate()
メソッドを呼び出す必要があります。
val docsStage = database.aggregate<Document>( // ... )
サンプル
sample()
メソッドを使用して、入力からドキュメントをランダムに選択するための$sampleパイプライン ステージを作成します。
次の例では、 movies
コレクションからランダムに 5 つのドキュメントを選択するパイプライン ステージを作成します。
Aggregates.sample(5)
Sort
sort()
メソッドを使用して、指定された条件で並べ替えるための$sortパイプライン ステージを作成します。
Tip
並べ替え条件はBson
を実装するクラスのインスタンスでもかまいませんが、 並べ替え の使用と組み合わせると便利です。
次の例では、 year
フィールドの値に従って降順でソートし、次にtitle
フィールドの値の昇順でソートするパイプライン ステージを作成します。
Aggregates.sort( Sorts.orderBy( Sorts.descending(Movie::year.name), Sorts.ascending(Movie::title.name) ) )
スキップ
skip()
メソッドを使用して$skipパイプライン ステージを作成し、ドキュメントを次のステージに渡す前に指定された数のドキュメントをスキップします。
次の例では、 movies
コレクション内の最初の5
ドキュメントをスキップするパイプライン ステージを作成します。
Aggregates.skip(5)
Limit
次のステージに渡されるドキュメントの数を制限するには、 $limitパイプライン ステージを使用します。
次の例では、 movies
コレクションから返されるドキュメントの数を4
に制限するパイプライン ステージを作成しています。
Aggregates.limit(4)
ルックアップ
lookup()
メソッドを使用して$lookupパイプライン ステージを作成し、2 つのコレクション間で結合と非相関サブクエリを実行します。
左外部結合
次の例では、サンプル データベースに コレクションと コレクション間で左外部結合を実行するパイプライン ステージを作成します。movies
comments
mflix
movies
の_id
フィールドをcomments
のmovie_id
フィールドに結合しますjoined_comments
フィールドに結果を出力します
Aggregates.lookup( "comments", "_id", "movie_id", "joined_comments" )
完全結合と非相関サブクエリ
次の例では、架空のorders
とwarehouses
コレクションを使用しています。 データは、次の Kotlin データ クラスを使用してモデル化されます。
data class Order( val id: Int, val customerId: Int, val item: String, val ordered: Int ) data class Inventory( val id: Int, val stockItem: String, val inStock: Int )
この例では、2 つのコレクションをアイテム別に結合し、 inStock
フィールドで利用可能な数量がordered
の数量を満たすのに十分かどうかを確認するパイプライン ステージを作成します。
val variables = listOf( Variable("order_item", "\$item"), Variable("order_qty", "\$ordered") ) val pipeline = listOf( Aggregates.match( Filters.expr( Document("\$and", listOf( Document("\$eq", listOf("$\$order_item", "\$${Inventory::stockItem.name}")), Document("\$gte", listOf("\$${Inventory::inStock.name}", "$\$order_qty")) )) ) ), Aggregates.project( Projections.fields( Projections.exclude(Order::customerId.name, Inventory::stockItem.name), Projections.excludeId() ) ) ) val innerJoinLookup = Aggregates.lookup("warehouses", variables, pipeline, "stockData")
グループ
group()
メソッドを使用して$groupパイプライン ステージを作成し、指定された式でドキュメントをグループ化し、個別のグループごとにドキュメントを出力します。
Tip
ドライバーには、サポートされているアキュムレータごとに静的ファクトリー メソッドを持つアキュムレータクラスが含まれています。
次の例では、 orders
コレクション内のドキュメントをcustomerId
フィールドの値でグループ化するパイプライン ステージを作成します。 各グループは、 ordered
フィールドの値の合計と平均をtotalQuantity
フィールドとaverageQuantity
フィールドに累積します。
Aggregates.group("\$${Order::customerId.name}", Accumulators.sum("totalQuantity", "\$${Order::ordered.name}"), Accumulators.avg("averageQuantity", "\$${Order::ordered.name}") )
アキュムレータ演算子の詳細については、サーバー マニュアルの「アキュムレータ 」セクションを参照してください。
Ticket-N アキュムレータ
選択可能アキュムレータ は、特定の順序指定された上位要素と最下位要素を返す集計アキュムレーション演算子です。 次のいずれかのビルダを使用して、集計アキュムレーション演算子を作成します。
Tip
これらの Ticket-n アキュムレータを使用して集計操作を実行できるのは、MongoDB v5.2 以降を実行している場合のみです。
アキュムレータ演算子を使用できる集計パイプライン ステージについては、サーバー マニュアルの「アキュムレータ 」セクションを参照してください。
選択可能アキュムレータの例では、 sample-mflix
データベース内のmovies
コレクションのドキュメントを使用します。
MinN
minN()
ビルダーは$minNアキュムレータを作成します。これはグループのn
最小値を含むドキュメントのデータを返します。
Tip
$minN
と$bottomN
のアキュムレータも同様のタスクを実行できます。 それぞれの推奨使用量については、「 $minN と $bottomN アキュムレータの比較」を参照してください。
次の例では、 minN()
メソッドを使用して、 year
でグループ化された映画の最小の 3 つのimdb.rating
値を返す方法を示しています。
Aggregates.group( "\$${Movie::year.name}", Accumulators.minN( "lowestThreeRatings", "\$${Movie::imdb.name}.${Movie.IMDB::rating.name}", 3 ) )
詳細については、 minN() APIドキュメントを参照してください。
MaxN
maxN()
アキュムレータは、グループの最大値n
を含むドキュメントのデータを返します。
次の例では、 maxN()
メソッドを使用して、 year
でグループ化された映画の上位 2 つのimdb.rating
値を返す方法を示します。
Aggregates.group( "\$${Movie::year.name}", Accumulators.maxN( "highestTwoRatings", "\$${Movie::imdb.name}.${Movie.IMDB::rating.name}", 2 ) )
詳細については、 maxN() APIドキュメントを参照してください。
FirstN
firstN()
アキュムレータは、指定されたソート順序の各グループの最初のn
ドキュメントのデータを返します。
Tip
$firstN
と$topN
のアキュムレータも同様のタスクを実行できます。 それぞれの推奨使用量については、「 $firstN と $topN アキュムレータの比較」を参照してください。
次の例では、 firstN()
メソッドを使用して、 ステージになった順序に基づいて最初の 2 つの映画のtitle
値をyear
でグループ化して返す方法を示しています。
Aggregates.group( "\$${Movie::year.name}", Accumulators.firstN( "firstTwoMovies", "\$${Movie::title.name}", 2 ) )
詳細については、 firstN() APIドキュメントを参照してください。
LastN
lastN()
アキュムレータは、指定されたソート順序の各グループ内の最後のn
ドキュメントのデータを返します。
次の例では、 lastN()
メソッドを使用して、ステージに入る順序に基づいて、最後の 3 つの映画のtitle
値をyear
でグループ化して表示する方法を示します。
Aggregates.group( "\$${Movie::year.name}", Accumulators.lastN( "lastThreeMovies", "\$${Movie::title.name}", 3 ) )
詳細については、 lastN() APIドキュメント を参照してください。
top
top()
アキュムレータは、指定ソート順に基づいてグループ内の最初のドキュメントのデータを返します。
top()
次の例では、title
imdb.rating
imdb.rating
メソッドを使用して、 でグループ化された に基づいて最高評価の映画の とyear
の値を返す方法を示します。
Aggregates.group( "\$${Movie::year.name}", Accumulators.top( "topRatedMovie", Sorts.descending("${Movie::imdb.name}.${Movie.IMDB::rating.name}"), listOf("\$${Movie::title.name}", "\$${Movie::imdb.name}.${Movie.IMDB::rating.name}") ) )
詳細については、top() APIドキュメントを参照してください。
TopN
topN()
アキュムレータは、指定されたフィールドの最大のn
値を含むドキュメントのデータを返します。
Tip
$firstN
と$topN
のアキュムレータも同様のタスクを実行できます。 それぞれの推奨使用量については、「 $firstN と $topN アキュムレータの比較」を参照してください。
次の例では、 topN()
メソッドを使用して、 year
でグループ化されたruntime
値に基づいて、最も長い 3 つの映画のtitle
とruntime
の値を返す方法を示します。
Aggregates.group( "\$${Movie::year.name}", Accumulators.topN( "longestThreeMovies", Sorts.descending(Movie::runtime.name), listOf("\$${Movie::title.name}", "\$${Movie::runtime.name}"), 3 ) )
詳細については、topN() APIドキュメントを参照してください。
下部
bottom()
アキュムレータは、指定されたソート順序に基づいてグループ内の最後のドキュメントのデータを返します。
次の例では、 bottom()
メソッドを使用して、 year
でグループ化されたruntime
値に基づいて最も短い映画のtitle
とruntime
の値を返す方法を示します。
Aggregates.group( "\$${Movie::year.name}", Accumulators.bottom( "shortestMovies", Sorts.descending(Movie::runtime.name), listOf("\$${Movie::title.name}", "\$${Movie::runtime.name}") ) )
詳細については、bottom() APIドキュメントを参照してください。
bottomN
bottomN()
アキュムレータは、指定されたフィールドの最小のn
値を含むドキュメントのデータを返します。
Tip
$minN
と$bottomN
のアキュムレータも同様のタスクを実行できます。 それぞれの推奨使用量については、「 $minN と $bottomN アキュムレータの比較」を参照してください。
次の例では、 bottomN()
メソッドを使用して、 year
でグループ化されたimdb.rating
値に基づき、評価が最も低い 2 つの映画のtitle
とimdb.rating
の値を返す方法を示しています。
Aggregates.group( "\$${Movie::year.name}", Accumulators.bottom( "lowestRatedTwoMovies", Sorts.descending("${Movie::imdb.name}.${Movie.IMDB::rating.name}"), listOf("\$${Movie::title.name}", "\$${Movie::imdb.name}.${Movie.IMDB::rating.name}"), ) )
詳細については、bottomN() APIドキュメントを参照してください。
Unwind
unwind()
メソッドを使用して$unwindパイプライン ステージを作成し、入力ドキュメントから配列フィールドを分解し、配列要素ごとに出力ドキュメントを作成します。
次の例では、 lowestRatedTwoMovies
配列内の各要素のドキュメントを作成します。
Aggregates.unwind("\$${"lowestRatedTwoMovies"}")
配列フィールドの欠落値またはnull
値、または配列が空のドキュメントを保持するには:
Aggregates.unwind( "\$${"lowestRatedTwoMovies"}", UnwindOptions().preserveNullAndEmptyArrays(true) )
配列インデックスを含めるには(この例では"position"
というフィールド内)
Aggregates.unwind( "\$${"lowestRatedTwoMovies"}", UnwindOptions().includeArrayIndex("position") )
アウト
out()
メソッドを使用して、すべてのドキュメントを同じデータベース内の指定されたコレクションに書込む$outパイプライン ステージを作成します。
重要
$out
ステージは、すべての集計パイプラインの 最後のステージ である必要があります。
次の例では、パイプラインの結果をclassic_movies
コレクションに書き込みます。
Aggregates.out("classic_movies")
merge
merge()
メソッドを使用して、すべてのドキュメントを指定されたコレクションにマージする$mergeパイプライン ステージを作成します。
重要
$merge
ステージは、すべての集計パイプラインの 最後のステージ である必要があります。
次の例では、デフォルトの オプションを使用してパイプラインをnineties_movies
コレクションにマージします。
Aggregates.merge("nineties_movies")
次の例では、 year
とtitle
の両方が一致する場合はドキュメントを置き換えるために指定するデフォルト以外のオプションを使用してパイプラインをaggregation
データベースのmovie_ratings
コレクションにマージします。それ以外の場合はドキュメントを挿入します。
Aggregates.merge( MongoNamespace("aggregation", "movie_ratings"), MergeOptions().uniqueIdentifier(listOf("year", "title")) .whenMatched(MergeOptions.WhenMatched.REPLACE) .whenNotMatched(MergeOptions.WhenNotMatched.INSERT) )
GraphLookup
graphLookup()
メソッドを使用して、指定されたコレクションに対して再帰検索を実行し、1 つのドキュメント内の指定されたフィールドを別のドキュメントの指定されたフィールドと照合する$graphLookupパイプライン ステージを作成します。
次の例では、 contacts
コレクションを使用します。 データは、次の Kotlin データ クラスを使用してモデル化されます。
data class Users( val name: String, val friends: List<String>?, val hobbies: List<String>? )
この例では、 contact
コレクション内のユーザーのレポート作成グラフを計算し、 friends
フィールドの値をname
フィールドに再帰的に照合します。
Aggregates.graphLookup( "contacts", "\$${Users::friends.name}", Users::friends.name, Users::name.name, "socialNetwork" )
GraphLookupOptions
を使用すると、必要に応じて再帰する深度と、深度フィールドの名前を指定できます。 この例では、 $graphLookup
は最大 2 回再帰し、すべてのドキュメントの再帰深度情報を持つdegrees
というフィールドを作成します。
Aggregates.graphLookup( "contacts", "\$${Users::friends.name}", Users::friends.name, Users::name.name, "socialNetwork", GraphLookupOptions().maxDepth(2).depthField("degrees") )
GraphLookupOptions
を使用すると、MongoDB が検索にドキュメントを含めるために一致する必要があるフィルターを指定できます。 この例では、 hobbies
フィールドに「golf」が含まれるリンクのみが含まれます。
Aggregates.graphLookup( "contacts", "\$${Users::friends.name}", Users::friends.name, Users::name.name, "socialNetwork", GraphLookupOptions().maxDepth(1).restrictSearchWithMatch( Filters.eq(Users::hobbies.name, "golf") ) )
カウントによる並べ替え
sortByCount()
メソッドを使用して、特定の式でドキュメントをグループ化し、これらのグループをカウントで降順にソートする$sortByCountパイプライン ステージを作成します。
Tip
$sortByCount
ステージは、 $sum
アキュムレータとそれに続く$sort
ステージを持つ$group
ステージと同一です。
[ { "$group": { "_id": <expression to group on>, "count": { "$sum": 1 } } }, { "$sort": { "count": -1 } } ]
次の例では、 movies
コレクション内のドキュメントをgenres
フィールドでグループ化し、個別の 値ごとにカウントを計算します。
Aggregates.sortByCount("\$${Movie::genres.name}"),
ReplaceRoot
replaceRoot()
メソッドを使用して、各入力ドキュメントを指定されたドキュメントで置き換える$replaceRootパイプライン ステージを作成します。
次の例では、次の Kotlin データ クラスを使用してモデル化されたデータを含む架空のbooks
コレクションを使用します。
data class Libro(val titulo: String) data class Book(val title: String, val spanishTranslation: Libro)
各入力ドキュメントは、 spanishTranslation
フィールドのネストされたドキュメントに置き換えられます。
Aggregates.replaceRoot("\$${Book::spanishTranslation.name}")
AddFields
addFields()
メソッドを使用して、ドキュメントに新しいフィールドを追加する$addFieldsパイプライン ステージを作成します。
Tip
フィールドの包含または除外をプロジェクトしない場合は、 $addFields
を使用します。
次の例では、 movie
コレクションの入力ドキュメントに 2 つの新しいフィールド ( watched
とtype
)を追加します。
Aggregates.addFields( Field("watched", false), Field("type", "movie") )
数
count()
メソッドを使用して、ステージに入るドキュメントの数をカウントし、その値を指定されたフィールド名に割り当てる$countパイプライン ステージを作成します。 If you do not specify a field, count()
defaults the field name to "count".
Tip
$count
ステージは、次の構文アプリです。
{ "$group":{ "_id": 0, "count": { "$sum" : 1 } } }
次の例では、「total」というフィールドに受信したドキュメントの数を出力するパイプライン ステージを作成します。
Aggregates.count("total")
バケット
bucket()
メソッドを使用して、事前定義された境界値の周囲のデータのバケットを自動化する$bucketパイプライン ステージを作成します。
次の例では、次の Kotlin データ クラスでモデル化されたデータを使用します。
data class Screen( val id: String, val screenSize: Int, val manufacturer: String, val price: Double )
この例では、受信ドキュメントをscreenSize
フィールドの値に基づいてグループ化するパイプライン ステージを作成します。このステージは下限を含み、上限を含まないものです。
Aggregates.bucket("\$${Screen::screenSize.name}", listOf(0, 24, 32, 50, 70, 1000))
指定された境界外の値のデフォルト バケットを指定し、追加のアキュムレータを指定するには、 BucketOptions
クラスを使用します。
次の例では、 screenSize
フィールドの値に基づいて受信ドキュメントをグループ化し、各バケットに含まれるドキュメントの数をカウントし、 screenSize
の値をmatches
というフィールドにプッシュして、次を取得するパイプライン ステージを作成します: 「70」を超える任意の画面サイズを、平均して大きな画面サイズ用の「mongostat」と呼ばれるバケットに格納します。
Tip
ドライバーには、サポートされているアキュムレータごとに静的ファクトリー メソッドを持つアキュムレータクラスが含まれています。
Aggregates.bucket("\$${Screen::screenSize.name}", listOf(0, 24, 32, 50, 70), BucketOptions() .defaultBucket("monster") .output( Accumulators.sum("count", 1), Accumulators.push("matches", "\$${Screen::screenSize.name}") ) )
BucketAuto
bucketAuto()
メソッドを使用して$bucketAutoパイプライン ステージを作成します。このステージは、指定された数のバケットにドキュメントを均等に分散するために各バケットの境界を自動的に決定します。
次の例では、次の Kotlin データ クラスでモデル化されたデータを使用します。
data class Screen( val id: String, val screenSize: Int, val manufacturer: String, val price: Double )
この例では、 price
フィールドの値を使用して、ドキュメントを作成し、5 つのバケットに均等に分散するパイプライン ステージを作成します。
Aggregates.bucketAuto("\$${Screen::screenSize.name}", 5)
BucketAutoOptions
クラスを使用して、境界値を設定するための推奨数値ベースのスキームを指定し、追加のアキュムレータを指定します。
次の例では、 price
フィールドの値を使用して 5 つのバケットにドキュメントを作成し、均等に分散するパイプライン ステージを作成し、バケット境界を 2 の累乗(2、4、8、16、...)に設定します。 。 また、各バケット内のドキュメント数をカウントし、 avgPrice
という新しいフィールドでそれらの平均price
を計算します。
Tip
ドライバーには、サポートされているアキュムレータごとに静的ファクトリー メソッドを持つアキュムレータクラスが含まれています。
Aggregates.bucketAuto( "\$${Screen::price.name}", 5, BucketAutoOptions() .granularity(BucketGranularity.POWERSOF2) .output(Accumulators.sum("count", 1), Accumulators.avg("avgPrice", "\$${Screen::price.name}")) )
Facet
facet()
メソッドを使用して$facetパイプライン ステージを作成し、並列パイプラインの定義を可能にします。
次の例では、次の Kotlin データ クラスでモデル化されたデータを使用します。
data class Screen( val id: String, val screenSize: Int, val manufacturer: String, val price: Double )
この例では、2 つの並列集計を実行するパイプライン ステージを作成しています。
最初の集計では、受信したドキュメントを
screenSize
フィールドに従って 5 つのグループに分散します。2 番目の集計では、すべてのメーカーをカウントし、上位 5 に限定したその数を返します。
Aggregates.facet( Facet( "Screen Sizes", Aggregates.bucketAuto( "\$${Screen::screenSize.name}", 5, BucketAutoOptions().output(Accumulators.sum("count", 1)) ) ), Facet( "Manufacturer", Aggregates.sortByCount("\$${Screen::manufacturer.name}"), Aggregates.limit(5) ) )
SetWindowFields
setWindowFields()
メソッドを使用して$setWindowFieldsパイプライン ステージを作成し、ウィンドウ演算子がコレクション内の指定された範囲のドキュメントに対して操作を実行できるようにします。
次の例では、次の Kotlin データ クラスでモデル化されたデータを使用する架空のweather
コレクションを使用します。
data class Weather( val localityId: String, val measurementDateTime: LocalDateTime, val rainfall: Double, val temperature: Double )
この例では、 フィールドと フィールドに表示されるより詳細な測定値から、各地域の過去 1 か月間の累積降量と平均温度を計算するパイプライン ステージを作成します。rainfall
temperature
val pastMonth = Windows.timeRange(-1, MongoTimeUnit.MONTH, Windows.Bound.CURRENT) val resultsFlow = weatherCollection.aggregate<Document>( listOf( Aggregates.setWindowFields("\$${Weather::localityId.name}", Sorts.ascending(Weather::measurementDateTime.name), WindowOutputFields.sum( "monthlyRainfall", "\$${Weather::rainfall.name}", pastMonth ), WindowOutputFields.avg( "monthlyAvgTemp", "\$${Weather::temperature.name}", pastMonth ) ) )
密度
densify()
メソッドを使用して、指定された間隔にわたるドキュメントのシーケンスを生成する$densifyパイプライン ステージを作成します。
Tip
$densify()
集計ステージは MongoDB v5.1 以降を実行している場合にのみ使用できます。
Atlas サンプル 気象 データセットから取得され、1 時間間隔で配置された同様のposition
フィールドの測定値を含む次のドキュメントを検討してください。
Document{{ _id=5553a..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:00:00 EST 1984, ... }} Document{{ _id=5553b..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 09:00:00 EST 1984, ... }}
これらのドキュメントは、次の Kotlin データ クラスを使用してモデル化されます。
data class Weather( val id: ObjectId = ObjectId(), val position: Point, val ts: LocalDateTime )
これらのドキュメントに対して次のアクションを実行するパイプライン ステージを作成する必要があるとします。
ts
値がまだ存在しないドキュメントを 15 分ごとに追加します。ドキュメントを
position
フィールドでグループ化します。
これらのアクションを実行するdensify()
集計ステージ ビルダへの呼び出しは、次のようになります。
Aggregates.densify( "ts", DensifyRange.partitionRangeWithStep(15, MongoTimeUnit.MINUTE), DensifyOptions.densifyOptions().partitionByFields("Position.coordinates") )
次の出力では、既存のドキュメント間で 15 分ごとのts
値を含む、 集計ステージによって生成されたドキュメントが強調表示されています。
Document{{ _id=5553a..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:00:00 EST 1984, ... }} Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:15:00 EST 1984 }} Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:30:00 EST 1984 }} Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:45:00 EST 1984 }} Document{{ _id=5553b..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 09:00:00 EST 1984, ... }}
詳細については、densifyパッケージAPIドキュメントを参照してください。
Fill
fill()
メソッドを使用して、 null
と欠落しているフィールド値を入力する$fillパイプライン ステージを作成します。
Tip
$fill()
集計ステージは MongoDB v5.3 以降を実行している場合にのみ使用できます。
1 時間ごとの温度と温度の測定値を含む次のドキュメントを検討してください。
Document{{_id=6308a..., hour=1, temperature=23C, air_pressure=29.74}} Document{{_id=6308b..., hour=2, temperature=23.5C}} Document{{_id=6308c..., hour=3, temperature=null, air_pressure=29.76}}
これらのドキュメントは、次の Kotlin データ クラスを使用してモデル化されます。
data class Weather( val id: ObjectId = ObjectId(), val hour: Int, val temperature: String?, val air_pressure: Double? )
次のように、ドキュメントに欠落している温度と温度のデータ ポイントを入力する必要があるとします。
線形補間 を使用して値を計算し、
air_pressure
フィールドに時間 "2" を入力します。欠落している
temperature
値を「23.6C」に設定します 時間の「3」。
これらのアクションを実行するfill()
集計ステージ ビルダへの呼び出しは次のようになります。
val resultsFlow = weatherCollection.aggregate<Weather>( listOf( Aggregates.fill( FillOptions.fillOptions().sortBy(Sorts.ascending(Weather::hour.name)), FillOutputField.value(Weather::temperature.name, "23.6C"), FillOutputField.linear(Weather::air_pressure.name) ) ) ) resultsFlow.collect { println(it) }
Weather(id=6308a..., hour=1, temperature=23C, air_pressure=29.74) Weather(id=6308b..., hour=2, temperature=23.5C, air_pressure=29.75) Weather(id=6308b..., hour=3, temperature=23.6C, air_pressure=29.76)
詳細については、fillパッケージAPIドキュメントを参照してください。
Atlas 全文検索
search()
メソッドを使用して、1 つ以上のフィールドの全文検索を指定する$searchパイプライン ステージを作成します。
Tip
Atlas Search インデックスを持つコレクションでのみ使用可能
この集計パイプライン演算子は、Atlas Searchインデックスを持つコレクションでのみ使用できます。必要な設定とこの演算子の機能の詳細については、Atlas Search のドキュメントを参照してください。
次の例では、 movies
コレクションのtitle
フィールドで「feature」という単語を含むテキストを検索するパイプライン ステージを作成します。
Aggregates.search( SearchOperator.text( SearchPath.fieldPath(Movie::title.name), "Future" ), SearchOptions.searchOptions().index("title") )
Atlas Search メタデータ
searchMeta()
メソッドを使用して、Atlas 全文検索クエリの結果のメタデータ部分のみを返す$searchMetaパイプライン ステージを作成します。
次の例では、Atlas Search 集計ステージのcount
メタデータを示しています。
Aggregates.searchMeta( SearchOperator.near(1985, 2, SearchPath.fieldPath(Movie::year.name)), SearchOptions.searchOptions().index("year") )
このヘルパーの詳細については、searchMeta() APIドキュメント を参照してください。
Atlas Vector Search
重要
この機能をサポートしている MongoDB Atlas のどのバージョンについては、Atlas ドキュメントの「制限」を参照してください。
vectorSearch()
メソッドを使用して、 セマンティック検索 を指定する $vectorSearch パイプライン ステージを作成します。セマンティック検索とは、意味が同様の情報を検索するタイプの検索です。
コレクションで集計を実行するときにこの機能を使用するには、ベクトル検索インデックスを作成し、ベクトル埋め込みにインデックスを付ける必要があります。 MongoDB Atlas で検索インデックスを設定する方法については、Atlas ドキュメントの「 ベクトル検索用のベクトル埋め込みのインデックスを作成する方法 」を参照してください。
このセクションの例では、次の Kotlin データ クラスでモデル化されたデータを使用します。
data class MovieAlt( val title: String, val year: Int, val plot: String, val plotEmbedding: List<Double> )
この例では、 vectorSearch()
メソッドを使用して次の仕様でベクトル検索を実行する集計パイプラインを構築する方法を示します。
string値のベクトル埋め込みを使用して
plotEmbedding
フィールド値を検索しますmflix_movies_embedding_index
ベクトル検索インデックスを使用します最大 2 つの最近傍を考慮
1 つのドキュメントを返します
year
の値が少なくとも2016
であるドキュメントをフィルタリングする
Aggregates.vectorSearch( SearchPath.fieldPath(MovieAlt::plotEmbedding.name), listOf(-0.0072121937, -0.030757688, -0.012945653), "mflix_movies_embedding_index", 2.toLong(), 1.toLong(), vectorSearchOptions().filter(Filters.gte(MovieAlt::year.name, 2016)) )
このヘルパーの詳細については、vectorSearch() APIドキュメントを参照してください。