Definición
La La etapa $lookup realiza un join externo izquierdo de la secuencia de mensajes de tu $source a una colección de Atlas en tu Registro de Conexión.
Dependiendo de tu caso de uso, una etapa de pipeline $lookup utiliza una de las siguientes tres sintaxis:
Para obtener más información, consulte Sintaxis de $lookup.
Advertencia
El uso de $lookup para enriquecer un flujo puede reducir la velocidad de stream processing.
El siguiente formulario prototipo ilustra todos los campos disponibles:
{ "$lookup": { "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "pipeline": [ <pipeline to run> ], "as": "<output-array-field>", "parallelism": <integer>, "partitionBy": <expression> } }
Sintaxis
La etapa $lookup procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
de | Documento | Condicional | Documento que especifica una colección en una base de datos de Atlas a la cual unir los mensajes de tu Si especifica este campo, debe especificar valores para todos los campos de este documento. Este campo no es obligatorio si se especifica un campo |
de.connectionName | string | Condicional | Nombre de la conexión en tu Registro de Conexiones. Este campo no es obligatorio si se especifica un campo |
from.db | string | Condicional | Nombre de la base de datos Atlas que contiene la colección a la que desea unirse. Este campo no es obligatorio si se especifica un campo |
from.coll | string | Condicional | Nombre de la colección a la que deseas unirte. Este campo no es obligatorio si se especifica un campo |
localField | string | Condicional | Campo de tus mensajes Este campo forma parte de las siguientes sintaxis: |
foreignField | string | Condicional | Campo de documentos en la colección Este campo forma parte de las siguientes sintaxis: |
permitir | Documento | Condicional | |
pipeline | Documento | Condicional | Especifica el Este campo forma parte de las siguientes sintaxis: |
como | string | Requerido | Nombre del nuevo campo de matriz que se añadirá a los documentos de entrada. Este nuevo campo de matriz contiene los documentos coincidentes de la colección |
| entero | Opcional | Número máximo de solicitudes paralelas realizadas al destino Debe ser un número entero entre Cada procesador de flujo tiene un valor máximo de paralelismo acumulado determinado por su nivel. El paralelismo acumulativo de un procesador de flujo se calcula de la siguiente manera:
Donde Por ejemplo, si tu etapa Si un procesador de flujo supera el paralelismo acumulado máximo para su nivel, Atlas Stream Processing genera un error y le informa del nivel mínimo de procesador necesario para el nivel de paralelismo deseado. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para obtener más información, consulte Procesamiento de flujo. |
| expresión | Opcional | Expresión utilizada para dividir el flujo de entrada en hilos paralelos. Atlas Stream Processing asigna cada documento de entrada con el mismo resultado de expresión Si este campo no se especifica, los documentos de entrada se envían a subprocesos paralelos de manera secuencial. |
Comportamiento
La versión de Atlas Stream Processing de $lookup realiza una unión externa izquierda de mensajes de tu $source y los documentos en una colección de Atlas especificada. Esta versión se comporta de forma similar a la etapa $lookup disponible en una base de datos estándar de MongoDB. Sin embargo, esta versión requiere que especifiques una colección de Atlas desde tu Registro de conexiones como el valor para el campo from.
La canalización puede contener una etapa$lookupanidada. Si incluye una etapa$lookupanidada en su canalización, debe usar la sintaxis estándar from para especificar una colección en la misma conexión remota de Atlas que la etapa externa$lookup.
Ejemplo
$lookup : { from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"}, …, pipeline: [ …, { $lookup: { from: "coll2", …, } }, …, ] }
Si tu pipeline contiene tanto $lookup como $merge en la misma colección, los resultados de Atlas Stream Processing pueden variar si se intenta mantener una vista incremental. Atlas Stream Processing procesa varios mensajes fuente simultáneamente y luego los fusiona todos juntos. Si varios mensajes tienen el mismo ID, que tanto $lookup como $merge utilizan, Atlas Stream Processing puede devolver resultados que aún no se hayan materializado.
Ejemplo
Considere el siguiente flujo de entrada:
{ _id: 1, count: 2 } { _id: 1, count: 3 }
Supongamos que su consulta contiene lo siguiente dentro de la canalización:
{ ..., pipeline: [ { $lookup on _id == foreignDoc._id from collection A } { $project: { _id: 1, count: $count + $foreignDoc.count } } { $merge: { into collection A } } ] }
Si se intenta mantener una vista incremental, se podría esperar un resultado similar al siguiente:
{ _id: 1, count: 5 }
Sin embargo, Atlas Stream Processing podría devolver un recuento de 5 o 3 dependiendo de si Atlas Stream Processing ha procesado los documentos.
Para obtener más información, consulta en $lookup.
Ejemplos
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. Una colección llamada humidity_descriptions contiene documentos con la siguiente estructura:
Donde el campo relative_humidity describe la humedad relativa a temperatura ambiente (20 Celsius), y condition enumera descriptores verbales adecuados para ese nivel de humedad. Puedes usar la etapa $lookup para enriquecer los informes meteorológicos en transmisión con sugerencias de descriptores para que los meteorólogos usen en las emisiones meteorológicas.
La siguiente agregación tiene cuatro etapas:
La etapa
$sourceestablece una conexión con Apache Kafka broker que recopila estos informes en un tema llamadomy_weatherdata, exponiendo cada registro a medida que se ingiere para las etapas de agregación subsecuentes. Esta etapa también sobrescribe el nombre del campo de timestamp que proyecta, configurándolo eningestionTime.La etapa
$lookupfusiona los registros de la base de datoshumidity_descriptionsen los informes meteorológicos del campodewPoint.La etapa
$matchexcluye los documentos que tienen un campohumidity_infovacío y transfiere los documentos con un campohumidity_infopoblado a la siguiente etapa.La etapa escribe la salida en una colección de Atlas
$mergellamadaenriched_streamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$lookup': { from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } } { '$match': { 'humidity_info': { '$ne': [] } } } { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
Para ver los documentos en la colección sample_weatherstream.enriched_stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.