Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs Menu
Docs Home
/
Atlas
/ /

$cachedLookup

$cachedLookup

$cachedLookup ステージは、$source からのメッセージストリームを接続レジストリの Atlasコレクション に左外部結合を実行します。

このステージは $lookup ステージと同様に機能しますが、構成可能なパラメーターに従ってクエリの結果をキャッシュします。

重要

$cachedLookuplet または pipeline フィールドをサポートしていません。

詳しくは、「 $lookup 構文 」を参照してください。

次のプロトタイプ形式は、利用可能なすべてのフィールドを示しています。

{
"$lookup": {
"ttl": {
"size": <int>,
"unit": "ms" | "second" | "minute" | "hour" | "day"
},
"maxMemUsageBytes": <int>,
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"as": "<output-array-field>"
}
}

$cachedLookup は、$lookup の一般化バージョンと同じフィールドの一部を使用します。$cachedLookup にはクエリ キャッシュ動作を構成するフィールドが含まれており、接続レジストリからの接続を介してデータをクエリするための fromフィールドの変更された構文を提供します。

フィールド
タイプ
必要性
説明

ttl

ドキュメント

必須

キャッシュされたクエリの TTL を指定するドキュメント。

TTL.size

整数

必須

units のキャッシュされたクエリの TTL のサイズ。

ttl.unit

string

必須

キャッシュされたクエリの TTL を測定する時間の単位。次のいずれかである必要があります。

  • "ms"

  • "second"

  • "minute"

  • "hour"

  • "day"

maxMemUsageBytes

整数

必須

クエリ キャッシュに割り当てる最大メモリ(バイト単位)。キャッシュのサイズがこの値を超える場合、Atlas Stream Processing はまず古い結果を空き領域にエビクションします。このしきい値を下回るするための有効期限切れの結果が十分でない場合、Atlas Stream Processing は、キャッシュサイズがしきい値を下回るまで、キャッシュされたクエリをランダムにエビクションします。

デフォルトは、Stream Processingインスタンス内の使用可能なRAMの 10% です。 maxMemUsageBytes は、Stream Processingインスタンス内の使用可能なRAMの 12.5% を超えて設定できません。

from

ドキュメント

必須

$sourceからのメッセージに参加する Atlasデータベース内のコレクションを指定するドキュメント。接続レジストリからのみコレクションを指定する必要があります。

このフィールドを指定する場合は、 このドキュメント内のすべてのフィールドに値を指定する必要があります。

from. connectionName

string

必須

接続レジストリ内の接続の名前。

from.db

string

必須

参加するコレクションを含む Atlas database の名前。

from.coll

string

必須

参加するコレクションの名前。

localField

string

必須

参加する$sourceメッセージのフィールド。

foreignField

string

必須

結合するfromコレクション内のドキュメントのフィールド。

as

string

必須

入力ドキュメントに追加する新しい配列フィールドの名前。 この新しい配列フィールドには、 fromコレクションと一致するドキュメントが含まれます。 指定した名前がすでに入力ドキュメントのフィールドとして存在する場合、そのフィールドは上書きされます。

$cachedLookup は、$source からのメッセージと指定された Atlasコレクション内のドキュメントの左外部結合を実行します。このバージョンは、標準のMongoDBデータベースで使用できる $lookup ステージと同様に動作します。ただし、このバージョンでは、fromフィールドの値として接続レジストリから Atlasコレクションを指定する必要があります。

さらに、$cachedLookup は設定可能な期間、クエリの結果をキャッシュします。この機能を効率を向上させるために、変更の頻度の低いデータに適用するクエリに使用します。キャッシュされたエントリの TTL が経過すると、Atlas Stream Processing はそのエントリをエビクションします。新しいクエリを作成するときに、キャッシュされたエントリの合計サイズが maxMemoryUsageBytes と等しい場合、Atlas Stream Processing は新しいクエリをキャッシュスペースが利用可能になるまでエントリをエビクションします。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 humidity_descriptionsという名前のコレクションには次の形式のドキュメントが含まれています。

{
'dew_point': 16.2,
'relative_humidity': 79,
'condition': 'sticky, oppressive'
}

ここで、relative_humidityフィールドは温度(20 摂氏)での相対的な温度を表し、condition はその温度のレベルに適した言語記述子をリストします。$ cachedLookup ステージを使用して、気象よりも気象が受信するように推奨される記述子を使用して、ストリーミング気象レポートを強化できます。

次の集計には 4 つのステージがあります。

  1. ステージは Apache$source Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、フィールドはingestionTimeに設定されます。

  2. $cachedLookup ステージは、humidity_descriptionsデータベースのレコードを dewPointフィールドの気象レポートに結合します。各クエリには 5 minute TTL があり、Atlas Stream Processing は最大 200 MB の結果を保存します。

  3. $matchステージでは、 humidity_infoフィールドが空のドキュメントは除外され、 humidity_infoフィールドに入力されたドキュメントは次のステージに渡されます。

  4. $mergeステージは、 sample_weatherstreamデータベース内のenriched_streamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$cachedLookup': {
"ttl": {
"size": 5,
"unit": "minute"
},
"maxMemUsageBytes": 209715200,
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
},
{ '$match': { 'humidity_info': { '$ne': [] } } },
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

結果のsample_weatherstream.enriched_streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$lookup

項目一覧