Docs Menu
Docs Home
/ /
/ / /

$lookup (ストリーム プロセシング)

$lookupステージは、 $sourceからのメッセージストリームを接続レジストリの Atlas コレクションに左外部結合を実行します。

ユースケースに応じて、 $lookupパイプライン ステージは次の 3 つの構文のいずれかを使用します。

詳しくは、「 $lookup 構文 」を参照してください。

警告

$lookupを使用してストリームを強化することで、ストリーム処理速度が低下する可能性があります。

次のプロトタイプ形式は、利用可能なすべてのフィールドを示しています。

{
"$lookup": {
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"pipeline": [
<pipeline to run>
],
"as": "<output-array-field>",
"parallelism": <integer>,
"partitionBy": <expression>
}
}

$lookupステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

from

ドキュメント

条件付き

$sourceからのメッセージに参加する Atlasデータベース内のコレクションを指定するドキュメント。接続レジストリからのみコレクションを指定する必要があります。

このフィールドを指定する場合は、 このドキュメント内のすべてのフィールドに値を指定する必要があります。

このフィールドはpipelineフィールドを指定する場合は必須ではありません。

from. connectionName

string

条件付き

接続レジストリ内の接続の名前。

このフィールドはpipelineフィールドを指定する場合は必須ではありません。

from.db

string

条件付き

参加するコレクションを含む Atlas database の名前。

このフィールドはpipelineフィールドを指定する場合は必須ではありません。

from.coll

string

条件付き

参加するコレクションの名前。

このフィールドはpipelineフィールドを指定する場合は必須ではありません。

localField

string

条件付き

参加する$sourceメッセージのフィールド。

このフィールドは、次の構文の一部です。

foreignField

string

条件付き

結合するfromコレクション内のドキュメントのフィールド。

このフィールドは、次の構文の一部です。

let

ドキュメント

条件付き

パイプラインステージで使用する変数を指定します。 詳しくは、 let を参照してください。

このフィールドは、次の構文の一部です。

パイプライン

ドキュメント

条件付き

結合済みコレクションで実行するpipelineを指定します。 詳しくは、「パイプライン 」を参照してください。

このフィールドは、次の構文の一部です。

as

string

必須

入力ドキュメントに追加する新しい配列フィールドの名前。 この新しい配列フィールドには、 fromコレクションと一致するドキュメントが含まれます。 指定した名前がすでに入力ドキュメントのフィールドとして存在する場合、そのフィールドは上書きされます。

parallelism

integer

任意

$lookup ターゲットに対して行われた並列リクエストの最大数。並列処理の値が大きいと、 $lookupスループットが増加する可能性があります。ただし、値が大きいと、$lookup ターゲット クラスターのリソースも多く使用される可能性があります。

1 から 16 までの整数である必要があります。デフォルトは 1 です。

partitionBy

任意

入力ストリームを並列スレッドに分割するために使用される 式 。Atlas Stream Processing は、同じ partitionBy式の結果を持つ各入力ドキュメントを同じスレッドに割り当てます。

このフィールドが指定されていない場合、入力ドキュメントはラウンドロ取りによって並列スレッドに送信されます。

Atlas Stream Processing バージョンの$lookupは、 $sourceからのメッセージと指定された Atlas コレクション内のドキュメントの左外部結合を実行します。 このバージョンは、標準の MongoDB database で使用可能な$lookupステージと同様に動作します。 ただし、このバージョンでは、 fromフィールドの値として接続レジストリの Atlas コレクションを指定する必要があります。

パイプラインにはネストされた$lookupステージを含めることができます。 パイプラインにネストされた$lookupステージを含める場合は、標準のfrom構文を使用して、外側の$lookupステージと同じリモート Atlas 接続でコレクションを指定する必要があります。

$lookup : {
from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"},
,
pipeline: [
,
{
$lookup: {
from: "coll2",
,
}
},
,
]
}

パイプラインで同じコレクションに$lookup$mergeの両方がある場合、インクリメンタル ビューを維持しようとすると、Atlas Stream Processing の結果が異なる場合があります。 Atlas Stream Processing は複数のソース メッセージを同時に処理し、それらをすべてマージします。 複数のメッセージが同じ ID を持ち、 $lookup$mergeの両方が使用する場合、Atlas Stream Processing ではまだマテリアライズドされていない結果が返されることがあります。

次の入力ストリームを考えてみましょう。

{ _id: 1, count: 2 }
{ _id: 1, count: 3 }

パイプライン内でクエリに次の内容が含まれているとします。

{
...,
pipeline: [
{ $lookup on _id == foreignDoc._id from collection A }
{ $project: { _id: 1, count: $count + $foreignDoc.count } }
{ $merge: { into collection A } }
]
}

インクリメンタル ビューを維持しようとすると、次のような結果が予想されます。

{ _id: 1, count: 5 }

ただし、 Atlas Stream Processing Atlas Stream Processingドキュメントを処理したかどうかに応じて、5 または 3 のカウントが返される場合があります。

詳細については、 $lookupを参照してください。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 humidity_descriptionsという名前のコレクションには次の形式のドキュメントが含まれています。

relative_humidityフィールドは温度( 20摂氏)の相対温度を表し、 conditionはその温度のレベルに適した言語記述子をリストします。 $lookupステージを使用して、気象配信で使用する推奨される記述子を使用してストリーミング気象レポートを強化できます。

次の集計には 4 つのステージがあります。

  1. $sourceステージでは、 という名前のトピックでこれらのレポートを収集するApache Kafkamy_weatherdata プロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTime に設定されます。

  2. $lookupステージは、 humidity_descriptionsデータベースのレコードをdewPointフィールドの気象レポートに結合します。

  3. $matchステージでは、 humidity_infoフィールドが空のドキュメントは除外され、 humidity_infoフィールドに入力されたドキュメントは次のステージに渡されます。

  4. $mergeステージは、 sample_weatherstreamデータベース内のenriched_streamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$lookup': {
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
}
{ '$match': { 'humidity_info': { '$ne': [] } } }
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

結果のsample_weatherstream.enriched_streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$https

項目一覧