Docs Menu
Docs Home
/ /
/ / /

$lookup (Procesamiento de flujo)

El La etapa$lookup realiza una unión externa izquierda del flujo de mensajes de su $source a una colección Atlas en su Registro de Conexión.

Dependiendo de su caso de uso, una etapa de canalización $lookup utiliza una de las siguientes tres sintaxis:

Para obtener más información, consulte Sintaxis de $lookup.

Advertencia

El uso de $lookup para enriquecer una transmisión puede reducir la velocidad de procesamiento de la transmisión.

El siguiente formulario prototipo ilustra todos los campos disponibles:

{
"$lookup": {
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"pipeline": [
<pipeline to run>
],
"as": "<output-array-field>",
"parallelism": <integer>,
"partitionBy": <expression>
}
}

La etapa $lookup procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

de

Documento

Condicional

Documento que especifica una colección en una base de datos Atlas para unirse a los mensajes de su. Debe especificar una colección solo desde su Registro de $source Conexión.

Si especifica este campo, debe especificar valores para todos los campos de este documento.

Este campo no es obligatorio si especifica un campo pipeline.

de.nombreDeConexión

string

Condicional

Nombre de la conexión en su Registro de conexión.

Este campo no es obligatorio si especifica un campo pipeline.

desde.db

string

Condicional

Nombre de la base de datos Atlas que contiene la colección a la que desea unirse.

Este campo no es obligatorio si especifica un campo pipeline.

de.coll

string

Condicional

Nombre de la colección a la que deseas unirte.

Este campo no es obligatorio si especifica un campo pipeline.

localField

string

Condicional

Campo de tus $source mensajes para unirte.

Este campo es parte de las siguientes sintaxis:

foreignField

string

Condicional

Campo de los documentos de la colección from al que unirse.

Este campo es parte de las siguientes sintaxis:

permitir

Documento

Condicional

Especifica las variables que se usarán en las etapas de la canalización. Para obtener más información, consulte let.

Este campo es parte de las siguientes sintaxis:

pipeline

Documento

Condicional

Especifica el pipeline que se ejecutará en la colección unida. Para obtener más información, consulte la canalización.

Este campo es parte de las siguientes sintaxis:

como

string

Requerido

Nombre del nuevo campo de matriz que se añadirá a los documentos de entrada. Este nuevo campo de matriz contiene los documentos coincidentes de la colección from. Si el nombre especificado ya existe como campo en el documento de entrada, se sobrescribirá.

parallelism

entero

Opcional

Número máximo de solicitudes paralelas realizadas al destino $lookup. Valores de paralelismo más altos pueden aumentar el rendimiento de $lookup. Sin embargo, valores más altos también pueden consumir más recursos en el clúster de destino $lookup.

Debe ser un número entero entre 1 y 16. El valor predeterminado es 1.

Cada procesador de flujo tiene un valor máximo de paralelismo acumulado, determinado por su nivel. El paralelismo acumulado de un procesador de flujo se calcula de la siguiente manera:

parallelism total - parallelized stages

Donde parallelism total es la suma de todos los parallelism valores mayores que 1 en las etapas,$source $lookup y $merge, y parallelized stages es el número de estas etapas con parallelism valores mayores 1 que.

Por ejemplo, si su etapa $source establece un valor parallelism de 4, su etapa $lookup no establece ningún valor parallelism (por lo tanto, el valor predeterminado es 1) y su etapa $merge establece un valor parallelism de 2, entonces tiene dos parallelized stages y el paralelismo acumulativo de su procesador de flujo se calcula como (4 + 2) - 2.

Si un procesador de flujo supera el paralelismo acumulado máximo para su nivel, Atlas Stream Processing genera un error y le informa del nivel mínimo de procesador necesario para el nivel de paralelismo deseado. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para obtener más información, consulte Procesamiento de flujo.

partitionBy

expresión

Opcional

Expresión utilizada para dividir el flujo de entrada en hilos paralelos. Atlas Stream Processing asigna cada documento de entrada con el mismo resultado de expresión partitionBy al mismo hilo.

Si no se especifica este campo, los documentos de entrada se envían a subprocesos paralelos mediante un proceso rotatorio.

La versión de $lookup con procesamiento de flujo Atlas realiza una unión externa izquierda de los mensajes de $source su y los documentos de una colección Atlas especificada. Esta versión funciona de forma similar a la $lookup etapa disponible en una base de datos MongoDB estándar. Sin embargo, esta versión requiere que se especifique una colección Atlas de su registro de conexión como valor para el from campo.

La canalización puede contener una etapa$lookupanidada. Si incluye una etapa$lookupanidada en su canalización, debe usar la sintaxis estándar from para especificar una colección en la misma conexión remota de Atlas que la etapa externa$lookup.

Ejemplo

$lookup : {
from: {connectionName: "dbsrv1", db: "db1", coll: "coll1"},
,
pipeline: [
,
{
$lookup: {
from: "coll2",
,
}
},
,
]
}

Si su canalización tiene $lookup y en la misma colección, los resultados de Atlas Stream Processing podrían variar si intenta mantener una vista incremental. Atlas $merge $lookup $merge Stream Processing procesa varios mensajes de origen simultáneamente y luego los fusiona. Si varios mensajes tienen el mismo ID, que utilizan y, Atlas Stream Processing podría devolver resultados que aún no se han materializado.

Ejemplo

Considere el siguiente flujo de entrada:

{ _id: 1, count: 2 }
{ _id: 1, count: 3 }

Supongamos que su consulta contiene lo siguiente dentro de la canalización:

{
...,
pipeline: [
{ $lookup on _id == foreignDoc._id from collection A }
{ $project: { _id: 1, count: $count + $foreignDoc.count } }
{ $merge: { into collection A } }
]
}

Si está intentando mantener una vista incremental, podría esperar un resultado similar al siguiente:

{ _id: 1, count: 5 }

Sin embargo, Atlas Stream Processing podría devolver un recuento de 5 o 3 dependiendo de si Atlas Stream Processing ha procesado los documentos.

Para obtener más información,$lookup consulte.

Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. Una colección denominada humidity_descriptions contiene documentos con el formato:

Donde el relative_humidity campo describe la humedad relativa a temperatura ambiente (20 Celsius) y condition enumera los descriptores verbales apropiados para ese nivel de humedad. Puede usar la etapa $lookup para enriquecer los informes meteorológicos en directo con descriptores sugeridos para que los meteorólogos los utilicen en sus transmisiones.

La siguiente agregación tiene cuatro etapas:

  1. La etapa establece una conexión $source con Apache KafkaEl agente recopila estos informes en un tema llamado my_weatherdata y expone cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndolo ingestionTime en.

  2. La etapa $lookup une los registros de la base de datos humidity_descriptions en los informes meteorológicos en el campo dewPoint.

  3. La $match etapa excluye los documentos que tienen un humidity_info campo vacío y pasa los documentos con un humidity_info campo completado a la siguiente etapa.

  4. La etapa escribe la salida en una colección de Atlas $merge llamada enriched_stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$lookup': {
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
}
{ '$match': { 'humidity_info': { '$ne': [] } } }
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.enriched_stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
}

Nota

El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.

Volver

$https

En esta página