Docs Menu
Docs Home
/ /

$merge(集計ステージ)

注意

このページでは、集計パイプラインの結果をコレクションに出力する$mergeステージについて説明します。$mergeObjects 演算子(複数のドキュメントを 1 つのドキュメントにマージする)については、$mergeObjects を参照してください。

$merge

集計パイプラインの結果を、指定したコレクションに書き込みます。$merge 演算子はパイプラインの最後ののステージでなければなりません。

$merge ステージでは、次のことができます。

  • 同じデータベースまたは異なるデータベース内のコレクションに出力できます。

  • 集約されている同じコレクションに出力できます。詳細については、集計されているのと同じコレクションへの出力」を参照してください。

  • 集計パイプライン$merge または $out ステージを使用する場合は、次の点を考慮してください。

    • MongoDB 5.0 以降では、クラスター内のすべてのノードの featureCompatibilityVersion5.0 以上に設定されており、読み込み設定(read preference) でセカンダリ読み取りが許可されている場合、$merge ステージのパイプラインをレプリカセットのセカンダリノードで実行できます。

      • $merge $out ステージはセカンダリ ノードで実行されますが、書込み (write) 操作はプライマリ ノードに送信されます。

      • すべてのドライバー バージョンがセカンダリ ノードに送信される $merge 操作をサポートしているわけではありません。詳細についてはドライバーのドキュメントを参照してください。

    • MongoDB の以前のバージョンでは、$out または $merge ステージを持つパイプラインは常にプライマリ ノードで実行され、読み込み設定 (read preference) は考慮されませんでした。

  • 出力コレクションがまだ存在しない場合は、新しいコレクションを作成します。

  • 結果(新しいドキュメントの挿入、ドキュメントのマージ、ドキュメントの置換、既存のドキュメントの保持、操作の失敗、カスタムアップデートパイプラインによるドキュメントの処理)を既存のコレクションに組み込むことができます。

  • シャーディングされたコレクションに出力できます。入力コレクションもシャーディング可能です。

集計結果もコレクションに出力する$outステージとの比較については、「 $merge$outの比較 」を参照してください。

注意

オンデマンドのマテリアライズドビュー

$merge は、コレクションの完全な置換を行うのではなく、パイプラインの結果を既存の出力コレクションに組み込むことができます。この機能により、ユーザーはオンデマンドのマテリアライズドビューを作成できます。ここでは、パイプラインの実行時に出力コレクションの内容が段階的に更新されます。

このユースケースの詳細については、このページの例だけでなく、オンデマンド マテリアライズドビューも参照してください。

マテリアライズドビューは読み取り専用のビューとは別物です。読み取り専用ビューの作成については、「読み取り専用ビュー」を参照してください。

次の環境でホストされる配置には $merge を使用できます。

  • MongoDB Atlas はクラウドでの MongoDB 配置のためのフルマネージド サービスです

  • MongoDB Enterprise: サブスクリプションベースの自己管理型 MongoDB バージョン

  • MongoDB Community: ソースが利用可能で、無料で使用できる自己管理型の MongoDB のバージョン

$merge の構文は次のとおりです。

{ $merge: {
into: <collection> -or- { db: <db>, coll: <collection> },
on: <identifier field> -or- [ <identifier field1>, ...], // Optional
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

以下に例を挙げます。

{ $merge: { into: "myOutput", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }

同じデータベース内のコレクションへの書き込みを含め、$merge のすべてのデフォルト オプションを使用する場合は、簡略化された形式を使用できます。

{ $merge: <collection> } // Output collection is in the same database

$mergeステージは、次のフィールドを持つドキュメントを取得します。

フィールド
説明

出力コレクション。次のいずれかを指定します。

  • 集計が実行されるのと同じデータベース内のコレクションに出力する文字列としてのコレクション名。以下に例を示します。

    into: "myOutput"

  • 指定されたデータベース内のコレクションに出力するドキュメント内のデータベース名とコレクション名。以下に例を示します。

    into: { db:"myDB", coll:"myOutput" }

出力コレクションが存在しない場合、$merge はコレクションを作成します。

出力コレクションはシャーディングされたコレクションにすることができます。

任意。 ドキュメントの一意の識別子として機能するフィールド。 識別子は、結果ドキュメントが出力コレクション内の既存のドキュメントと 一致する かどうかを判断します。次のいずれかを指定します。

  • 文字列としての 1 つのフィールド名。以下に例を示します。

    on: "_id"

  • 配列内のフィールドの組み合わせ。以下に例を示します。

    on: [ "date", "customerId" ]
    The order of the fields in the array does not matter, and you cannot specify the same field multiple times.

指定されたフィールドの場合、次のようになります。

  • 集計結果ドキュメントには、on フィールドが _id フィールドである場合を除き、on で指定されたフィールドが含まれている必要があります。結果ドキュメントに _id フィールドがない場合、MongoDB はそれを自動的に追加します。

  • 指定されたフィールドに null 値または配列値を含めることはできません。

$merge ユニークインデックスには、オン識別子フィールドに対応するキーが必要です。インデックスキーを指定する順序は重要ではありませんが、ユニークインデックスにはキーとして on フィールドのみを含める必要があります。

  • また、インデックスが保持する照合は、集計が保持する照合と同じであることが必要です。

  • 一意なインデックスはスパース インデックスにできます。

  • 一意なインデックスを部分インデックスにすることはできません。

  • 既に存在する出力コレクションの場合、対応するインデックスが既に存在している必要があります。

オン のデフォルト値は出力コレクションによって異なります。

  • 出力コレクションが存在しない場合、 オン 識別子は である必要があり、デフォルトは_id フィールドになります。対応する一意の _idインデックスが自動的に作成されます。

  • 既存の出力コレクションがシャーディングされていない場合、on 識別子はデフォルトで _id フィールドになります。

  • 既存の出力コレクションがシャーディングされたコレクションの場合、 オン 識別子はデフォルトですべてのシャードキーフィールドと_id フィールドに設定されます。別の on 識別子を指定する場合、on にはすべてのシャードキーフィールドが含まれている必要があります。

任意。結果ドキュメントとコレクション内の既存のドキュメントで指定されたオン フィールドの値が同じ場合の $merge の動作。

次のいずれかを指定できます。

  • 以下の事前定義されたアクション文字列の中から 1 つ。

    アクション
    説明

    出力コレクション内の既存のドキュメントを一致する結果ドキュメントに置き換えます。

    置換を実行する場合、置換ドキュメントによって _id 値やシャードキー値(出力コレクションがシャーディングされている場合)は変更されません。それ以外の場合、操作はエラーを生成します。

    このエラーを回避するには、 オンフィールドに_id フィールドが含まれていない場合は、集計結果の_id フィールドを削除してエラーを回避します(たとえば、先行する$unset ステージなど)。

    "merge" (デフォルト)

    一致するドキュメントをマージします($mergeObjects 演算子と同様)。

    • 結果ドキュメントに既存のドキュメントにないフィールドが含まれている場合は、それらの新しいフィールドを既存のドキュメントに追加します。

    • 結果ドキュメントに既存のドキュメントのフィールドが含まれている場合は、既存のフィールド値を結果ドキュメントのフィールド値に置き換えます。

    たとえば、出力コレクションに以下のドキュメントがあるとします。

    { _id: 1, a: 1, b: 1 }

    また、集計結果には以下のドキュメントがあるとします。

    { _id: 1, b: 5, z: 1 }

    その場合、マージされたドキュメントは以下のようになります。

    { _id: 1, a: 1, b: 5, z: 1 }

    マージを実行する場合、マージされたドキュメントによって _id 値やシャードキー値(出力コレクションがシャーディングされている場合)は変更されません。それ以外の場合、操作はエラーを生成します。

    このエラーを回避するには、 オンフィールドに_id フィールドが含まれていない場合は、集計結果の_id フィールドを削除してエラーを回避します(たとえば、先行する$unset ステージなど)。

    集計操作を停止して失敗させます。前ドキュメントからの出力コレクションへの変更は、元には戻されません。

  • コレクション内のドキュメントを更新するための集計パイプライン。

    [ <stage1>, <stage2> ... ]

    パイプラインは、次のステージのみで構成できます。

    パイプラインはオン フィールドの値を変更できません。例、フィールドmonth で照合している場合、パイプラインはmonthフィールド を変更できません。

    whenMatched pipeline は、$<field> を使用して出力コレクション内の既存のドキュメントのフィールドに直接アクセスできます。

    集計結果ドキュメントのフィールドにアクセスするには、次のいずれかを使用します。

    • フィールドにアクセスするための組み込み $$new 変数。 具体的には、$$new.<field>$$new変数は、 let 指定が省略されている場合にのみ使用できます。

    • letフィールド内のユーザー定義の変数

      二重ドル記号($$)プレフィックスと変数名を $$<variable_name> 形式で指定します。たとえば、$$year のような形式です。変数がドキュメントに設定されている場合は、$$<variable_name>.<field> という形式でフィールドを含めることもできます。たとえば、$$year.month のような形式です。

      その他の例については、「 変数を使用したマージのカスタマイズ 」を参照してください。

任意。 whenMatchedパイプラインで使用する変数を指定します。

以下のように、変数名と値の式を使ってドキュメントを指定します。

{ <variable_name_1>: <expression_1>,
...,
<variable_name_n>: <expression_n> }

指定されていない場合、デフォルトは{ new: "$$ROOT" } になります(ROOT を参照)。whenMatchedパイプラインは $$new変数にアクセスできます。

whenMatchedパイプライン内の変数にアクセスするには、次の手順に従います。

二重ドル記号($$)プレフィックスと変数名を $$<variable_name> 形式で指定します。たとえば、$$year のような形式です。変数がドキュメントに設定されている場合は、$$<variable_name>.<field> という形式でフィールドを含めることもできます。たとえば、$$year.month のような形式です。

例については、「 変数を使用したマージのカスタマイズ 」を参照してください。

任意。結果ドキュメントがアウトコレクション内の既存のドキュメントと一致しない場合の $merge の動作。

以下の事前定義されたアクション文字列のいずれかを指定できます。

アクション
説明

"insert" (Default)

ドキュメントを出力コレクションに挿入します。

ドキュメントを破棄します。具体的には、$merge はドキュメントを出力コレクションに挿入しません。

集計操作を停止して失敗させます。すでに出力コレクションに書き込まれた変更は、元には戻されません。

集計パイプライン結果のドキュメントに _id フィールドが存在しない場合は、$merge ステージによって自動的にフィールドが生成されます。

たとえば、次の集計パイプラインでは、$project$merge に渡されるドキュメントから_id フィールドを除外します。$merge がこれらのドキュメントを "newCollection" に書き込むと、$merge は新しい_id フィールドと値を生成します。

db.movies.aggregate( [
{ $project: { _id: 0 } },
{ $merge : { into : "newCollection" } }
] )

指定された出力コレクションが存在しない場合は、$merge操作によって新しいコレクションが作成されます。

  • 出力コレクションは、$merge が最初のドキュメントをコレクションに書込んだときに作成され、すぐに表示されます。

  • 集計が失敗した場合、エラーの前に $merge によって完了した書き込みはロールバックされません。

注意

レプリカセットまたはスタンドアロンでは、出力データベースが存在しない場合、$merge によってデータベースも作成されます。

シャーディングされたクラスターの場合、指定された出力データベースがすでに存在している必要があります。

出力コレクションが存在しない場合、$mergeではオン識別子が _id フィールドである必要があります。存在しないコレクションに対して別の on フィールド値を使用するには、まず目的のフィールドに一意なインデックスを作成してコレクションを作成します。たとえば、出力コレクション newDailyCommentCount が存在せず、オン識別子としてcommentDate フィールドを指定する場合は次のようになります。

db.newDailyCommentCount.createIndex(
{ commentDate: 1 }, { unique: true } )
db.comments.aggregate( [
{ $match: { date: { $gte: new Date("2002-01-01"),
$lt: new Date("2002-02-01") } } },
{ $group: { _id: { $dateToString: { format: "%Y-%m-%d",
date: "$date" } }, count: { $sum: 1 } } },
{ $project: { _id: 0, commentDate: { $toDate: "$_id" },
count: 1 } },
{ $merge : { into : "newDailyCommentCount",
on: "commentDate" } }
] )

$merge ステージはシャーディングされたコレクションに出力できます。出力コレクションがシャーディングされた場合、$merge_id フィールドとすべてのシャードキー フィールドをデフォルトのオン識別子として使用します。デフォルトを上書きする場合、オン識別子にすべてのシャードキー フィールドを含める必要があります。

{ $merge: {
into: "<shardedColl>" or { db:"<sharding enabled db>", coll: "<shardedColl>" },
on: [ "<shardkeyfield1>", "<shardkeyfield2>",... ], // Shard key fields and any additional fields
let: <variables>, // Optional
whenMatched: <replace|keepExisting|merge|fail|pipeline>, // Optional
whenNotMatched: <insert|discard|fail> // Optional
} }

たとえば、sh.shardCollection() メソッドを使用して、rated フィールドをシャードキーとして持つ新しいシャーディングされたコレクションmoviesByYearAndRating を作成します。

sh.shardCollection(
"sample_mflix.moviesByYearAndRating", // Namespace of the collection to shard
{ rated: 1 }, // Shard key
);

moviesByYearAndRatingコレクションには、年(year フィールド)とコンテンツレーティング(シャードキー)ごとの映画統計情報を含むドキュメントが格納されます。具体的には、オン識別子は ["year", "rated"] です(フィールドの順序は関係ありません)。$mergeには、オン識別子フィールドに対応するキーを持つユニークインデックスが必要なので、ユニークインデックスを作成します(フィールドの順序は関係ありません)。[1]

db.moviesByYearAndRating.createIndex(
{ rated: 1, year: 1 }, { unique: true } )

シャーディングされたコレクション moviesByYearAndRating と一意のインデックスを作成すると、$merge を使用して集計結果をこのコレクションに出力でき、この例のように [ "year", "rated" ] に一致します。

db.movies.aggregate( [
{ $match: { rated: { $ne: null }, year: { $ne: null } } },
{ $group: {
_id: { year: "$year", rated: "$rated" },
movieCount: { $sum: 1 } } },
{ $project: { _id: 0, year: "$_id.year", rated: "$_id.rated",
movieCount: 1 } },
{ $merge: { into: "moviesByYearAndRating",
"on": [ "year", "rated" ], whenMatched: "replace",
whenNotMatched: "insert" } }
] )
[1] sh.shardCollection() メソッドは、シャードキーが範囲ベースであり、コレクションが空であり、シャードキーにユニークインデックスがまだ存在しない場合に、{ unique: true } オプションを渡すとシャードキーにユニークインデックスを作成することもできます。前述の例では、on 識別子はシャードキーと別のフィールドであるため、対応するインデックスを作成するための別の操作が必要です。

$merge は、集計結果にオンの指定に基づいて一致するドキュメントが含まれている場合、出力先コレクション内の既存のドキュメントを置き換えることができます。そのため、集約結果にコレクション内のすべての既存ドキュメントと一致するドキュメントが含まれており、かつ「replace」whenMatched に対して指定した場合、$merge は既存のコレクション内のすべてのドキュメントを置き換えることができます。

ただし、集計結果に関係なく既存のコレクションを置き換えるには、代わりに $out を使用します。

$mergeによって既存のドキュメントの _id 値が変更されると、$merge エラーが発生します。

Tip

このエラーを回避するには、オンフィールドに_idフィールドが含まれていない場合は、集計結果の_idフィールドを削除してエラーを回避します(たとえば、先行する$unsetステージなど)。

さらに、シャーディングされたコレクションの場合、既存のドキュメントのシャードキー値が変更されると、$merge はエラーも生成します。

エラーが発生する前に $merge によって完了した書き込みはロールバックされません。

$mergeオンフィールドに使用する一意のインデックスが集計の途中で削除された場合、集計が強制終了される保証はありません。集計が続行される場合、ドキュメントに重複する on フィールド値が存在しないという保証はありません。

$merge が出力コレクションの一意のインデックスに違反するドキュメントを書込もうとした場合、操作でエラーが生成されます。例:

コレクションでスキーマ検証が使用されており、 validationActionerrorに設定されている場合、無効なドキュメントを挿入したり、 $mergeを使用して無効な値を持つドキュメントを更新したりすると、 MongoServerErrorがスローされ、ドキュメントはターゲット コレクションに書き込まれません。 無効なドキュメントが複数ある場合、最初に見つかった無効なドキュメントのみがエラーをスローします。 すべての有効なドキュメントはターゲット コレクションに書き込まれ、すべての無効なドキュメントは書込みに失敗します。

$merge 次のすべてが真である場合、ドキュメントを出力コレクションに直接挿入します。

  • whenMatchedの値は集計パイプラインです。

  • whenNotMatched の値は insert です。

  • 出力コレクションに一致するドキュメントがありません。

$merge の導入により、MongoDB は集計パイプラインの結果をコレクションに書き込むための 2 つのステージ $merge$out を提供します。

$merge
  • 同じデータベースまたは異なるデータベース内のコレクションに出力できます。

  • 同じデータベースまたは異なるデータベース内のコレクションに出力できます。

  • 出力コレクションがまだ存在しない場合は、新しいコレクションを作成します。

  • 出力コレクションがまだ存在しない場合は、新しいコレクションを作成します。

  • 結果(新しいドキュメントの挿入、ドキュメントのマージ、ドキュメントの置換、既存のドキュメントの保持、操作の失敗、カスタムアップデートパイプラインによるドキュメントの処理)を既存のコレクションに組み込むことができます。

  • 出力コレクションが既に存在する場合は、完全に置き換えます。

  • シャーディングされたコレクションに出力できます。入力コレクションもシャーディング可能です。

  • シャーディングされたコレクションに出力できません。ただし、入力コレクションはシャーディングできます。

  • 以下の SQL ステートメントに対応します。

    • MERGE.

    • INSERT INTO T2 SELECT FROM T1.

    • SELECT INTO T2 FROM T1.

    • マテリアライズドビューの作成と更新。

  • 以下の SQL ステートメントに対応します。

    • INSERT INTO T2 SELECT FROM T1.

    • SELECT INTO T2 FROM T1.

警告

$merge の出力が集約されている同じコレクションに出力される場合、ドキュメントが複数回更新されたり、操作によって無限ループが発生したりする可能性があります。この動作は、$merge によって実行されるアップデートによって、ディスクに保存されているドキュメントの物理的な場所が変更された場合に発生します。ドキュメントの物理的な場所が変更されると、$merge はそのドキュメントを完全に新しいドキュメントとして表示し、追加のアップデートが行われることがあります。この動作の詳細については、日付の問題 を参照してください。

$merge は、集約されている同じコレクションに出力できます。$lookupなど、パイプラインの他のステージに表示されるコレクションに出力することもできます。

制限事項
説明

集約パイプラインでは、トランザクション内で $merge を使用できません。

集計パイプラインでは、$merge を使用して時系列コレクションに出力することはできません。

Separate from materialized view

ビュー定義に $merge ステージを含めることはできません。ビュー定義に入れ子パイプラインが含まれている場合(たとえば、ビュー定義に $facet ステージが含まれている場合)、この $merge ステージの制限は入れ子パイプラインにも適用されます。

$lookup ステージ

$lookup ステージのネストされたパイプライン$merge ステージを含めることはできません。

$facet ステージ

$facet ステージのネストされたパイプライン$merge ステージを含めることはできません。

$unionWith ステージ

$unionWith ステージのネストされたパイプライン$merge ステージを含めることはできません。

"linearizable" 読み取り保証 (read concern)

$mergeステージは読み取り保証(read concern)"linearizable"と組み合わせて使用することはできません。つまり、db.collection.aggregate()に対して読み取り保証(read concern)"linearizable"を指定した場合、パイプラインに$mergeステージを含めることはできません。

このページの例では、sample_mflixサンプルデータセットのデータを使用します。このデータセットを自己管理型MongoDB配置にロードする方法の詳細については、サンプルデータセットをロードする を参照してください。サンプルデータベースに変更を加えた場合、このページの例を実行するには、データベースを削除して再作成する必要がある場合があります。

出力コレクションが存在しない場合、$merge はコレクションを作成します。

注意

レプリカセット またはスタンドアロン配置では、出力データベースが存在しない場合、$merge によってデータベースも作成されます。

シャーディングされたクラスター配置の場合、指定された出力データベースがすでに存在している必要があります。

$group$merge を使用すると、リリース年とコンテンツレーティングごとに高い評価を受けた映画をまとめたmovieRatingSummary コレクションを作成できます。

db.movies.aggregate( [
{ $match: { metacritic: 100, rated: { $ne: null },
year: { $lte: 1972 } } },
{ $group: { _id: { year: "$year", rated: "$rated" },
count: { $sum: 1 } } },
{ $merge : { into: "movieRatingSummary", on: "_id",
whenMatched: "replace", whenNotMatched: "insert" } }
] )

このパイプラインでは、次のステージを使用します。

  • $match 1972 年までにリリースされ、コンテンツ レーティングが設定されている高評価映画を抽出するステージ

  • $group ステージ(yearrated で映画をグループ化します)

  • $merge 直前の $group ステージの出力を、sample_mflix データベース内の movieRatingSummary コレクションに書き込むステージ

新しい movieRatingSummary コレクション内のドキュメントを表示するには次のようにします。

db.movieRatingSummary.find().sort(
{ _id: 1 } )
[
{ _id: { year: 1939, rated: 'PASSED' }, count: 1 },
{ _id: { year: 1962, rated: 'PG' }, count: 1 },
{ _id: { year: 1963, rated: 'PG' }, count: 1 },
{ _id: { year: 1970, rated: 'R' }, count: 1 },
{ _id: { year: 1972, rated: 'R' }, count: 1 }
]

前の例の movieRatingSummary コレクションを更新して、1963 年以降の高評価映画を含めるために、この集約パイプラインでは次のステージを使用します。

  • $match ステージ(metacritic: 100、コンテンツレーティングが設定されており、リリース年が 1963 以上であるすべての映画を検索します)。

  • $group ステージ(yearrated で映画をグループ化します)。

  • $merge movieRatingSummary コレクションに結果セットを書き込み、同じ _id 値のドキュメントを置き換えます。コレクション内に一致するドキュメントが存在しない場合、$merge は新しいドキュメントを挿入します。

db.movies.aggregate( [
{ $match: { metacritic: 100, rated: { $ne: null },
year: { $gte: 1963 } } },
{ $group: { _id: { year: "$year", rated: "$rated" },
count: { $sum: 1 } } },
{ $merge : { into: "movieRatingSummary", on: "_id",
whenMatched: "replace", whenNotMatched: "insert" } }
] )

集計の実行後、movieRatingSummary コレクション内のドキュメントを表示します。

db.movieRatingSummary.find().sort(
{ _id: 1 } )
[
{ _id: { year: 1939, rated: 'PASSED' }, count: 1 },
{ _id: { year: 1962, rated: 'PG' }, count: 1 },
{ _id: { year: 1963, rated: 'PG' }, count: 1 },
{ _id: { year: 1970, rated: 'R' }, count: 1 },
{ _id: { year: 1972, rated: 'R' }, count: 1 },
{ _id: { year: 1982, rated: 'R' }, count: 1 },
{ _id: { year: 2014, rated: 'R' }, count: 1 }
]

$merge がコレクション内の既存のデータを上書きしないようにするには、whenMatched keepExisting または失敗に設定します。

sample_mflix データベースの movieArchive コレクションには、リリース年ごとの高評価映画の履歴レコードが格納されています。

movieArchive コレクションでは、year フィールドに一意のインデックスがあります。リリース年ごとに存在するレコードは最大 1 件でなければなりません。

db.movieArchive.createIndex(
{ year: 1 }, { unique: true } )

この集計パイプラインは、movieArchive コレクションを movies コレクションのデータで更新し、1963 年以降の高評価映画を含めます。このパイプラインでは、次のステージを使用します。

  • $matchステージ(metacritic: 100、コンテンツレーティング、およびyear >= 1963を持つすべての映画を検索します)。

  • $groupステージ(映画タイトルをyearでグループ化します)。

  • $project ステージ(_idフィールドを抑制し、yearを最上位フィールドに昇格させます)。ドキュメントが $merge に渡されると、$merge はドキュメントに対して新しい_id フィールドを自動的に生成します。

  • $merge 結果セットを movieArchive に書き込みます。

    $merge ステージは、year フィールドでドキュメントを照合し、一致した場合は失敗します。つまり、そのリリース年のドキュメントがすでに存在する場合、$merge はエラーになります。

db.movies.aggregate( [
{ $match: { metacritic: 100, rated: { $ne: null },
year: { $gte: 1963 } } },
{ $group: { _id: "$year",
titles: { $push: "$title" } } },
{ $project: { _id: 0, year: "$_id", titles: 1 } },
{ $merge : { into: "movieArchive", on: "year",
whenMatched: "fail" } }
] )

movieArchive コレクションに、1963 年から 2014 年までの範囲のドキュメントがすでに含まれている場合、集約処理は重複キーエラーのため失敗します。ただし、このパイプラインはエラー発生前に挿入したドキュメントをロールバックしません。

一致するドキュメントにkeepExistingを指定すると、集計は一致するドキュメントに影響を与えず、重複キー エラーは発生しません。同様に、replaceを指定した場合、操作は失敗しませんが、既存のドキュメントが置き換えられます。

デフォルトでは、集計結果内のドキュメントがコレクション内のドキュメントと一致する場合、$merge ステージはドキュメントをマージします。

$mergeを使用してmoviesコレクションの結果とcommentsコレクションの結果をマージし、新しいコレクションyearlyStatsを作成できます。

yearlyStats コレクションを作成するには、次のパイプラインを実行します。

db.movies.aggregate( [
{ $match: { metacritic: 100, rated: { $ne: null },
year: { $gte: 1970, $lte: 1972 } } },
{ $group: { _id: "$year", movieCount: { $sum: 1 } } },
{ $merge : { into: "yearlyStats", on: "_id",
whenMatched: "merge", whenNotMatched: "insert" } }
])
第 1 ステージ:
$match ステージ(コンテンツレーティングを持ち、1970 年から 1972 年の間にリリースされた、評価の高い映画(metacritic: 100)をフィルタリングします)。
第 2 ステージ:
$group ステージ(yearでグループ化し、新しい movieCount フィールドで映画をカウントします)。
第 3 ステージ:
$mergeステージでは、ドキュメントを同じデータベース内のyearlyStatsコレクションに書き込みます。ステージがコレクション内で_idフィールドに一致する既存のドキュメントを見つけた場合、ステージは一致するドキュメントをマージします。それ以外の場合、ステージはドキュメントを挿入します。最初の作成では、一致するドキュメントはありません。

コレクション内の文書を表示するには、以下の操作を実行します。

db.yearlyStats.find().sort( { _id: 1 } )
[
{ _id: 1970, movieCount: 1 },
{ _id: 1972, movieCount: 1 }
]

同様に、comments コレクションに対して次の集計パイプラインを実行して、コメント数を yearlyStats コレクションにマージします。

db.comments.aggregate( [
{ $match: { date: { $gte: new Date("1970-01-01"),
$lt: new Date("1973-01-01") } } },
{ $group: { _id: { $year: "$date" },
commentCount: { $sum: 1 } } },
{ $merge : { into: "yearlyStats", on: "_id",
whenMatched: "merge", whenNotMatched: "insert" } }
])
第 1 ステージ:
$matchステージ(1970 年から 1972 年の間に投稿されたコメントをフィルタリングします)。
第 2 ステージ:
$group ステージ(コメント date から抽出した年でグループ化し、コメント数を新しい commentCount フィールドにカウントします)。
第 3 ステージ:
$merge ステージでは、ドキュメントを同じデータベース内の yearlyStats コレクションに書き込みます。ステージがコレクション内で _id フィールド(年)に一致する既存のドキュメントを見つけた場合、ステージは一致するドキュメントをマージします。それ以外の場合、ステージはドキュメントを挿入します。

データがマージされた後に yearlyStats コレクション内のドキュメントを表示するには、次の操作を実行します。

db.yearlyStats.find().sort( { _id: 1 } )
[
{ _id: 1970, movieCount: 1, commentCount: 889 },
{ _id: 1971, commentCount: 825 },
{ _id: 1972, movieCount: 1, commentCount: 863 }
]

$merge は、ドキュメントが一致する場合にカスタム更新パイプラインを使用できます。whenMatched パイプラインには次のステージがあります。

monthlyCommentTotals コレクションは、各月のコメント数の累計を管理しています。

毎日、新しいコメントが sample_mflix.comments コレクションに届きます。次の集約パイプラインは、その日のコメント数を月ごとの合計に反映して更新します。

db.comments.aggregate([
{ $match: { date: { $gte: new Date("1970-01-15"),
$lt: new Date("1970-01-16") } } },
{ $group: { _id: { $dateToString: { format: "%Y-%m",
date: "$date" } }, count: { $sum: 1 } } },
{ $merge: {
into: "monthlyCommentTotals",
on: "_id",
whenMatched: [
{ $addFields: {
count: { $add: [ "$count", "$$new.count" ] }
} }
],
whenNotMatched: "insert"
} }
])
第 1 ステージ:
$match ステージ(1970 年 1 月 15 日に投稿されたすべてのコメントを検索します)。
第 2 ステージ:
$group ステージ(一致したコメントを年月でグループ化し、その件数をカウントします)。
第 3 ステージ:

$merge ステージは、ドキュメントを monthlyCommentTotals コレクションに書き込みます。ステージが、コレクション内で _id フィールドに一致する既存のドキュメントを見つけた場合、そのステージはパイプラインを使用して、その日の count を既存の月次合計に追加します。

  • このパイプラインは、結果ドキュメントのフィールドに直接アクセスすることはできません。結果ドキュメント内の count フィールドにアクセスするために、このパイプラインは $$new 変数、つまり $$new.count を使用します。

  • このパイプラインは、コレクション内の既存ドキュメントの count フィールド、つまり、$count に直接アクセスできます。

結果のドキュメントによって、既存のドキュメントが置き換えられます。

マージ操作後に monthlyCommentTotals コレクション内のドキュメントを表示するには、次の操作を実行します。

db.monthlyCommentTotals.find()
[ { _id: '1970-01', count: 71 } ]

$merge ステージの whenMatched フィールドで変数を使用できます。変数は、使用する前に定義する必要があります。

次のいずれかまたは両方で変数を定義します。

whenMatched で変数を使用するには次のようにします。

二重ドル記号($$)プレフィックスと変数名を $$<variable_name> 形式で指定します。たとえば、$$year のような形式です。変数がドキュメントに設定されている場合は、$$<variable_name>.<field> という形式でフィールドを含めることもできます。たとえば、$$year.month のような形式です。

以下のタブは、変数を mergeステージ、aggregate コマンド、またはその両方で定義したときの動作を示します。

$mergeステージ let で変数を定義し、whenMatchedフィールドの変数を使用できます。

次の例:

  • movieDetails コレクションの映画を使用して movies コレクションを作成します

  • $mergeletyear 変数を定義するaggregate コマンドを実行し、whenMatchedを使用して年数をmovieDetails に追加します

  • retrieves the movieDetails document

db.movies.aggregate( [
{ $match: { title: "The Godfather" } },
{ $limit: 1 },
{ $project: { title: 1 } },
{ $merge: { into: "movieDetails", whenNotMatched: "insert" } }
] )
db.runCommand( {
aggregate: db.movieDetails.getName(),
pipeline: [ {
$merge: {
into: db.movieDetails.getName(),
let : { year: "2023" },
whenMatched: [ {
$addFields: { "addedYear": "$$year" }
} ]
}
} ],
cursor: {}
} )
db.movieDetails.find()
[ { _id: ..., title: 'The Godfather', addedYear: '2023' } ]

バージョン 5.0 で追加

aggregateコマンド let で変数を定義し、$merge ステージのwhenMatched フィールドで変数を使用できます。

次の例:

  • movieDetails コレクションの映画を使用して movies コレクションを作成します

  • aggregateコマンドletyear変数を定義するaggregateコマンドを実行しwhenMatchedを使用してmovieDetailsに年数を追加します

  • retrieves the movieDetails document

db.movies.aggregate( [
{ $match: { title: "The Godfather" } },
{ $limit: 1 },
{ $project: { title: 1 } },
{ $merge: { into: "movieDetails", whenNotMatched: "insert" } }
] )
db.runCommand( {
aggregate: db.movieDetails.getName(),
pipeline: [ {
$merge: {
into: db.movieDetails.getName(),
whenMatched: [ {
$addFields: { "addedYear": "$$year" }
} ]
}
} ],
cursor: {},
let : { year: "2023" }
} )
db.movieDetails.find()
[ { _id: ..., title: 'The Godfather', addedYear: '2023' } ]

変数は、 $mergeステージで定義できます。また、MongoDB 5.0以降では、 aggregateコマンドでも定義できます。

$merge ステージとaggregate コマンドで同じ名前の2つの変数が定義されている場合は、$merge ステージ変数が使用されます。

この例では、パイプラインはyear: "2019" aggregate コマンド変数の代わりにyear: "2023" を使用します。

db.movies.aggregate( [
{ $match: { title: "The Godfather" } },
{ $limit: 1 },
{ $project: { title: 1 } },
{ $merge: { into: "movieDetails", whenNotMatched: "insert" } }
] )
db.runCommand( {
aggregate: db.movieDetails.getName(),
pipeline: [ {
$merge: {
into: db.movieDetails.getName(),
let : { year: "2023" },
whenMatched: [ {
$addFields: { "addedYear": "$$year" }
} ]
}
} ],
cursor: {},
let : { year: "2019" }
} )
db.movieDetails.find()
[ { _id: ..., title: 'The Godfather', addedYear: '2023' } ]

次の Movie クラスは、この例で使用されるドキュメントをモデル化します。

[BsonIgnoreExtraElements]
public class Movie
{
[BsonId]
public ObjectId Id { get; set; }
[BsonElement("title")]
public string Title { get; set; } = null!;
[BsonElement("year")]
public int? Year { get; set; }
[BsonElement("runtime")]
public int? Runtime { get; set; }
[BsonElement("rated")]
public string? Rated { get; set; }
[BsonElement("metacritic")]
public int Metacritic { get; set; }
[BsonElement("plot")]
public string? Plot { get; set; }
[BsonElement("type")]
public string? Type { get; set; }
[BsonElement("cast")]
public string[]? Cast { get; set; }
[BsonElement("directors")]
public string[]? Directors { get; set; }
[BsonElement("writers")]
public string[]? Writers { get; set; }
[BsonElement("imdb")]
public ImdbData? Imdb { get; set; }
}

MongoDB .NET/ C#ドライバーを使用して$merge ステージを集計パイプラインに追加するには、 PipelineDefinitionオブジェクトで merge() メソッドを呼び出します。

Merge() メソッドを呼び出すときは、mergeStageOptionsクラスのインスタンスを渡す必要があります。このオブジェクトを使用すると、一致するドキュメントの処理方法など、$merge ステージのオプションを指定できます。

次の例では、パイプライン内のドキュメントを moviesコレクションにマージするパイプラインステージを作成しています。MergeStageOptionsオブジェクトは次のオプションを指定します。

  • OnFieldNames オプションは、操作が "_id" フィールドを使用して一致するドキュメントを識別することを指定します。

  • WhenMatched オプションは、ソース コレクション内のドキュメントがターゲット コレクション内のドキュメントと一致する場合、操作によって一致したドキュメントが置き換えられることを指定します。

  • WhenNotMatched オプションは、ソース コレクション内のドキュメントがターゲット コレクション内のドキュメントと一致しない場合に、この操作によってドキュメントがターゲット コレクションに挿入されることを指定します。

var pipeline = new EmptyPipelineDefinition<Movie>()
.Merge(_targetCollection,
new MergeStageOptions<Movie>()
{
OnFieldNames = new List<string>() { "_id" },
WhenMatched = MergeStageWhenMatched.Replace,
WhenNotMatched = MergeStageWhenNotMatched.Insert,
});
{ "_id": "...", "title": "Back to the Future", "metacritic": 96 }
{ "_id": "...", "title": "Jurassic Park", "metacritic": 68 }
{ "_id": "...", "title": "The Shawshank Redemption", "metacritic": 80 }

このページのNode.js の例では、Atlasサンプルデータセットsample_mflixデータベースを使用します。無料のMongoDB Atlas cluster を作成し、サンプルデータセットをロードする方法については、 MongoDB Node.jsドライバーのドキュメントの開始を参照してください。

MongoDB Node.jsドライバーを使用して $merge ステージを集計パイプラインに追加するには、パイプラインオブジェクトで $merge 演算子を使用します。

次の例では、パイプライン内のドキュメントを moviesコレクションにマージするパイプラインステージを作成しています。この例には、次のフィールドが含まれています。

  • on オプションは、操作が"_id" フィールドと "title" フィールドを使用して、ソースコレクションと moviesコレクション内の一致するドキュメントを検索することを指定します。

  • whenMatched オプションは、ソースコレクション内のドキュメントがmoviesコレクション内のドキュメントと一致する場合に、moviesコレクション内のドキュメントを置き換えることを指定します。

  • whenNotMatched オプションは、ソースコレクション内のドキュメントがmoviesコレクション内のドキュメントと一致しない場合に、この操作ドキュメントが moviesコレクションに挿入されることを指定します。

次に、この例では集計パイプラインを実行します。

const pipeline = [
{
$merge: {
into: "movies",
on: ["_id", "title"],
whenMatched: "replace",
whenNotMatched: "insert"
}
}
];
const cursor = collection.aggregate(pipeline);
return cursor;

戻る

$match

項目一覧