Definition
The $cachedLookup stage performs a left outer join of the stream of messages from your $source to an Atlas collection in your Connection Registry.
This stage functions similarly to the $lookup stage, but caches the results of your queries according to configurable parameters.
Important

$cachedLookup doesn't support the let or pipeline fields. To learn more, see $lookup Syntax.
The following prototype form illustrates all available fields:
{ "$lookup": { "ttl": { "size": <int>, "unit": "ms" | "second" | "minute" | "hour" | "day" }, "maxMemUsageBytes": <int>, "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "as": "<output-array-field>" } }
Syntax
The $cachedLookup stage uses some of the same fields as the generalized version of $lookup. $cachedLookup adds fields for configuring query caching behavior, and provides a modified syntax for the from field for querying data over a connection from your Connection Registry.
| Field | Type | Necessity | Description |
| --- | --- | --- | --- |
| ttl | document | Required | Document that specifies the time-to-live (TTL) of your cached queries. |
| ttl.size | int | Required | Size of the TTL of your cached queries, measured in the unit specified by ttl.unit. |
| ttl.unit | string | Required | Unit of time in which to measure the TTL of your cached queries. Must be one of the following values: "ms", "second", "minute", "hour", "day". |
| maxMemUsageBytes | int | Optional | Maximum memory, in bytes, to allocate to query caching. If the size of the cache exceeds this value, Atlas Stream Processing first evicts expired results to free space. If evicting expired results does not bring the cache below this threshold, Atlas Stream Processing randomly evicts cached queries until it is. Defaults to 10% of the available RAM in your stream processing instance. |
| from | document | Required | Document that specifies a collection in an Atlas database to join to messages from your $source. If you specify this field, you must specify values for all fields in this document. |
| from.connectionName | string | Required | Name of the connection in your Connection Registry. |
| from.db | string | Required | Name of the Atlas database that contains the collection you want to join. |
| from.coll | string | Required | Name of the collection you want to join. |
| localField | string | Required | Field from your $source messages to match against foreignField. |
| foreignField | string | Required | Field from documents in the from collection to match against localField. |
| as | string | Required | Name of the new array field to add to the input documents. This new array field contains the matching documents from the from collection. |
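For example, the following stage caches joins against a hypothetical inventory.products collection reachable over a hypothetical myAtlasCluster connection, with a one-hour TTL and a 100 MB cache ceiling. All names and values here are illustrative placeholders, not required settings:

{
  "$cachedLookup": {
    "ttl": { "size": 1, "unit": "hour" },
    "maxMemUsageBytes": 104857600,   // 100 MB cache ceiling (illustrative)
    "from": {
      "connectionName": "myAtlasCluster",
      "db": "inventory",
      "coll": "products"
    },
    "localField": "productId",
    "foreignField": "_id",
    "as": "product_info"
  }
}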
Behavior
$cachedLookup performs a left outer join of messages from your $source and the documents in a specified Atlas collection. This version behaves similarly to the $lookup stage available in a standard MongoDB database. However, this version requires that you specify an Atlas collection from your Connection Registry as the value for the from field.
Additionally, $cachedLookup caches the results of your queries for a configurable length of time. Use this functionality for queries against infrequently changed data to improve efficiency. When a cached entry's TTL elapses, Atlas Stream Processing evicts that entry. If the total size of cached entries equals maxMemUsageBytes when you make a new query, Atlas Stream Processing evicts entries until there is space to cache the new query.
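The following JavaScript sketch models this documented eviction order: expired entries are evicted first, and if the cache still exceeds maxMemUsageBytes, entries are evicted at random until the new result fits. It is an illustrative model under a simplified single-TTL assumption, not Atlas Stream Processing's actual implementation; the QueryCache name is hypothetical:

// Illustrative model of the documented cache behavior, not the real engine.
class QueryCache {
  constructor(ttlMs, maxMemUsageBytes) {
    this.ttlMs = ttlMs;
    this.maxMemUsageBytes = maxMemUsageBytes;
    this.entries = new Map(); // key -> { result, sizeBytes, expiresAt }
    this.totalBytes = 0;
  }

  get(key, now = Date.now()) {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= now) { // TTL elapsed: evict the entry
      this.evict(key);
      return undefined;
    }
    return entry.result;
  }

  put(key, result, sizeBytes, now = Date.now()) {
    // First evict expired entries to free space.
    for (const [k, e] of this.entries) {
      if (e.expiresAt <= now) this.evict(k);
    }
    // If the cache is still over the threshold, evict entries at random.
    while (
      this.totalBytes + sizeBytes > this.maxMemUsageBytes &&
      this.entries.size > 0
    ) {
      const keys = [...this.entries.keys()];
      this.evict(keys[Math.floor(Math.random() * keys.length)]);
    }
    this.entries.set(key, { result, sizeBytes, expiresAt: now + this.ttlMs });
    this.totalBytes += sizeBytes;
  }

  evict(key) {
    const e = this.entries.get(key);
    if (e) {
      this.totalBytes -= e.sizeBytes;
      this.entries.delete(key);
    }
  }
}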
Examples
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. A collection named humidity_descriptions contains documents of the form:
{
  'dewPoint': 16.2,
  'relativeHumidity': 79,
  'condition': 'sticky, oppressive'
}
The relativeHumidity field describes the relative humidity at room temperature (20 degrees Celsius), and condition lists verbal descriptors appropriate to that level of humidity. You can use the $cachedLookup stage to enrich the streaming weather reports with suggested descriptors for meteorologists to use in weather broadcasts.
The following aggregation has four stages:
- The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.
- The $cachedLookup stage joins the records from the humidity_descriptions collection into the weather reports on the dewPoint field. Each query has a 5 minute TTL, and Atlas Stream Processing stores up to 200 MB of results.
- The $match stage excludes documents that have an empty humidity_info field, and passes along documents with a populated humidity_info field to the next stage.
- The $merge stage writes the output to an Atlas collection named enriched_stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates them.
{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$cachedLookup': {
    ttl: { size: 5, unit: 'minute' },
    maxMemUsageBytes: 209715200,
    from: {
      connectionName: 'weatherStream',
      db: 'humidity',
      coll: 'humidity_descriptions'
    },
    localField: 'dewPoint.value',
    foreignField: 'dewPoint',
    as: 'humidity_info'
  }
},
{
  '$match': { 'humidity_info': { '$ne': [] } }
},
{
  '$merge': {
    into: {
      connectionName: 'weatherStream',
      db: 'sample_weatherstream',
      coll: 'enriched_stream'
    }
  }
}
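To run this aggregation, you can use the stream processing commands in mongosh. The following sketch assumes you are connected to your stream processing instance and have assigned the preceding stages to an array named pipeline; the processor name enrichWeather is illustrative:

// `pipeline` holds the four stages shown above.
sp.process(pipeline)                                 // run as an ephemeral stream processor
// Or persist the pipeline under a name and start it:
sp.createStreamProcessor("enrichWeather", pipeline)
sp.enrichWeather.start()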
To view the documents in the resulting sample_weatherstream.enriched_stream collection, connect to your Atlas cluster and run the following command:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
  st: 'x+55100+006100',
  position: { type: 'Point', coordinates: [ 92.7, -53.6 ] },
  elevation: 9999,
  callLetters: 'UECN',
  qualityControlProcess: 'V020',
  dataSource: '4',
  type: 'FM-13',
  airTemperature: { value: -11, quality: '9' },
  dewPoint: { value: 12.5, quality: '1' },
  pressure: { value: 1032.7, quality: '9' },
  wind: {
    direction: { angle: 300, quality: '9' },
    type: '9',
    speed: { rate: 23.6, quality: '2' }
  },
  visibility: {
    distance: { value: 14000, quality: '1' },
    variability: { value: 'N', quality: '1' }
  },
  skyCondition: {
    ceilingHeight: { value: 390, quality: '9', determination: 'C' },
    cavok: 'N'
  },
  sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ],
  precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 },
  atmosphericPressureChange: {
    tendency: { code: '1', quality: '1' },
    quantity3Hours: { value: 5.5, quality: '1' },
    quantity24Hours: { value: 99.9, quality: '9' }
  },
  seaSurfaceTemperature: { value: 1.3, quality: '9' },
  waveMeasurement: {
    method: 'M',
    waves: { period: 4, height: 2.5, quality: '9' },
    seaState: { code: '00', quality: '9' }
  },
  pastWeatherObservationManual: {
    atmosphericCondition: { value: '4', quality: '1' },
    period: { value: 6, quality: '1' }
  },
  skyConditionObservation: {
    totalCoverage: { value: '07', opaque: '99', quality: '1' },
    lowestCloudCoverage: { value: '06', quality: '1' },
    lowCloudGenus: { value: '07', quality: '9' },
    lowestCloudBaseHeight: { value: 2250, quality: '9' },
    midCloudGenus: { value: '07', quality: '9' },
    highCloudGenus: { value: '00', quality: '1' }
  },
  presentWeatherObservationManual: { condition: '75', quality: '1' },
  atmosphericPressureObservation: {
    altimeterSetting: { value: 9999.9, quality: '9' },
    stationPressure: { value: 1032.6, quality: '1' }
  },
  skyCoverLayer: {
    coverage: { value: '09', quality: '1' },
    baseHeight: { value: 240, quality: '9' },
    cloudType: { value: '99', quality: '9' }
  },
  liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' },
  extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' },
  ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
  humidity_info: [
    {
      _id: ObjectId('66ec805ad3cfbba767ebf7a5'),
      dewPoint: 12.5,
      relativeHumidity: 62,
      condition: 'humid, muggy'
    }
  ]
}
Note

The preceding is a representative example. Streaming data is not static, and each user sees distinct documents.