Menu Docs
Página inicial do Docs
/ /
/ / /

$cachedLookup

$cachedLookup

O estágio $cachedLookup realiza uma junção externa esquerda do fluxo de mensagens do seu $source para uma collection do Atlas no seu Registro de Conexão.

Este estágio funciona de forma semelhante ao estágio $lookup, mas armazena em cache os resultados de suas queries de acordo com parâmetros configuráveis.

Importante

$cachedLookup não suporta os campos let ou pipeline.

Para saber mais, consulte Sintaxe $lookup.

O seguinte formulário protótipo ilustra todos os campos disponíveis:

{
"$lookup": {
"ttl": {
"size": <int>,
"unit": "ms" | "second" | "minute" | "hour" | "day"
},
"maxMemUsageBytes": <int>,
"from": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>",
"coll": "<atlas-collection-name>"
},
"localField": "<field-in-source-messages>",
"foreignField": "<field-in-from-collection>",
"as": "<output-array-field>"
}
}

O $cachedLookup utiliza alguns dos mesmos campos que a versão geral do $lookup. $cachedLookup inclui campos para configurar o comportamento do cache de query e fornece uma sintaxe modificada para o campo from para query de dados por meio de uma conexão do seu registro de conexão.

Campo
Tipo
necessidade
Descrição

TTL

documento

Obrigatório

Documento que especifica o TTL das suas queries em cache.

ttl.size

int

Obrigatório

Tamanho do TTL das suas queries em cache no units.

ttl.unit

string

Obrigatório

Unidade de tempo na qual medir o TTL das suas queries em cache. Deve ser um dos seguintes:

  • "ms"

  • "segundo"

  • "minuto"

  • "hora"

  • "dia"

maxMemUsageBytes

int

Obrigatório

Memória máxima, em bytes, para alocar para cache de query. Se o tamanho do cache exceder esse valor, o Atlas Stream Processing primeiro removerá os resultados mais antigos para liberar espaço. Se não houver resultados expirados suficientes para ficar abaixo desse limite, o Atlas Stream Processing removerá aleatoriamente as queries em cache até que o tamanho do cache esteja abaixo do limite.

O padrão é 10% da RAM disponível na sua instância de processamento de fluxo. Não é possível definir maxMemUsageBytes para mais de 12.5% da RAM disponível em sua instância de processamento de fluxo.

from

documento

Obrigatório

Documento que especifica uma collection em um banco de dados Atlas para unir às mensagens do seu $source. Você deve especificar uma collection somente a partir do seu Registro de Conexão.

Se você especificar este campo, você deverá especificar valores para todos os campos neste documento.

from.connectionName

string

Obrigatório

Nome da conexão no registro de conexões.

from.db

string

Obrigatório

Nome do banco de dados do Atlas que contém a coleção que você deseja unir.

from.coll

string

Obrigatório

Nome da coleção da qual você deseja participar.

localField

string

Obrigatório

Campo a partir de suas mensagens $source do qual participar.

foreignField

string

Obrigatório

Campo de documentos na coleção from para participar.

como

string

Obrigatório

Nome do novo campo de array a ser adicionado aos documentos de entrada. Este novo campo de array contém os documentos correspondentes da coleção from. Se o nome especificado já existir como campo no documento de entrada, esse campo será substituído.

$cachedLookup realiza uma junção externa esquerda de mensagens de seu $source e dos documentos em uma Atlas collection especificada. Essa versão se comporta de forma semelhante ao estágio $lookup disponível em um banco de dados MongoDB padrão. No entanto, esta versão exige que você especifique uma coleção do Atlas do seu Registro de Conexão como o valor para o campo from.

Além disso, o $cachedLookup armazena em cache os resultados de suas queries por um período de tempo configurável. Use essa funcionalidade para queries em dados alterados com pouca frequência para melhorar a eficiência. Quando o TTL de uma entrada em cache termina, o Atlas Stream Processing despeja essa entrada. Se o tamanho total das entradas em cache for igual a maxMemoryUsageBytes quando você fizer uma nova query, o Atlas Stream Processing removerá as entradas até que haja espaço para armazenar em cache a nova query.

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. Uma coleção chamada humidity_descriptions contém documentos do formato:

{
'dew_point': 16.2,
'relative_humidity': 79,
'condition': 'sticky, oppressive'
}

Onde o campo relative_humidity descreve a Umidade relativa à temperatura ambiente (20 Celsius) e condition lista descritores verbais apropriados para esse nível de Umidade. Você pode usar o estágio $cachedLookup para enriquecer a transmissão de relatórios meteorológicos com descritores sugeridos para os meteorológicos usarem em gravações meteorológicas.

A seguinte agregação tem quatro fases:

  1. O estágio estabelece $source uma conexão com o broker do Apache Kafka que coleta esses relatórios em um tópico chamado my_weatherdata, expondo cada registro à medida que ele é ingerido aos estágios de agregação posteriores. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o como ingestionTime.

  2. O estágio $cachedLookup une os registros do banco de dados do humidity_descriptions nos relatórios meteorológicos no campo dewPoint. Cada query tem um 5 minute TTL e o Atlas Stream Processing armazena até 200 MB de resultados.

  3. A fase $match exclui documentos que têm um campo humidity_info vazio e passa documentos com um campo humidity_info preenchido para a próxima fase.

  4. O estágio $merge grava a saída na coleção do Atlas chamada enriched_stream no banco de dados sample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$cachedLookup': {
"ttl": {
"size": 5,
"unit": "minute"
},
"maxMemUsageBytes": 209715200,
from: {
connectionName: 'weatherStream',
db: 'humidity',
coll: 'humidity_descriptions'
},
'localField':'dewPoint.value',
'foreignField':'dewPoint',
'as': 'humidity_info'
}
},
{ '$match': { 'humidity_info': { '$ne': [] } } },
{
'$merge': {
into: {
connectionName: 'weatherStream',
db: 'sample_weatherstream',
coll: 'enriched_stream'
}
}
}

Para visualizar os documentos na coleção sample_weatherstream.enriched_stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:

db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{
st: 'x+55100+006100',
position: {
type: 'Point',
coordinates: [
92.7,
-53.6
]
},
elevation: 9999,
callLetters: 'UECN',
qualityControlProcess: 'V020',
dataSource: '4',
type: 'FM-13',
airTemperature: {
value: -11,
quality: '9'
},
dewPoint: {
value: 12.5,
quality: '1'
},
pressure: {
value: 1032.7,
quality: '9'
},
wind: {
direction: {
angle: 300,
quality: '9'
},
type: '9',
speed: {
rate: 23.6,
quality: '2'
}
},
visibility: {
distance: {
value: 14000,
quality: '1'
},
variability: {
value: 'N',
quality: '1'
}
},
skyCondition: {
ceilingHeight: {
value: 390,
quality: '9',
determination: 'C'
},
cavok: 'N'
},
sections: [
'SA1',
'AA1',
'OA1',
'AY1',
'AG1'
],
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 21
},
atmosphericPressureChange: {
tendency: {
code: '1',
quality: '1'
},
quantity3Hours: {
value: 5.5,
quality: '1'
},
quantity24Hours: {
value: 99.9,
quality: '9'
}
},
seaSurfaceTemperature: {
value: 1.3,
quality: '9'
},
waveMeasurement: {
method: 'M',
waves: {
period: 4,
height: 2.5,
quality: '9'
},
seaState: {
code: '00',
quality: '9'
}
},
pastWeatherObservationManual: {
atmosphericCondition: {
value: '4',
quality: '1'
},
period: {
value: 6,
quality: '1'
}
},
skyConditionObservation: {
totalCoverage: {
value: '07',
opaque: '99',
quality: '1'
},
lowestCloudCoverage: {
value: '06',
quality: '1'
},
lowCloudGenus: {
value: '07',
quality: '9'
},
lowestCloudBaseHeight: {
value: 2250,
quality: '9'
},
midCloudGenus: {
value: '07',
quality: '9'
},
highCloudGenus: {
value: '00',
quality: '1'
}
},
presentWeatherObservationManual: {
condition: '75',
quality: '1'
},
atmosphericPressureObservation: {
altimeterSetting: {
value: 9999.9,
quality: '9'
},
stationPressure: {
value: 1032.6,
quality: '1'
}
},
skyCoverLayer: {
coverage: {
value: '09',
quality: '1'
},
baseHeight: {
value: 240,
quality: '9'
},
cloudType: {
value: '99',
quality: '9'
}
},
liquidPrecipitation: {
period: 6,
depth: 3670,
condition: '9',
quality: '9'
},
extremeAirTemperature: {
period: 99.9,
code: 'N',
value: -30.9,
quantity: '9'
},
ingestionTime: ISODate('2024-09-19T20:04:34.346Z'),
humidity_info: [
{
_id: ObjectId('66ec805ad3cfbba767ebf7a5'),
dewPoint: 12.5,
relativeHumidity: 62,
condition: 'humid, muggy'
}
],
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

Voltar

$lookup

Nesta página