$lookup

Definition

The $lookup stage performs a left outer join of the stream of messages from your $source to an Atlas collection in your Connection Registry.

Depending on your use case, a $lookup pipeline stage uses one of three syntaxes. To learn more, see $lookup Syntax.

Warning

Using $lookup to enrich a stream can reduce stream processing speed.

Syntax

The following prototype form illustrates all available fields:

{
  "$lookup": {
    "from": {
      "connectionName": "<registered-atlas-connection>",
      "db": "<registered-database-name>",
      "coll": "<atlas-collection-name>"
    },
    "localField": "<field-in-source-messages>",
    "foreignField": "<field-in-from-collection>",
    "let": {
      <var_1>: <expression>,
      <var_2>: <expression>,
      …,
      <var_n>: <expression>
    },
    "pipeline": [
      <pipeline to run>
    ],
    "as": "<output-array-field>"
  }
}

The $lookup stage takes a document with the following fields:

| Field | Type | Necessity | Description |
| --- | --- | --- | --- |
| from | document | Required | Document that specifies a collection in an Atlas database to join to messages from your $source. You must specify a collection from your Connection Registry. You must specify a value for all fields in this document. |
| from.connectionName | string | Required | Name of the connection in your Connection Registry. |
| from.db | string | Required | Name of the Atlas database that contains the collection you want to join. |
| from.coll | string | Required | Name of the collection you want to join. |
| localField | string | Conditional | Field from your $source messages to join on. Used only by some of the $lookup syntaxes. |
| foreignField | string | Conditional | Field from documents in the from collection to join on. Used only by some of the $lookup syntaxes. |
| let | document | Conditional | Specifies the variables to use in the pipeline stages. To learn more, see let. Used only by some of the $lookup syntaxes. |
| pipeline | document | Conditional | Specifies the pipeline to run on the joined collection. To learn more, see pipeline. Used only by some of the $lookup syntaxes. |
| as | string | Required | Name of the new array field to add to the input documents. This new array field contains the matching documents from the from collection. If the specified name already exists as a field in the input document, that field is overwritten. |
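
The required fields listed above can be checked before a stage is submitted. The following is a minimal sketch in plain JavaScript; findMissingLookupFields is a hypothetical helper written for this illustration, not part of Atlas Stream Processing or any MongoDB driver, and it only covers a top-level $lookup stage whose from field is a document.

```javascript
// Hypothetical helper: reports which fields marked "Required" in the table
// are absent from a top-level $lookup stage. Not a MongoDB API.
function findMissingLookupFields(stage) {
  const spec = (stage && stage.$lookup) || {};
  const missing = [];
  const from = spec.from;
  if (typeof from !== "object" || from === null) {
    missing.push("from");
  } else {
    // Every field of the "from" document must have a value.
    for (const key of ["connectionName", "db", "coll"]) {
      if (typeof from[key] !== "string" || from[key].length === 0) {
        missing.push(`from.${key}`);
      }
    }
  }
  if (typeof spec.as !== "string" || spec.as.length === 0) {
    missing.push("as");
  }
  return missing;
}

const stage = {
  $lookup: {
    from: { connectionName: "weatherStream", db: "humidity" }, // coll omitted on purpose
    localField: "dewPoint.value",
    foreignField: "dewPoint",
    as: "humidity_info",
  },
};

console.log(findMissingLookupFields(stage)); // reports "from.coll"
```

The helper does not validate the conditional fields, because which of them apply depends on the syntax you choose.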

Behavior

The Atlas Stream Processing version of $lookup performs a left outer join of messages from your $source and the documents in a specified Atlas collection. This version behaves similarly to the $lookup stage available in a standard MongoDB database. However, this version requires that you specify an Atlas collection from your Connection Registry as the value for the from field.

The pipeline can contain a nested $lookup stage. If you include a nested $lookup stage in your pipeline, you must use the standard from syntax to specify a collection in the same remote Atlas connection as the outer $lookup stage.

Example

$lookup: {
  from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"},
  …,
  pipeline: [
    …,
    {
      $lookup: {
        from: "coll2",
        …,
      }
    },
    …,
  ]
}
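
For comparison, here is a fuller sketch of the same nesting pattern with nothing elided. Every name in it is hypothetical and chosen only for illustration: the salesCluster connection, the crm database, the customers and regions collections, and the customerId and regionId fields are assumptions, not values from this page.

```javascript
// All connection, database, collection, and field names are hypothetical.
const enrichWithCustomer = {
  $lookup: {
    // Outer stage: "from" is a document naming a Connection Registry entry.
    from: { connectionName: "salesCluster", db: "crm", coll: "customers" },
    let: { customerId: "$customerId" },
    pipeline: [
      { $match: { $expr: { $eq: ["$_id", "$$customerId"] } } },
      {
        // Nested stage: "from" uses the standard string form and names a
        // collection in the same remote Atlas connection as the outer stage.
        $lookup: {
          from: "regions",
          localField: "regionId",
          foreignField: "_id",
          as: "region",
        },
      },
    ],
    as: "customer",
  },
};

console.log(JSON.stringify(enrichWithCustomer, null, 2));
```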

If your pipeline has both $lookup and $merge on the same collection, Atlas Stream Processing results might vary if you try to maintain an incremental view. Atlas Stream Processing processes multiple source messages simultaneously and then merges them all together. If multiple messages have the same ID, which both $lookup and $merge use, Atlas Stream Processing might return results that haven't yet materialized.

Example

Consider the following input stream:

{ _id: 1, count: 2 }
{ _id: 1, count: 3 }

Suppose your query contains the following inside the pipeline:

{
  ...,
  pipeline: [
    { $lookup on _id == foreignDoc._id from collection A }
    { $project: { _id: 1, count: $count + $foreignDoc.count } }
    { $merge: { into collection A } }
  ]
}

If you are trying to maintain an incremental view, you might expect a result similar to the following:

{ _id: 1, count: 5 }

However, Atlas Stream Processing might return a count of 5 or 3, depending on whether it has merged the first message into the collection before it performs the $lookup for the second message.
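
The two outcomes can be reproduced with a toy model in plain JavaScript. This is only a sketch of the ordering problem, not of how Atlas Stream Processing is implemented: a Map stands in for collection A, a missing foreign document is assumed to contribute a count of 0, and the merge step is assumed to replace any document with the same _id.

```javascript
// Toy model only. A Map stands in for collection A.
function lookupAndProject(collection, message) {
  const foreignDoc = collection.get(message._id);
  // Assumption: no matching foreign document means a foreign count of 0.
  const foreignCount = foreignDoc ? foreignDoc.count : 0;
  return { _id: message._id, count: message.count + foreignCount };
}

function merge(collection, doc) {
  collection.set(doc._id, doc); // replaces any document with the same _id
}

const input = [
  { _id: 1, count: 2 },
  { _id: 1, count: 3 },
];

// Case 1: each message is merged before the next lookup runs.
const oneAtATime = new Map();
for (const message of input) {
  merge(oneAtATime, lookupAndProject(oneAtATime, message));
}

// Case 2: both messages pass through the lookup before either is merged.
const batched = new Map();
const projected = input.map((message) => lookupAndProject(batched, message));
for (const doc of projected) {
  merge(batched, doc);
}

console.log(oneAtATime.get(1).count); // 5
console.log(batched.get(1).count); // 3
```

In the second case neither lookup sees the other message's contribution, so the last merged document wins.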

For more information, see $lookup.

Examples

A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. A collection named humidity_descriptions contains documents with dewPoint, relativeHumidity, and condition fields, in the form that appears in the humidity_info array of the sample output in this section.

The relativeHumidity field describes the relative humidity at room temperature (20 degrees Celsius), and the condition field lists verbal descriptors appropriate to that level of humidity. You can use the $lookup stage to enrich the streaming weather reports with suggested descriptors for meteorologists to use in weather broadcasts.

The following aggregation has four stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.

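  2. The $lookup stage joins documents from the humidity_descriptions collection into the weather reports, matching the dewPoint.value field of each report against the dewPoint field of the collection's documents, and stores the matches in the humidity_info field.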

  3. The $match stage excludes documents that have an empty humidity_info field, and passes along documents with a populated humidity_info field to the next stage.

  4. The $merge stage writes the output to an Atlas collection named enriched_stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates them.

{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$lookup': {
    from: {
      connectionName: 'weatherStream',
      db: 'humidity',
      coll: 'humidity_descriptions'
    },
    localField: 'dewPoint.value',
    foreignField: 'dewPoint',
    as: 'humidity_info'
  }
},
{ '$match': { 'humidity_info': { '$ne': [] } } },
{
  '$merge': {
    into: {
      connectionName: 'weatherStream',
      db: 'sample_weatherstream',
      coll: 'enriched_stream'
    }
  }
}
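
The effect of the $lookup and $match stages can be approximated locally in plain JavaScript. The sketch below is an illustration under stated assumptions rather than the Atlas implementation: the lookup function is a hypothetical stand-in that uses strict equality on a top-level foreign field, no Atlas connection is involved, and the second message (callLetters 'XXXX') is made up to show the non-matching case.

```javascript
// Plain JavaScript stand-in for the $lookup and $match stages.
function lookup(message, foreignDocs, { localField, foreignField, as }) {
  // Resolve a dotted path such as "dewPoint.value" on the message.
  const localValue = localField
    .split(".")
    .reduce((value, key) => (value == null ? undefined : value[key]), message);
  const matches = foreignDocs.filter((doc) => doc[foreignField] === localValue);
  // Left outer join: the message is always kept, and the "as" field
  // overwrites any existing field of the same name.
  return { ...message, [as]: matches };
}

const humidityDescriptions = [
  { dewPoint: 12.5, relativeHumidity: 62, condition: "humid, muggy" },
];
const messages = [
  { callLetters: "UECN", dewPoint: { value: 12.5, quality: "1" } },
  { callLetters: "XXXX", dewPoint: { value: 999.9, quality: "9" } }, // made up
];

const joined = messages.map((message) =>
  lookup(message, humidityDescriptions, {
    localField: "dewPoint.value",
    foreignField: "dewPoint",
    as: "humidity_info",
  })
);

// Equivalent of { $match: { humidity_info: { $ne: [] } } }.
const enriched = joined.filter((doc) => doc.humidity_info.length > 0);

console.log(joined.length, enriched.length); // 2 1
```

Both messages survive the join, but only the one with a matching dew point reaches the merge step.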

To view the documents in the resulting sample_weatherstream.enriched_stream collection, connect to your Atlas cluster and run the following command:

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
  st: 'x+55100+006100',
  position: {
    type: 'Point',
    coordinates: [
      92.7,
      -53.6
    ]
  },
  elevation: 9999,
  callLetters: 'UECN',
  qualityControlProcess: 'V020',
  dataSource: '4',
  type: 'FM-13',
  airTemperature: {
    value: -11,
    quality: '9'
  },
  dewPoint: {
    value: 12.5,
    quality: '1'
  },
  pressure: {
    value: 1032.7,
    quality: '9'
  },
  wind: {
    direction: {
      angle: 300,
      quality: '9'
    },
    type: '9',
    speed: {
      rate: 23.6,
      quality: '2'
    }
  },
  visibility: {
    distance: {
      value: 14000,
      quality: '1'
    },
    variability: {
      value: 'N',
      quality: '1'
    }
  },
  skyCondition: {
    ceilingHeight: {
      value: 390,
      quality: '9',
      determination: 'C'
    },
    cavok: 'N'
  },
  sections: [
    'SA1',
    'AA1',
    'OA1',
    'AY1',
    'AG1'
  ],
  precipitationEstimatedObservation: {
    discrepancy: '4',
    estimatedWaterDepth: 21
  },
  atmosphericPressureChange: {
    tendency: {
      code: '1',
      quality: '1'
    },
    quantity3Hours: {
      value: 5.5,
      quality: '1'
    },
    quantity24Hours: {
      value: 99.9,
      quality: '9'
    }
  },
  seaSurfaceTemperature: {
    value: 1.3,
    quality: '9'
  },
  waveMeasurement: {
    method: 'M',
    waves: {
      period: 4,
      height: 2.5,
      quality: '9'
    },
    seaState: {
      code: '00',
      quality: '9'
    }
  },
  pastWeatherObservationManual: {
    atmosphericCondition: {
      value: '4',
      quality: '1'
    },
    period: {
      value: 6,
      quality: '1'
    }
  },
  skyConditionObservation: {
    totalCoverage: {
      value: '07',
      opaque: '99',
      quality: '1'
    },
    lowestCloudCoverage: {
      value: '06',
      quality: '1'
    },
    lowCloudGenus: {
      value: '07',
      quality: '9'
    },
    lowestCloudBaseHeight: {
      value: 2250,
      quality: '9'
    },
    midCloudGenus: {
      value: '07',
      quality: '9'
    },
    highCloudGenus: {
      value: '00',
      quality: '1'
    }
  },
  presentWeatherObservationManual: {
    condition: '75',
    quality: '1'
  },
  atmosphericPressureObservation: {
    altimeterSetting: {
      value: 9999.9,
      quality: '9'
    },
    stationPressure: {
      value: 1032.6,
      quality: '1'
    }
  },
  skyCoverLayer: {
    coverage: {
      value: '09',
      quality: '1'
    },
    baseHeight: {
      value: 240,
      quality: '9'
    },
    cloudType: {
      value: '99',
      quality: '9'
    }
  },
  liquidPrecipitation: {
    period: 6,
    depth: 3670,
    condition: '9',
    quality: '9'
  },
  extremeAirTemperature: {
    period: 99.9,
    code: 'N',
    value: -30.9,
    quantity: '9'
  },
  ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
  humidity_info: [
    {
      _id: ObjectId('66ec805ad3cfbba767ebf7a5'),
      dewPoint: 12.5,
      relativeHumidity: 62,
      condition: 'humid, muggy'
    }
  ],
  _stream_meta: {
    source: {
      type: 'kafka',
      topic: 'my_weatherdata',
      partition: 0,
      offset: 2055
    }
  }
}

Note

The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.
