Definition
The $cachedLookup stage performs a left outer join of the
stream of messages from your $source to an Atlas
collection in your Connection Registry.
This stage functions similarly to the $lookup stage, but caches the results of your queries according to configurable parameters.
Important
$cachedLookup doesn't support the let or pipeline fields.
To learn more, see $lookup Syntax.
The following prototype form illustrates all available fields:
{ "$lookup": { "ttl": { "size": <int>, "unit": "ms" | "second" | "minute" | "hour" | "day" }, "maxMemUsageBytes": <int>, "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "as": "<output-array-field>" } }
Syntax
The $cachedLookup uses some of the same fields as the
generalized version of $lookup. $cachedLookup includes
fields for configuring query caching behavior, and provides a
modified syntax for the from field for querying data over a
connection from your connection
registry.
Field | Type | Necessity | Description |
|---|---|---|---|
ttl | document | Required | Document that specifies the TTL of your cached queries. |
ttl.size | int | Required | Size of the TTL of your cached queries in |
ttl.unit | string | Required | Unit of time in which to measure the TTL of your cached queries. Must be one of the following:
|
maxMemUsageBytes | int | Required | Maximum memory, in bytes, to allocate to query caching. If the size of the cache exceeds this value, Atlas Stream Processing first evicts older results to free space. If there are not enough expired results to get below this threshold, Atlas Stream Processing randomly evicts cached queries until the cache size is below the threshold. Defaults to 10% of the available RAM in your stream processing instance.
You can't set |
from | document | Required | Document that specifies a collection in an Atlas database
to join to messages from your If you specify this field, you must specify values for all fields in this document. |
from.connectionName | string | Required | Name of the connection in your Connection Registry. |
from.db | string | Required | Name of the Atlas database that contains the collection you want to join. |
from.coll | string | Required | Name of the collection you want to join. |
localField | string | Required | Field from your |
foreignField | string | Required | Field from documents in the |
as | string | Required | Name of the new array field to add to the input documents. This
new array field contains the matching documents from the
|
Behavior
$cachedLookup performs a left outer join of messages from
your $source and the documents in a specified Atlas
collection. This version behaves similarly to the $lookup
stage available in a standard MongoDB database. However, this version
requires that you specify an Atlas collection from your
Connection Registry as the value
for the from field.
Additionally, $cachedLookup caches the results of your queries for
a configurable length of time. Use this functionality for queries
against infrequently-changed data to improve efficiency. When a cached
entry's TTL elapses, Atlas Stream Processing evicts that entry. If the total size
of cached entries equals maxMemoryUsageBytes when you make a new
query, Atlas Stream Processing evicts entries until there is space to cache the
new query.
Examples
A streaming data source generates detailed weather reports from
various locations, conformant to the schema of the Sample
Weather Dataset. A collection named
humidity_descriptions contains documents of the form:
{ 'dew_point': 16.2, 'relative_humidity': 79, 'condition': 'sticky, oppressive' }
Where the relative_humidity field describes the relative humidity at
room temperature (20 Celsius), and condition lists verbal
descriptors appropriate to that level of humidity. You can use the
$cachedLookup stage to enrich the streaming
weather reports with suggested descriptors for meteorologists to use
in weather broadcasts.
The following aggregation has four stages:
The
$sourcestage establishes a connection with the Apache Kafka broker collecting these reports in a topic namedmy_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it toingestionTime.The
$cachedLookupstage joins the records from thehumidity_descriptionsdatabase into the weather reports on thedewPointfield. Each query has a5 minuteTTL, and Atlas Stream Processing stores up to 200 MB of results.The
$matchstage excludes documents that have an emptyhumidity_infofield, and passes along documents with a populatedhumidity_infofield to the next stage.The
$mergestage writes the output to an Atlas collection namedenriched_streamin thesample_weatherstreamdatabase. If no such database or collection exist, Atlas creates them.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$cachedLookup': { "ttl": { "size": 5, "unit": "minute" }, "maxMemUsageBytes": 209715200, from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } }, { '$match': { 'humidity_info': { '$ne': [] } } }, { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
To view the documents in the resulting sample_weatherstream.enriched_stream
collection, connect to your Atlas cluster and run the following command:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], }
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.