定義
 $lookupステージは、  $sourceからのメッセージストリームを接続レジストリの Atlas コレクションに左外部結合を実行します。
ユースケースに応じて、 $lookupパイプライン ステージは次の 3 つの構文のいずれかを使用します。
詳しくは、「 $lookup 構文 」を参照してください。
警告
$lookupを使用してストリームを強化することで、ストリーム処理速度が低下する可能性があります。
次のプロトタイプ形式は、利用可能なすべてのフィールドを示しています。
{   "$lookup": {     "from": {       "connectionName": "<registered-atlas-connection>",       "db": "<registered-database-name>",       "coll": "<atlas-collection-name>"     },     "localField": "<field-in-source-messages>",     "foreignField": "<field-in-from-collection>",     "let": {       <var_1>: <expression>,       <var_2>: <expression>,       …,       <var_n>: <expression>     },     "pipeline": [       <pipeline to run>     ],     "as": "<output-array-field>",     "parallelism": <integer>,     "partitionBy": <expression>   } } 
構文
$lookupステージは、次のフィールドを持つドキュメントを取得します。
フィールド  | タイプ  | 必要性  | 説明  | 
|---|---|---|---|
from  | ドキュメント  | 条件付き  | 
 このフィールドを指定する場合は、 このドキュメント内のすべてのフィールドに値を指定する必要があります。 このフィールドは  | 
from. connectionName  | string  | 条件付き  | 接続レジストリ内の接続の名前。 このフィールドは  | 
from.db  | string  | 条件付き  | 参加するコレクションを含む Atlas database の名前。 このフィールドは  | 
from.coll  | string  | 条件付き  | 参加するコレクションの名前。 このフィールドは  | 
localField  | string  | 条件付き  | |
foreignField  | string  | 条件付き  | |
let  | ドキュメント  | 条件付き  | パイプラインステージで使用する変数を指定します。 詳しくは、 let を参照してください。 このフィールドは、次の構文の一部です。  | 
パイプライン  | ドキュメント  | 条件付き  | 結合済みコレクションで実行する このフィールドは、次の構文の一部です。  | 
as  | string  | 必須  | 入力ドキュメントに追加する新しい配列フィールドの名前。 この新しい配列フィールドには、   | 
  | integer  | 任意  | 
 
  | 
  | 式  | 任意  | 入力ストリームを並列スレッドに分割するために使用される 式 。Atlas Stream Processing は、同じ  このフィールドが指定されていない場合、入力ドキュメントはラウンドロ取りによって並列スレッドに送信されます。  | 
動作
Atlas Stream Processing バージョンの$lookupは、 $sourceからのメッセージと指定された Atlas コレクション内のドキュメントの左外部結合を実行します。 このバージョンは、標準の MongoDB database で使用可能な$lookupステージと同様に動作します。 ただし、このバージョンでは、 fromフィールドの値として接続レジストリの Atlas コレクションを指定する必要があります。
パイプラインにはネストされた$lookupステージを含めることができます。 パイプラインにネストされた$lookupステージを含める場合は、標準のfrom構文を使用して、外側の$lookupステージと同じリモート Atlas 接続でコレクションを指定する必要があります。
例
$lookup : {   from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"},     …,   pipeline: [     …,     {       $lookup: {         from: "coll2",         …,       }     },     …,   ] } 
パイプラインで同じコレクションに$lookupと$mergeの両方がある場合、インクリメンタル ビューを維持しようとすると、Atlas Stream Processing の結果が異なる場合があります。 Atlas Stream Processing は複数のソース メッセージを同時に処理し、それらをすべてマージします。 複数のメッセージが同じ ID を持ち、 $lookupと$mergeの両方が使用する場合、Atlas Stream Processing ではまだマテリアライズドされていない結果が返されることがあります。
例
次の入力ストリームを考えてみましょう。
{ _id: 1, count: 2 } { _id: 1, count: 3 } 
パイプライン内でクエリに次の内容が含まれているとします。
{   ...,   pipeline: [     { $lookup on _id == foreignDoc._id from collection A }     { $project: { _id: 1, count: $count + $foreignDoc.count } }     { $merge: { into collection A } }   ] } 
インクリメンタル ビューを維持しようとすると、次のような結果が予想されます。
{ _id: 1, count: 5 } 
ただし、 Atlas Stream Processing Atlas Stream Processingドキュメントを処理したかどうかに応じて、5 または 3 のカウントが返される場合があります。
詳細については、 $lookupを参照してください。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 humidity_descriptionsという名前のコレクションには次の形式のドキュメントが含まれています。
relative_humidityフィールドは温度( 20摂氏)の相対温度を表し、 conditionはその温度のレベルに適した言語記述子をリストします。 $lookupステージを使用して、気象配信で使用する推奨される記述子を使用してストリーミング気象レポートを強化できます。
次の集計には 4 つのステージがあります。
$sourceステージでは、 という名前のトピックでこれらのレポートを収集するApache Kafkamy_weatherdataプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTimeに設定されます。$lookupステージは、humidity_descriptionsデータベースのレコードをdewPointフィールドの気象レポートに結合します。$matchステージでは、humidity_infoフィールドが空のドキュメントは除外され、humidity_infoフィールドに入力されたドキュメントは次のステージに渡されます。$mergeステージは、sample_weatherstreamデータベース内のenriched_streamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{   '$source': {     connectionName: 'sample_weatherdata',     topic: 'my_weatherdata',     tsFieldName: 'ingestionTime'   } }, {   '$lookup': {     from: {       connectionName: 'weatherStream',       db: 'humidity',       coll: 'humidity_descriptions'     },   'localField':'dewPoint.value',   'foreignField':'dewPoint',   'as': 'humidity_info'   } } { '$match': { 'humidity_info': { '$ne': [] } } } {   '$merge': {     into: {       connectionName: 'weatherStream',       db: 'sample_weatherstream',       coll: 'enriched_stream'     }   } } 
結果のsample_weatherstream.enriched_streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").enriched_stream.find() 
{   st: 'x+55100+006100',   position: {     type: 'Point',     coordinates: [       92.7,       -53.6     ]   },   elevation: 9999,   callLetters: 'UECN',   qualityControlProcess: 'V020',   dataSource: '4',   type: 'FM-13',   airTemperature: {     value: -11,     quality: '9'   },   dewPoint: {     value: 12.5,     quality: '1'   },   pressure: {     value: 1032.7,     quality: '9'   },   wind: {     direction: {       angle: 300,       quality: '9'     },     type: '9',     speed: {       rate: 23.6,       quality: '2'     }   },   visibility: {     distance: {       value: 14000,       quality: '1'     },     variability: {       value: 'N',       quality: '1'     }   },   skyCondition: {     ceilingHeight: {       value: 390,       quality: '9',       determination: 'C'     },     cavok: 'N'   },   sections: [     'SA1',     'AA1',     'OA1',     'AY1',     'AG1'   ],   precipitationEstimatedObservation: {     discrepancy: '4',     estimatedWaterDepth: 21   },   atmosphericPressureChange: {     tendency: {       code: '1',       quality: '1'     },     quantity3Hours: {       value: 5.5,       quality: '1'     },     quantity24Hours: {       value: 99.9,       quality: '9'     }   },   seaSurfaceTemperature: {     value: 1.3,     quality: '9'   },   waveMeasurement: {     method: 'M',     waves: {       period: 4,       height: 2.5,       quality: '9'     },     seaState: {       code: '00',       quality: '9'     }   },   pastWeatherObservationManual: {     atmosphericCondition: {       value: '4',       quality: '1'     },     period: {       value: 6,       quality: '1'     }   },   skyConditionObservation: {     totalCoverage: {       value: '07',       opaque: '99',       quality: '1'     },     lowestCloudCoverage: {       value: '06',       quality: '1'     },     lowCloudGenus: {       value: '07',       quality: '9'     },     lowestCloudBaseHeight: {       value: 2250,       quality: '9'     },     midCloudGenus: {       value: '07',       quality: '9'     },     highCloudGenus: {       value: '00',       quality: '1'     }   },   presentWeatherObservationManual: {     condition: '75',     quality: '1'   },   atmosphericPressureObservation: {     altimeterSetting: {       value: 9999.9,       quality: '9'     },     stationPressure: {       value: 1032.6,       quality: '1'     }   },   skyCoverLayer: {     coverage: {       value: '09',       quality: '1'     },     baseHeight: {       value: 240,       quality: '9'     },     cloudType: {       value: '99',       quality: '9'     }   },   liquidPrecipitation: {     period: 6,     depth: 3670,     condition: '9',     quality: '9'   },   extremeAirTemperature: {     period: 99.9,     code: 'N',     value: -30.9,     quantity: '9'   },   ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),   humidity_info: [     {       _id: ObjectId('66ec805ad3cfbba767ebf7a5'),       dewPoint: 12.5,       relativeHumidity: 62,       condition: 'humid, muggy'     }   ], } 
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。