Definición
El La etapa$lookup realiza una unión externa izquierda del flujo de mensajes de su $source a una colección Atlas en su Registro de Conexión.
Dependiendo de su caso de uso, una etapa de canalización $lookup utiliza una de las siguientes tres sintaxis:
Para obtener más información, consulte Sintaxis de $lookup.
Advertencia
El uso de $lookup para enriquecer una transmisión puede reducir la velocidad de procesamiento de la transmisión.
El siguiente formulario prototipo ilustra todos los campos disponibles:
{ "$lookup": { "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "pipeline": [ <pipeline to run> ], "as": "<output-array-field>", "parallelism": <integer>, "partitionBy": <expression> } }
Sintaxis
La etapa $lookup procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
de | Documento | Condicional | Documento que especifica una colección en una base de datos Atlas para unirse a los mensajes de su. Debe especificar una colección solo desde su Registro de Si especifica este campo, debe especificar valores para todos los campos de este documento. Este campo no es obligatorio si especifica un campo |
de.nombreDeConexión | string | Condicional | Nombre de la conexión en su Registro de conexión. Este campo no es obligatorio si especifica un campo |
desde.db | string | Condicional | Nombre de la base de datos Atlas que contiene la colección a la que desea unirse. Este campo no es obligatorio si especifica un campo |
de.coll | string | Condicional | Nombre de la colección a la que deseas unirte. Este campo no es obligatorio si especifica un campo |
localField | string | Condicional | Campo de tus Este campo es parte de las siguientes sintaxis: |
foreignField | string | Condicional | Campo de los documentos de la colección Este campo es parte de las siguientes sintaxis: |
permitir | Documento | Condicional | Especifica las variables que se usarán en las etapas de la canalización. Para obtener más información, consulte let. Este campo es parte de las siguientes sintaxis: |
pipeline | Documento | Condicional | Especifica el Este campo es parte de las siguientes sintaxis: |
como | string | Requerido | Nombre del nuevo campo de matriz que se añadirá a los documentos de entrada. Este nuevo campo de matriz contiene los documentos coincidentes de la colección |
| entero | Opcional | Número máximo de solicitudes paralelas realizadas al destino Debe ser un número entero entre Cada procesador de flujo tiene un valor máximo de paralelismo acumulado, determinado por su nivel. El paralelismo acumulado de un procesador de flujo se calcula de la siguiente manera:
Donde Por ejemplo, si su etapa Si un procesador de flujo supera el paralelismo acumulado máximo para su nivel, Atlas Stream Processing genera un error y le informa del nivel mínimo de procesador necesario para el nivel de paralelismo deseado. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para obtener más información, consulte Procesamiento de flujo. |
| expresión | Opcional | Expresión utilizada para dividir el flujo de entrada en hilos paralelos. Atlas Stream Processing asigna cada documento de entrada con el mismo resultado de expresión Si no se especifica este campo, los documentos de entrada se envían a subprocesos paralelos mediante un proceso rotatorio. |
Comportamiento
La versión de $lookup con procesamiento de flujo Atlas realiza una unión externa izquierda de los mensajes de $source su y los documentos de una colección Atlas especificada. Esta versión funciona de forma similar a la $lookup etapa disponible en una base de datos MongoDB estándar. Sin embargo, esta versión requiere que se especifique una colección Atlas de su registro de conexión como valor para el from campo.
La canalización puede contener una etapa$lookupanidada. Si incluye una etapa$lookupanidada en su canalización, debe usar la sintaxis estándar from para especificar una colección en la misma conexión remota de Atlas que la etapa externa$lookup.
Ejemplo
$lookup : { from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"}, …, pipeline: [ …, { $lookup: { from: "coll2", …, } }, …, ] }
Si su canalización tiene $lookup y en la misma colección, los resultados de Atlas Stream Processing podrían variar si intenta mantener una vista incremental. Atlas $merge $lookup $merge Stream Processing procesa varios mensajes de origen simultáneamente y luego los fusiona. Si varios mensajes tienen el mismo ID, que utilizan y, Atlas Stream Processing podría devolver resultados que aún no se han materializado.
Ejemplo
Considere el siguiente flujo de entrada:
{ _id: 1, count: 2 } { _id: 1, count: 3 }
Supongamos que su consulta contiene lo siguiente dentro de la canalización:
{ ..., pipeline: [ { $lookup on _id == foreignDoc._id from collection A } { $project: { _id: 1, count: $count + $foreignDoc.count } } { $merge: { into collection A } } ] }
Si está intentando mantener una vista incremental, podría esperar un resultado similar al siguiente:
{ _id: 1, count: 5 }
Sin embargo, Atlas Stream Processing podría devolver un recuento de 5 o 3 dependiendo de si Atlas Stream Processing ha procesado los documentos.
Para obtener más información,$lookup consulte.
Ejemplos
Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. Una colección denominada humidity_descriptions contiene documentos con el formato:
Donde el relative_humidity campo describe la humedad relativa a temperatura ambiente (20 Celsius) y condition enumera los descriptores verbales apropiados para ese nivel de humedad. Puede usar la etapa $lookup para enriquecer los informes meteorológicos en directo con descriptores sugeridos para que los meteorólogos los utilicen en sus transmisiones.
La siguiente agregación tiene cuatro etapas:
La etapa establece una conexión
$sourcecon Apache KafkaEl agente recopila estos informes en un tema llamadomy_weatherdatay expone cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndoloingestionTimeen.La etapa
$lookupune los registros de la base de datoshumidity_descriptionsen los informes meteorológicos en el campodewPoint.La
$matchetapa excluye los documentos que tienen unhumidity_infocampo vacío y pasa los documentos con unhumidity_infocampo completado a la siguiente etapa.La etapa escribe la salida en una colección de Atlas
$mergellamadaenriched_streamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$lookup': { from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } } { '$match': { 'humidity_info': { '$ne': [] } } } { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
Para ver los documentos en la colección sample_weatherstream.enriched_stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], }
Nota
El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.