Definição
O estágio $cachedLookup
realiza uma junção externa esquerda do fluxo de mensagens do seu $source
para uma collection do Atlas no seu Registro de Conexão.
Este estágio funciona de forma semelhante ao estágio $lookup, mas armazena em cache os resultados de suas queries de acordo com parâmetros configuráveis.
Importante
$cachedLookup
não suporta os campos let
ou pipeline
.
Para saber mais, consulte Sintaxe $lookup.
O seguinte formulário protótipo ilustra todos os campos disponíveis:
{ "$lookup": { "ttl": { "size": <int>, "unit": "ms" | "second" | "minute" | "hour" | "day" }, "maxMemUsageBytes": <int>, "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "as": "<output-array-field>" } }
Sintaxe
O $cachedLookup
utiliza alguns dos mesmos campos que a versão geral do $lookup
. $cachedLookup
inclui campos para configurar o comportamento do cache de query e fornece uma sintaxe modificada para o campo from
para query de dados por meio de uma conexão do seu registro de conexão.
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
TTL | documento | Obrigatório | Documento que especifica o TTL das suas queries em cache. |
ttl.size | int | Obrigatório | Tamanho do TTL das suas queries em cache no |
ttl.unit | string | Obrigatório | Unidade de tempo na qual medir o TTL das suas queries em cache. Deve ser um dos seguintes:
|
maxMemUsageBytes | int | Obrigatório | Memória máxima, em bytes, para alocar para cache de query. Se o tamanho do cache exceder esse valor, o Atlas Stream Processing primeiro removerá os resultados mais antigos para liberar espaço. Se não houver resultados expirados suficientes para ficar abaixo desse limite, o Atlas Stream Processing removerá aleatoriamente as queries em cache até que o tamanho do cache esteja abaixo do limite. O padrão é 10% da RAM disponível na sua instância de processamento de fluxo. Não é possível definir |
from | documento | Obrigatório | Documento que especifica uma collection em um banco de dados Atlas para unir às mensagens do seu Se você especificar este campo, você deverá especificar valores para todos os campos neste documento. |
from.connectionName | string | Obrigatório | Nome da conexão no registro de conexões. |
from.db | string | Obrigatório | Nome do banco de dados do Atlas que contém a coleção que você deseja unir. |
from.coll | string | Obrigatório | Nome da coleção da qual você deseja participar. |
localField | string | Obrigatório | Campo a partir de suas mensagens |
foreignField | string | Obrigatório | Campo de documentos na coleção |
como | string | Obrigatório | Nome do novo campo de array a ser adicionado aos documentos de entrada. Este novo campo de array contém os documentos correspondentes da coleção |
Comportamento
$cachedLookup
realiza uma junção externa esquerda de mensagens de seu $source
e dos documentos em uma Atlas collection especificada. Essa versão se comporta de forma semelhante ao estágio $lookup
disponível em um banco de dados MongoDB padrão. No entanto, esta versão exige que você especifique uma coleção do Atlas do seu Registro de Conexão como o valor para o campo from
.
Além disso, o $cachedLookup
armazena em cache os resultados de suas queries por um período de tempo configurável. Use essa funcionalidade para queries em dados alterados com pouca frequência para melhorar a eficiência. Quando o TTL de uma entrada em cache termina, o Atlas Stream Processing despeja essa entrada. Se o tamanho total das entradas em cache for igual a maxMemoryUsageBytes
quando você fizer uma nova query, o Atlas Stream Processing removerá as entradas até que haja espaço para armazenar em cache a nova query.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. Uma coleção chamada humidity_descriptions
contém documentos do formato:
{ 'dew_point': 16.2, 'relative_humidity': 79, 'condition': 'sticky, oppressive' }
Onde o campo relative_humidity
descreve a Umidade relativa à temperatura ambiente (20 Celsius) e condition
lista descritores verbais apropriados para esse nível de Umidade. Você pode usar o estágio $cachedLookup para enriquecer a transmissão de relatórios meteorológicos com descritores sugeridos para os meteorológicos usarem em gravações meteorológicas.
A seguinte agregação tem quatro fases:
O estágio estabelece
$source
uma conexão com o broker do Apache Kafka que coleta esses relatórios em um tópico chamadomy_weatherdata
, expondo cada registro à medida que ele é ingerido aos estágios de agregação posteriores. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime
.O estágio
$cachedLookup
une os registros do banco de dados dohumidity_descriptions
nos relatórios meteorológicos no campodewPoint
. Cada query tem um5 minute
TTL e o Atlas Stream Processing armazena até 200 MB de resultados.A fase
$match
exclui documentos que têm um campohumidity_info
vazio e passa documentos com um campohumidity_info
preenchido para a próxima fase.O estágio
$merge
grava a saída na coleção do Atlas chamadaenriched_stream
no banco de dadossample_weatherstream
. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$cachedLookup': { "ttl": { "size": 5, "unit": "minute" }, "maxMemUsageBytes": 209715200, from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } }, { '$match': { 'humidity_info': { '$ne': [] } } }, { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.enriched_stream
resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.