Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /
/ / /

$lookup (Procesamiento de flujo)

La La etapa $lookup realiza un join externo izquierdo de la secuencia de mensajes de tu $source a una colección de Atlas en tu Registro de Conexión.

Dependiendo de tu caso de uso, una etapa de pipeline $lookup utiliza una de las siguientes tres sintaxis:

Para obtener más información, consulte Sintaxis de $lookup.

Advertencia

El uso de $lookup para enriquecer un flujo puede reducir la velocidad de stream processing.

El siguiente formulario prototipo ilustra todos los campos disponibles:

{
"$lookup": {
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"pipeline": [
<pipeline to run>
],
"as": "<output-array-field>",
"parallelism": <integer>,
"partitionBy": <expression>
}
}

La etapa $lookup procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

de

Documento

Condicional

Documento que especifica una colección en una base de datos de Atlas a la cual unir los mensajes de tu $source. Debes especificar una colección únicamente desde tu Registro de conexiones.

Si especifica este campo, debe especificar valores para todos los campos de este documento.

Este campo no es obligatorio si se especifica un campo pipeline.

de.connectionName

string

Condicional

Nombre de la conexión en tu Registro de Conexiones.

Este campo no es obligatorio si se especifica un campo pipeline.

from.db

string

Condicional

Nombre de la base de datos Atlas que contiene la colección a la que desea unirse.

Este campo no es obligatorio si se especifica un campo pipeline.

from.coll

string

Condicional

Nombre de la colección a la que deseas unirte.

Este campo no es obligatorio si se especifica un campo pipeline.

localField

string

Condicional

Campo de tus mensajes $source para realizar la unión.

Este campo forma parte de las siguientes sintaxis:

foreignField

string

Condicional

Campo de documentos en la colección from para unir.

Este campo forma parte de las siguientes sintaxis:

permitir

Documento

Condicional

Especifica las variables a utilizar en las etapas de pipeline. Para obtener más información, consulte Let.

Este campo forma parte de las siguientes sintaxis:

pipeline

Documento

Condicional

Especifica el pipeline para ejecutar en la colección unida. Para obtener más información, consulta pipeline.

Este campo forma parte de las siguientes sintaxis:

como

string

Requerido

Nombre del nuevo campo de matriz que se añadirá a los documentos de entrada. Este nuevo campo de matriz contiene los documentos coincidentes de la colección from. Si el nombre especificado ya existe como campo en el documento de entrada, se sobrescribirá.

parallelism

entero

Opcional

Número máximo de solicitudes paralelas realizadas al destino $lookup. Valores de paralelismo más altos pueden aumentar el rendimiento de $lookup. Sin embargo, valores más altos también pueden consumir más recursos en el clúster de destino $lookup.

Debe ser un número entero entre 1 y 16. Por defecto es 1.

Cada procesador de flujo tiene un valor máximo de paralelismo acumulado determinado por su nivel. El paralelismo acumulativo de un procesador de flujo se calcula de la siguiente manera:

parallelism total - parallelized stages

Donde parallelism total es la suma de todos los valores parallelism mayores que 1 a través de las etapas $source, $lookup, y $merge, y parallelized stages es la cantidad de estas etapas con valores parallelism mayores que 1.

Por ejemplo, si tu etapa $source establece un valor parallelism de 4, tu etapa $lookup no define un valor parallelism (por lo tanto, se toma el valor por defecto 1), y tu etapa $merge define un valor parallelism de 2, entonces tiene dos parallelized stages y el paralelismo acumulativo de su procesador de flujo se calcula como (4 + 2) - 2.

Si un procesador de flujo supera el paralelismo acumulado máximo para su nivel, Atlas Stream Processing genera un error y le informa del nivel mínimo de procesador necesario para el nivel de paralelismo deseado. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para obtener más información, consulte Procesamiento de flujo.

partitionBy

expresión

Opcional

Expresión utilizada para dividir el flujo de entrada en hilos paralelos. Atlas Stream Processing asigna cada documento de entrada con el mismo resultado de expresión partitionBy al mismo hilo.

Si este campo no se especifica, los documentos de entrada se envían a subprocesos paralelos de manera secuencial.

La versión de Atlas Stream Processing de $lookup realiza una unión externa izquierda de mensajes de tu $source y los documentos en una colección de Atlas especificada. Esta versión se comporta de forma similar a la etapa $lookup disponible en una base de datos estándar de MongoDB. Sin embargo, esta versión requiere que especifiques una colección de Atlas desde tu Registro de conexiones como el valor para el campo from.

La canalización puede contener una etapa$lookupanidada. Si incluye una etapa$lookupanidada en su canalización, debe usar la sintaxis estándar from para especificar una colección en la misma conexión remota de Atlas que la etapa externa$lookup.

Ejemplo

$lookup : {
from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"},
,
pipeline: [
,
{
$lookup: {
from: "coll2",
,
}
},
,
]
}

Si tu pipeline contiene tanto $lookup como $merge en la misma colección, los resultados de Atlas Stream Processing pueden variar si se intenta mantener una vista incremental. Atlas Stream Processing procesa varios mensajes fuente simultáneamente y luego los fusiona todos juntos. Si varios mensajes tienen el mismo ID, que tanto $lookup como $merge utilizan, Atlas Stream Processing puede devolver resultados que aún no se hayan materializado.

Ejemplo

Considere el siguiente flujo de entrada:

{ _id: 1, count: 2 }
{ _id: 1, count: 3 }

Supongamos que su consulta contiene lo siguiente dentro de la canalización:

{
...,
pipeline: [
{ $lookup on _id == foreignDoc._id from collection A }
{ $project: { _id: 1, count: $count + $foreignDoc.count } }
{ $merge: { into collection A } }
]
}

Si se intenta mantener una vista incremental, se podría esperar un resultado similar al siguiente:

{ _id: 1, count: 5 }

Sin embargo, Atlas Stream Processing podría devolver un recuento de 5 o 3 dependiendo de si Atlas Stream Processing ha procesado los documentos.

Para obtener más información, consulta en $lookup.

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. Una colección llamada humidity_descriptions contiene documentos con la siguiente estructura:

Donde el campo relative_humidity describe la humedad relativa a temperatura ambiente (20 Celsius), y condition enumera descriptores verbales adecuados para ese nivel de humedad. Puedes usar la etapa $lookup para enriquecer los informes meteorológicos en transmisión con sugerencias de descriptores para que los meteorólogos usen en las emisiones meteorológicas.

La siguiente agregación tiene cuatro etapas:

  1. La etapa $source establece una conexión con Apache Kafka broker que recopila estos informes en un tema llamado my_weatherdata, exponiendo cada registro a medida que se ingiere para las etapas de agregación subsecuentes. Esta etapa también sobrescribe el nombre del campo de timestamp que proyecta, configurándolo en ingestionTime.

  2. La etapa $lookup fusiona los registros de la base de datos humidity_descriptions en los informes meteorológicos del campo dewPoint.

  3. La etapa $match excluye los documentos que tienen un campo humidity_info vacío y transfiere los documentos con un campo humidity_info poblado a la siguiente etapa.

  4. La etapa escribe la salida en una colección de Atlas $merge llamada enriched_stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$lookup': {
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
}
{ '$match': { 'humidity_info': { '$ne': [] } } }
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.enriched_stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.

Volver

$https

En esta página