Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

$cachedLookup

$cachedLookup

La etapa$cachedLookuprealiza una unión externa izquierda del flujo de mensajes desde su$sourcea una colección Atlas en su Registro de conexión.

Esta etapa funciona de manera similar a la etapa $lookup , pero almacena en caché los resultados de tus consultas según los parámetros configurables.

Importante

$cachedLookup no admite los campos let o pipeline.

Para obtener más información, consulte la sintaxis de $lookup.

El siguiente formulario prototipo ilustra todos los campos disponibles:

{
"$cachedLookup": {
"ttl": {
"size": <int>,
"unit": "ms" | "second" | "minute" | "hour" | "day"
},
"maxMemUsageBytes": <int>,
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"as": "<output-array-field>"
}
}

$cachedLookup utiliza algunos de los mismos campos que la versión generalizada $lookup de. $cachedLookup incluye campos para configurar el comportamiento del almacenamiento en caché de consultas y proporciona una sintaxis modificada para el from campo para consultar datos a través de una conexión desde su registro de conexión.

Campo
Tipo
Necesidad
Descripción

ttl

Documento

Requerido

Documento que especifica el TTL de tus consultas en caché.

tamaño ttl

Int

Requerido

Tamaño del TTL de las queries almacenadas en caché en units.

ttl.unit

string

Requerido

Unidad de tiempo en la que medir el TTL de tus consultas guardadas en caché. Debe ser uno de los siguientes:

  • "ms"

  • "segundos"

  • "minuto"

  • "hora"

  • "día"

maxMemUsageBytes

Int

Requerido

Memoria máxima, en bytes, para asignar a la caché de consultas. Si el tamaño del caché supera este valor, Atlas Stream Processing primero expulsa los resultados más antiguos para liberar espacio. Si no hay suficientes resultados caducados para quedar por debajo de este umbral, Atlas Stream Processing descarta aleatoriamente las consultas en caché hasta que el tamaño de la caché esté por debajo del umbral.

Por defecto, se asigna 10% de la RAM disponible en el espacio de trabajo de Stream Processing. No puedes establecer maxMemUsageBytes en más del 12.5% de la RAM disponible en tu espacio de trabajo de Stream Processing.

de

Documento

Requerido

Documento que especifica una colección en una base de datos de Atlas a la cual unir los mensajes de tu $source. Debes especificar una colección únicamente desde tu Registro de conexiones.

Si especifica este campo, debe especificar valores para todos los campos de este documento.

de.connectionName

string

Requerido

Nombre de la conexión en tu Registro de Conexiones.

from.db

string

Requerido

Nombre de la base de datos Atlas que contiene la colección a la que desea unirse.

from.coll

string

Requerido

Nombre de la colección que deseas unir.

localField

string

Requerido

Campo de tus mensajes $source para realizar la unión.

foreignField

string

Requerido

Campo de documentos en la colección from para unir.

como

string

Requerido

Nombre del nuevo campo de arreglo que se añadirá a los documentos de entrada. Este nuevo campo de arreglo contiene los documentos coincidentes de la colección from. Si el nombre especificado ya existe como campo en el documento de entrada, ese campo se sobrescribe.

$cachedLookup realiza una unión externa izquierda de los mensajes de su $source y los documentos de una colección Atlas especificada. Esta versión funciona de forma similar a la etapa disponible en una base de datos MongoDB estándar. Sin $lookup embargo, esta versión requiere que especifique una colección Atlas de su Registro de Conexión como valor para el from campo.

Además, $cachedLookup almacena en caché los resultados de sus consultas durante un tiempo configurable. Utilice esta funcionalidad para realizar consultas sobre datos que cambian con poca frecuencia y mejorar la eficiencia. Cuando el TTL de una entrada en caché expira, Atlas Stream Processing elimina dicha entrada. Si el tamaño total de las entradas almacenadas en caché es igual a maxMemoryUsageBytes cuando se realiza una nueva query, Atlas Stream Processing expulsa las entradas hasta que haya espacio para almacenar en caché la nueva query.

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. Una colección llamada humidity_descriptions contiene documentos con la siguiente estructura:

{
'dew_point': 16.2,
'relative_humidity': 79,
'condition': 'sticky, oppressive'
}

Donde el relative_humidity campo describe la humedad relativa a temperatura ambiente (20 Celsius) y condition enumera los descriptores verbales apropiados para ese nivel de humedad. Puede usar la etapa $cachedLookup para enriquecer los informes meteorológicos en directo con descriptores sugeridos para que los meteorólogos los utilicen en sus transmisiones.

La siguiente agregación tiene cuatro etapas:

  1. La etapa $source establece una conexión con el Apache Kafka broker que recopila estos informes en un tema denominado my_weatherdata, exponiendo cada registro a medida que se procesa a las siguientes etapas de agregación. Este etapa también reemplaza el nombre del campo de marca de tiempo que proyecta, estableciéndolo en ingestionTime.

  2. La etapa $cachedLookup une los registros de la base de datos humidity_descriptions con los informes meteorológicos en el campo dewPoint. Cada query tiene un 5 minute TTL, y Atlas Stream Processing almacena hasta 200 MB de resultados.

  3. La etapa $match excluye los documentos que tienen un campo humidity_info vacío y transfiere los documentos con un campo humidity_info poblado a la siguiente etapa.

  4. La fase $merge escribe la salida en una colección Atlas denominada enriched_stream en la base de datos sample_weatherstream. Si no existe tal base de datos o colección, Atlas los creará.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$cachedLookup': {
"ttl": {
"size": 5,
"unit": "minute"
},
"maxMemUsageBytes": 209715200,
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
},
{ '$match': { 'humidity_info': { '$ne': [] } } },
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

Para ver los documentos de la colección sample_weatherstream.enriched_stream resultante, conéctate a un clúster de Atlas y ejecuta el siguiente comando:

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.