Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs Menu
Docs Home
/
Atlas
/ /

$cachedLookup

$cachedLookup

The $cachedLookup stage performs a left outer join of the stream of messages from your $source to an Atlas collection in your Connection Registry.

This stage functions similarly to the $lookup stage, but caches the results of your queries according to configurable parameters.

Important

$cachedLookup doesn't support the let or pipeline fields.

To learn more, see $lookup Syntax.

The following prototype form illustrates all available fields:

{
"$lookup": {
"ttl": {
"size": <int>,
"unit": "ms" | "second" | "minute" | "hour" | "day"
},
"maxMemUsageBytes": <int>,
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"as": "<output-array-field>"
}
}

The $cachedLookup uses some of the same fields as the generalized version of $lookup. $cachedLookup includes fields for configuring query caching behavior, and provides a modified syntax for the from field for querying data over a connection from your connection registry.

Field
Type
Necessity
Description

ttl

document

Required

Document that specifies the TTL of your cached queries.

ttl.size

int

Required

Size of the TTL of your cached queries in units.

ttl.unit

string

Required

Unit of time in which to measure the TTL of your cached queries. Must be one of the following:

  • "ms"

  • "second"

  • "minute"

  • "hour"

  • "day"

maxMemUsageBytes

int

Required

Maximum memory, in bytes, to allocate to query caching. If the size of the cache exceeds this value, Atlas Stream Processing first evicts older results to free space. If there are not enough expired results to get below this threshold, Atlas Stream Processing randomly evicts cached queries until the cache size is below the threshold.

Defaults to 10% of the available RAM in your stream processing instance. You can't set maxMemUsageBytes to more than 12.5% of the available RAM in your stream processing instance.

from

document

Required

Document that specifies a collection in an Atlas database to join to messages from your $source. You must specify a collection only from your Connection Registry.

If you specify this field, you must specify values for all fields in this document.

from.connectionName

string

Required

Name of the connection in your Connection Registry.

from.db

string

Required

Name of the Atlas database that contains the collection you want to join.

from.coll

string

Required

Name of the collection you want to join.

localField

string

Required

Field from your $source messages to join on.

foreignField

string

Required

Field from documents in the from collection to join on.

as

string

Required

Name of the new array field to add to the input documents. This new array field contains the matching documents from the from collection. If the specified name already exists as a field in the input document, that field is overwritten.

$cachedLookup performs a left outer join of messages from your $source and the documents in a specified Atlas collection. This version behaves similarly to the $lookup stage available in a standard MongoDB database. However, this version requires that you specify an Atlas collection from your Connection Registry as the value for the from field.

Additionally, $cachedLookup caches the results of your queries for a configurable length of time. Use this functionality for queries against infrequently-changed data to improve efficiency. When a cached entry's TTL elapses, Atlas Stream Processing evicts that entry. If the total size of cached entries equals maxMemoryUsageBytes when you make a new query, Atlas Stream Processing evicts entries until there is space to cache the new query.

A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. A collection named humidity_descriptions contains documents of the form:

{
'dew_point': 16.2,
'relative_humidity': 79,
'condition': 'sticky, oppressive'
}

Where the relative_humidity field describes the relative humidity at room temperature (20 Celsius), and condition lists verbal descriptors appropriate to that level of humidity. You can use the $cachedLookup stage to enrich the streaming weather reports with suggested descriptors for meteorologists to use in weather broadcasts.

The following aggregation has four stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.

  2. The $cachedLookup stage joins the records from the humidity_descriptions database into the weather reports on the dewPoint field. Each query has a 5 minute TTL, and Atlas Stream Processing stores up to 200 MB of results.

  3. The $match stage excludes documents that have an empty humidity_info field, and passes along documents with a populated humidity_info field to the next stage.

  4. The $merge stage writes the output to an Atlas collection named enriched_stream in the sample_weatherstream database. If no such database or collection exist, Atlas creates them.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$cachedLookup': {
"ttl": {
"size": 5,
"unit": "minute"
},
"maxMemUsageBytes": 209715200,
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
},
{ '$match': { 'humidity_info': { '$ne': [] } } },
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

To view the documents in the resulting sample_weatherstream.enriched_stream collection, connect to your Atlas cluster and run the following command:

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
}

Note

The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.

Back

$lookup

On this page