Definición
La $cachedLookup stage realiza una unión externa izquierda del flujo de mensajes de tu $source a una colección de Atlas en tu Registro de Conexión.
Esta etapa funciona de manera similar a la etapa $lookup , pero almacena en caché los resultados de tus consultas según los parámetros configurables.
Importante
$cachedLookup no admite los campos let o pipeline.
Para obtener más información, consulte Sintaxis de $lookup.
El siguiente formulario prototipo ilustra todos los campos disponibles:
{ "$cachedLookup": { "ttl": { "size": <int>, "unit": "ms" | "second" | "minute" | "hour" | "day" }, "maxMemUsageBytes": <int>, "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "as": "<output-array-field>" } }
Sintaxis
El $cachedLookup utiliza algunos de los mismos campos que la versión generalizada de $lookup. $cachedLookup incluye campos para configurar el comportamiento del almacenamiento en caché de queries y proporciona una sintaxis modificada para el campo from para query datos a través de una conexión desde tu registro de conexiones.
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
ttl | Documento | Requerido | Documento que especifica el TTL de tus consultas en caché. |
tamaño ttl | Int | Requerido | Tamaño del TTL de las queries almacenadas en caché en |
ttl.unit | string | Requerido | Unidad de tiempo en la que medir el TTL de tus consultas guardadas en caché. Debe ser uno de los siguientes:
|
maxMemUsageBytes | Int | Requerido | Memoria máxima, en bytes, para asignar al almacenamiento en caché de consultas. Si el tamaño de la caché supera este valor, Atlas Stream Processing primero expulsa los resultados antiguos para liberar espacio. Si no hay suficientes resultados caducados para alcanzar este umbral, Atlas Stream Processing expulsa aleatoriamente las consultas almacenadas en caché hasta que el tamaño de la caché sea inferior al umbral. El valor predeterminado es el 10% de la RAM disponible en el espacio de trabajo de procesamiento de flujos. No se puede establecer |
de | Documento | Requerido | Documento que especifica una colección en una base de datos de Atlas a la cual unir los mensajes de tu Si especifica este campo, debe especificar valores para todos los campos de este documento. |
de.connectionName | string | Requerido | Nombre de la conexión en tu Registro de Conexiones. |
from.db | string | Requerido | Nombre de la base de datos Atlas que contiene la colección a la que desea unirse. |
from.coll | string | Requerido | Nombre de la colección a la que deseas unirte. |
localField | string | Requerido | Campo de tus mensajes |
foreignField | string | Requerido | Campo de documentos en la colección |
como | string | Requerido | Nombre del nuevo campo de matriz que se añadirá a los documentos de entrada. Este nuevo campo de matriz contiene los documentos coincidentes de la colección |
Comportamiento
$cachedLookup realiza una unión externa izquierda de los mensajes de su $source y los documentos de una colección Atlas especificada. Esta versión funciona de forma similar a la etapa disponible en una base de datos MongoDB estándar. Sin $lookup embargo, esta versión requiere que especifique una colección Atlas de su Registro de Conexión como valor para el from campo.
Además, $cachedLookup almacena en caché los resultados de sus consultas durante un tiempo configurable. Utilice esta funcionalidad para realizar consultas sobre datos que cambian con poca frecuencia y mejorar la eficiencia. Cuando el TTL de una entrada en caché expira, Atlas Stream Processing elimina dicha entrada. Si el tamaño total de las entradas almacenadas en caché es igual a maxMemoryUsageBytes cuando se realiza una nueva query, Atlas Stream Processing expulsa las entradas hasta que haya espacio para almacenar en caché la nueva query.
Ejemplos
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. Una colección llamada humidity_descriptions contiene documentos con la siguiente estructura:
{ 'dew_point': 16.2, 'relative_humidity': 79, 'condition': 'sticky, oppressive' }
Donde el relative_humidity campo describe la humedad relativa a temperatura ambiente (20 Celsius) y condition enumera los descriptores verbales apropiados para ese nivel de humedad. Puede usar la etapa $cachedLookup para enriquecer los informes meteorológicos en directo con descriptores sugeridos para que los meteorólogos los utilicen en sus transmisiones.
La siguiente agregación tiene cuatro etapas:
La etapa
$sourceestablece una conexión con Apache Kafka broker que recopila estos informes en un tema llamadomy_weatherdata, exponiendo cada registro a medida que se ingiere para las etapas de agregación subsecuentes. Esta etapa también sobrescribe el nombre del campo de timestamp que proyecta, configurándolo eningestionTime.La etapa
$cachedLookupune los registros de la base de datoshumidity_descriptionscon los informes meteorológicos en el campodewPoint. Cada query tiene un5 minuteTTL, y Atlas Stream Processing almacena hasta 200 MB de resultados.La etapa
$matchexcluye los documentos que tienen un campohumidity_infovacío y transfiere los documentos con un campohumidity_infopoblado a la siguiente etapa.La etapa escribe la salida en una colección de Atlas
$mergellamadaenriched_streamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$cachedLookup': { "ttl": { "size": 5, "unit": "minute" }, "maxMemUsageBytes": 209715200, from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } }, { '$match': { 'humidity_info': { '$ne': [] } } }, { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
Para ver los documentos en la colección sample_weatherstream.enriched_stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.