$lookup
Definition
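The $lookup stage performs a left outer join of the stream of messages from your $source with an Atlas collection in your Connection Registry.

Depending on your use case, a $lookup pipeline stage uses one of the following three syntaxes:

- Equality Match with a Single Join Condition
- Join Conditions and Subqueries on a Joined Collection
- Correlated Subqueries Using Concise Syntax

To learn more, see $lookup Syntax.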
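To make the "left outer join" behavior concrete, the following JavaScript sketch models the equality-match syntax in memory on plain arrays. It is an illustration under stated assumptions, not how Atlas Stream Processing implements the stage: the helper names (getPath, lookupEquality) and the device data are hypothetical, and the sketch uses strict equality, ignoring MongoDB's matching rules for array-valued, null, and missing fields.

```javascript
// Hypothetical in-memory model of an equality-match $lookup (not the
// Atlas Stream Processing implementation). Strict equality only; MongoDB's
// array, null, and missing-field matching rules are not modeled.

// Resolve a dotted path such as "device.id" against a plain object.
function getPath(doc, path) {
  return path
    .split(".")
    .reduce((value, key) => (value == null ? undefined : value[key]), doc);
}

// Left outer join: every message is kept. The `as` field holds the matching
// documents (an empty array when nothing matches) and overwrites any
// existing field with the same name.
function lookupEquality(messages, collection, { localField, foreignField, as }) {
  return messages.map((message) => {
    const key = getPath(message, localField);
    const matches = collection.filter((doc) => getPath(doc, foreignField) === key);
    return { ...message, [as]: matches };
  });
}

// Hypothetical stream messages and lookup collection.
const messages = [
  { reading: 21.4, device: { id: "d1" } },
  { reading: 19.8, device: { id: "d9" } }, // no matching device document
];
const devices = [{ deviceId: "d1", site: "north" }];

const enriched = lookupEquality(messages, devices, {
  localField: "device.id",
  foreignField: "deviceId",
  as: "device_info",
});

console.log(enriched.map((m) => m.device_info.length)); // [ 1, 0 ]
```

The unmatched message is not dropped; it simply carries an empty array. Filtering those messages out is a separate decision, typically made with a later $match stage.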
Warning
Using $lookup to enrich a stream can reduce stream processing speed.
The following prototype form illustrates all available fields:
{
  "$lookup": {
    "from": {
      "connectionName": "<registered-atlas-connection>",
      "db": "<registered-database-name>",
      "coll": "<atlas-collection-name>"
    },
    "localField": "<field-in-source-messages>",
    "foreignField": "<field-in-from-collection>",
    "let": {
      <var_1>: <expression>,
      <var_2>: <expression>,
      …,
      <var_n>: <expression>
    },
    "pipeline": [ <pipeline to run> ],
    "as": "<output-array-field>"
  }
}
Syntax
The $lookup stage takes a document with the following fields:
Field | Type | Necessity | Description |
---|---|---|---|
from | document | Required | Document that specifies a collection in an Atlas database to join with messages from your $source. You must specify a collection from your Connection Registry, and you must specify a value for every field in this document. |
from.connectionName | string | Required | Name of the connection in your Connection Registry. |
from.db | string | Required | Name of the Atlas database that contains the collection you want to join. |
from.coll | string | Required | Name of the collection you want to join. |
localField | string | Conditional | Field from your $source messages to join on. This field is part of the following syntaxes: Equality Match with a Single Join Condition; Correlated Subqueries Using Concise Syntax. |
foreignField | string | Conditional | Field from documents in the from collection to join on. This field is part of the following syntaxes: Equality Match with a Single Join Condition; Correlated Subqueries Using Concise Syntax. |
let | document | Conditional | Specifies the variables to use in the pipeline stages. This field is part of the following syntaxes: Join Conditions and Subqueries on a Joined Collection; Correlated Subqueries Using Concise Syntax. |
pipeline | array | Conditional | Specifies the pipeline to run on the joined collection. This field is part of the following syntaxes: Join Conditions and Subqueries on a Joined Collection; Correlated Subqueries Using Concise Syntax. |
as | string | Required | Name of the new array field to add to the input documents. This new array field contains the matching documents from the from collection. If the specified name already exists as a field in the input document, that field is overwritten. |
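The Necessity column depends on which syntax you use. As a hedged illustration of how the fields combine, the following JavaScript sketch inspects a stage specification and reports which of the three syntaxes it corresponds to, plus any obviously missing fields. The classifyLookup helper is hypothetical and is not part of any MongoDB tool; it checks only the field combinations described in the table, not the full server-side validation.

```javascript
// Hypothetical validator based on the field table: checks required fields
// and reports which of the three $lookup syntaxes a stage specification uses.
function classifyLookup(spec) {
  const problems = [];
  const from = spec.from;
  const fromOk =
    typeof from === "object" &&
    from !== null &&
    typeof from.connectionName === "string" &&
    typeof from.db === "string" &&
    typeof from.coll === "string";
  if (!fromOk) {
    problems.push("from must be a document with connectionName, db, and coll");
  }
  if (typeof spec.as !== "string") {
    problems.push("as is required");
  }

  const hasLocal = "localField" in spec;
  const hasForeign = "foreignField" in spec;
  const hasPipeline = "pipeline" in spec;
  if (hasLocal !== hasForeign) {
    problems.push("localField and foreignField must be specified together");
  }

  const hasEquality = hasLocal && hasForeign;
  let syntax = null;
  if (hasEquality && hasPipeline) {
    syntax = "Correlated Subqueries Using Concise Syntax";
  } else if (hasEquality) {
    syntax = "Equality Match with a Single Join Condition";
  } else if (hasPipeline) {
    syntax = "Join Conditions and Subqueries on a Joined Collection";
  } else {
    problems.push("specify localField and foreignField, a pipeline, or both");
  }
  return { syntax, problems };
}

// The $lookup stage from the weather example later on this page.
const stage = {
  from: { connectionName: "weatherStream", db: "humidity", coll: "humidity_descriptions" },
  localField: "dewPoint.value",
  foreignField: "dewPoint",
  as: "humidity_info",
};
console.log(classifyLookup(stage).syntax); // Equality Match with a Single Join Condition
```

Adding a pipeline to the same specification would classify it as the concise correlated-subquery form, while dropping localField and foreignField in favor of let and pipeline would classify it as the subquery form.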
Behavior
The Atlas Stream Processing version of $lookup performs a left outer join of messages from your $source and the documents in a specified Atlas collection. This version behaves similarly to the $lookup stage available in a standard MongoDB database. However, this version requires that you specify an Atlas collection from your Connection Registry as the value for the from field.
The pipeline can contain a nested $lookup stage. If you include a nested $lookup stage in your pipeline, you must use the standard from syntax, a collection name string, to specify a collection in the same remote Atlas connection as the outer $lookup stage.
Example
{
  $lookup: {
    from: { connectionName: "dbsrv1", db: "db1", coll: "coll1" },
    …,
    pipeline: [
      …,
      {
        $lookup: {
          from: "coll2",
          …
        }
      },
      …
    ]
  }
}
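The nesting rule can be checked mechanically before you deploy a processor. The following JavaScript sketch is a hypothetical helper, not a MongoDB API: it walks one level of the outer stage's pipeline and reports the positions of nested $lookup stages whose from is not a plain collection name string. The field names k, inner, inner2, and joined are placeholders chosen for illustration.

```javascript
// Hypothetical check for the nesting rule: a $lookup nested in the outer
// stage's pipeline should use the standard string form of `from`, not a
// Connection Registry document. Only one level of nesting is inspected.
function nestedFromViolations(outerStage) {
  const pipeline = outerStage.$lookup.pipeline || [];
  const violations = [];
  pipeline.forEach((stage, index) => {
    if (stage.$lookup && typeof stage.$lookup.from !== "string") {
      violations.push(index); // position of the offending nested stage
    }
  });
  return violations;
}

const outer = {
  $lookup: {
    from: { connectionName: "dbsrv1", db: "db1", coll: "coll1" },
    pipeline: [
      // Follows the rule: standard string form of `from`.
      { $lookup: { from: "coll2", localField: "k", foreignField: "k", as: "inner" } },
      // Breaks the rule: document form of `from` inside a nested stage.
      {
        $lookup: {
          from: { connectionName: "dbsrv1", db: "db1", coll: "coll2" },
          localField: "k",
          foreignField: "k",
          as: "inner2",
        },
      },
    ],
    as: "joined",
  },
};

console.log(nestedFromViolations(outer)); // [ 1 ]
```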
If your pipeline has both $lookup and $merge on the same collection, Atlas Stream Processing results might vary if you try to maintain an incremental view. Atlas Stream Processing processes multiple source messages simultaneously and then merges them all together. If multiple messages have the same ID, which both $lookup and $merge use to match documents, the $lookup for a later message can run before the $merge for an earlier message with that ID has written its result. In that case, Atlas Stream Processing might return results that haven't yet materialized.
Example
Consider the following input stream:
{ _id: 1, count: 2 }
{ _id: 1, count: 3 }
Suppose your query contains the following stages inside the pipeline, shown here in pseudocode:
{
  ...,
  pipeline: [
    { $lookup on _id == foreignDoc._id from collection A },
    { $project: { _id: 1, count: $count + $foreignDoc.count } },
    { $merge: { into collection A } }
  ]
}
If you are trying to maintain an incremental view, you might expect a result similar to the following:
{ _id: 1, count: 5 }
However, Atlas Stream Processing might return a count of 5 or 3, depending on whether it has already merged the first message into collection A when it runs the $lookup for the second message.
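The following JavaScript sketch reproduces the two possible outcomes of the example above with a simplified in-memory model. It assumes that collection A starts empty, that a missing foreign count counts as 0, and that the last write for an _id wins; it does not describe how Atlas Stream Processing schedules work internally. processSequentially runs $lookup, $project, and $merge for one message at a time, while processAsBatch runs every $lookup in the batch before any $merge, which is the situation described above.

```javascript
// Simplified model (assumptions: A starts empty, a missing foreign count is
// treated as 0, last write per _id wins). Both helper names are hypothetical.

// One message at a time: $lookup -> $project -> $merge.
function processSequentially(messages) {
  const collectionA = new Map();
  for (const msg of messages) {
    const foreign = collectionA.get(msg._id); // $lookup on _id
    const count = msg.count + (foreign ? foreign.count : 0); // $project
    collectionA.set(msg._id, { _id: msg._id, count }); // $merge
  }
  return [...collectionA.values()];
}

// Whole batch: every $lookup reads A before any $merge from the batch lands.
function processAsBatch(messages) {
  const collectionA = new Map();
  const projected = messages.map((msg) => {
    const foreign = collectionA.get(msg._id);
    return { _id: msg._id, count: msg.count + (foreign ? foreign.count : 0) };
  });
  for (const doc of projected) collectionA.set(doc._id, doc); // merges applied afterward
  return [...collectionA.values()];
}

const input = [{ _id: 1, count: 2 }, { _id: 1, count: 3 }];
console.log(processSequentially(input)); // [ { _id: 1, count: 5 } ]
console.log(processAsBatch(input)); // [ { _id: 1, count: 3 } ]
```

In the batched case, neither $lookup sees the other message's contribution, so the final $merge simply keeps the second message's own count.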
For more information, see $lookup.
Examples
A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. A collection named humidity_descriptions contains documents of the form:
{ dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' }
The relativeHumidity field describes the relative humidity at room temperature (20 Celsius) for the given dewPoint, and the condition field lists verbal descriptors appropriate to that level of humidity. You can use the $lookup stage to enrich the streaming weather reports with suggested descriptors for meteorologists to use in weather broadcasts.
The following aggregation has four stages:

1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.
2. The $lookup stage joins the records from the humidity_descriptions collection in the humidity database into the weather reports, matching each report's dewPoint.value field against the dewPoint field of the collection documents and storing the matches in the humidity_info array.
3. The $match stage excludes documents that have an empty humidity_info field, and passes along documents with a populated humidity_info field to the next stage.
4. The $merge stage writes the output to an Atlas collection named enriched_stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates them.
[
  {
    '$source': {
      connectionName: 'sample_weatherdata',
      topic: 'my_weatherdata',
      tsFieldName: 'ingestionTime'
    }
  },
  {
    '$lookup': {
      from: {
        connectionName: 'weatherStream',
        db: 'humidity',
        coll: 'humidity_descriptions'
      },
      'localField': 'dewPoint.value',
      'foreignField': 'dewPoint',
      'as': 'humidity_info'
    }
  },
  { '$match': { 'humidity_info': { '$ne': [] } } },
  {
    '$merge': {
      into: {
        connectionName: 'weatherStream',
        db: 'sample_weatherstream',
        coll: 'enriched_stream'
      }
    }
  }
]
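Because $lookup is a left outer join, the $match stage is what keeps unenriched reports out of enriched_stream. The following JavaScript sketch models only the $lookup and $match stages of this pipeline in memory, under stated assumptions: the first report and the humidity description mirror the sample output on this page, while the second report (call letters 'ZZZZ', dew point 30.1) is hypothetical and chosen so that it has no match. It does not connect to Kafka or Atlas.

```javascript
// In-memory model of the $lookup and $match stages of the pipeline above.
// The 'ZZZZ' report is hypothetical and has no humidity description.
const humidityDescriptions = [
  { dewPoint: 12.5, relativeHumidity: 62, condition: "humid, muggy" },
];
const reports = [
  { callLetters: "UECN", dewPoint: { value: 12.5, quality: "1" } },
  { callLetters: "ZZZZ", dewPoint: { value: 30.1, quality: "1" } },
];

// $lookup: localField 'dewPoint.value' matched against foreignField 'dewPoint'.
// Unmatched reports still flow through, with humidity_info set to [].
const joined = reports.map((report) => ({
  ...report,
  humidity_info: humidityDescriptions.filter(
    (doc) => doc.dewPoint === report.dewPoint.value
  ),
}));

// $match: { humidity_info: { $ne: [] } } drops reports with no match,
// so only enriched reports would reach $merge.
const matched = joined.filter((doc) => doc.humidity_info.length > 0);

console.log(joined.length, matched.length); // 2 1
console.log(matched.map((doc) => doc.humidity_info[0].condition)); // [ 'humid, muggy' ]
```

Removing the $match stage from this model would let the 'ZZZZ' report through with an empty humidity_info array, which is the left outer join behavior described in the Definition section.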
To view the documents in the resulting sample_weatherstream.enriched_stream collection, connect to your Atlas cluster and run the following command:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
  st: 'x+55100+006100',
  position: { type: 'Point', coordinates: [ 92.7, -53.6 ] },
  elevation: 9999,
  callLetters: 'UECN',
  qualityControlProcess: 'V020',
  dataSource: '4',
  type: 'FM-13',
  airTemperature: { value: -11, quality: '9' },
  dewPoint: { value: 12.5, quality: '1' },
  pressure: { value: 1032.7, quality: '9' },
  wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } },
  visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } },
  skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' },
  sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ],
  precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 },
  atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } },
  seaSurfaceTemperature: { value: 1.3, quality: '9' },
  waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } },
  pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } },
  skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } },
  presentWeatherObservationManual: { condition: '75', quality: '1' },
  atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } },
  skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } },
  liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' },
  extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' },
  ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
  humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ],
  _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: 2055 } }
}
Note
The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.