Definición
El $cachedLookup stage realiza una unión externa izquierda del flujo de mensajes de su $source a una colección Atlas en su Registro de Conexión.
Esta etapa funciona de manera similar a la etapa $lookup, pero almacena en caché los resultados de sus consultas según parámetros configurables.
Importante
$cachedLookup no admite los campos let o pipeline.
Para obtener más información, consulte Sintaxis de $lookup.
El siguiente formulario prototipo ilustra todos los campos disponibles:
{ "$cachedLookup": { "ttl": { "size": <int>, "unit": "ms" | "second" | "minute" | "hour" | "day" }, "maxMemUsageBytes": <int>, "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "as": "<output-array-field>" } }
Sintaxis
$cachedLookup utiliza algunos de los mismos campos que la versión generalizada $lookup de. $cachedLookup incluye campos para configurar el comportamiento del almacenamiento en caché de consultas y proporciona una sintaxis modificada para el from campo para consultar datos a través de una conexión desde su registro de conexión.
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
ttl | Documento | Requerido | Documento que especifica el TTL de sus consultas almacenadas en caché. |
tamaño ttl | Int | Requerido | Tamaño del TTL de sus consultas almacenadas en caché en |
unidad ttl | string | Requerido | Unidad de tiempo para medir el TTL de las consultas almacenadas en caché. Debe ser una de las siguientes:
|
bytes de uso de memoria máximos | Int | Requerido | Memoria máxima, en bytes, para asignar al almacenamiento en caché de consultas. Si el tamaño de la caché supera este valor, Atlas Stream Processing primero expulsa los resultados antiguos para liberar espacio. Si no hay suficientes resultados caducados para alcanzar este umbral, Atlas Stream Processing expulsa aleatoriamente las consultas almacenadas en caché hasta que el tamaño de la caché sea inferior al umbral. El valor predeterminado es el 10% de la RAM disponible en el espacio de trabajo de procesamiento de flujos. No se puede establecer |
de | Documento | Requerido | Documento que especifica una colección en una base de datos Atlas para unirse a los mensajes de su. Debe especificar una colección solo desde su Registro de Si especifica este campo, debe especificar valores para todos los campos de este documento. |
de.nombreDeConexión | string | Requerido | Nombre de la conexión en su Registro de conexión. |
desde.db | string | Requerido | Nombre de la base de datos Atlas que contiene la colección a la que desea unirse. |
de.coll | string | Requerido | Nombre de la colección a la que deseas unirte. |
localField | string | Requerido | Campo de tus |
foreignField | string | Requerido | Campo de los documentos de la colección |
como | string | Requerido | Nombre del nuevo campo de matriz que se añadirá a los documentos de entrada. Este nuevo campo de matriz contiene los documentos coincidentes de la colección |
Comportamiento
$cachedLookup realiza una unión externa izquierda de los mensajes de su $source y los documentos de una colección Atlas especificada. Esta versión funciona de forma similar a la etapa disponible en una base de datos MongoDB estándar. Sin $lookup embargo, esta versión requiere que especifique una colección Atlas de su Registro de Conexión como valor para el from campo.
Además, $cachedLookup almacena en caché los resultados de sus consultas durante un periodo configurable. Utilice esta función para consultas sobre datos que se modifican con poca frecuencia para mejorar la eficiencia. Cuando transcurre el tiempo de vida de una entrada en caché, Atlas Stream Processing la expulsa. Si el tamaño total de las entradas en caché es igual a maxMemoryUsageBytes al realizar una nueva consulta, Atlas Stream Processing las expulsa hasta que haya espacio para almacenarla en caché.
Ejemplos
Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. Una colección denominada humidity_descriptions contiene documentos con el formato:
{ 'dew_point': 16.2, 'relative_humidity': 79, 'condition': 'sticky, oppressive' }
Donde el relative_humidity campo describe la humedad relativa a temperatura ambiente (20 Celsius) y condition enumera los descriptores verbales apropiados para ese nivel de humedad. Puede usar la etapa $cachedLookup para enriquecer los informes meteorológicos en directo con descriptores sugeridos para que los meteorólogos los utilicen en sus transmisiones.
La siguiente agregación tiene cuatro etapas:
La etapa establece una conexión
$sourcecon Apache KafkaEl agente recopila estos informes en un tema llamadomy_weatherdatay expone cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndoloingestionTimeen.La etapa
$cachedLookupintegra los registros de la base de datoshumidity_descriptionscon los informes meteorológicos del campodewPoint. Cada consulta tiene un TTL de5 minutey Atlas Stream Processing almacena hasta 200 MB de resultados.La
$matchetapa excluye los documentos que tienen unhumidity_infocampo vacío y pasa los documentos con unhumidity_infocampo completado a la siguiente etapa.La etapa escribe la salida en una colección de Atlas
$mergellamadaenriched_streamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$cachedLookup': { "ttl": { "size": 5, "unit": "minute" }, "maxMemUsageBytes": 209715200, from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } }, { '$match': { 'humidity_info': { '$ne': [] } } }, { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
Para ver los documentos en la colección sample_weatherstream.enriched_stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], }
Nota
El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.