정의
$cachedLookup
단계는 $source
에서 연결 레지스트리의 Atlas 컬렉션으로 메시지 스트림의 왼쪽 외부 조인을 수행합니다.
이 단계는 $lookup 단계와 유사하게 작동하지만 구성 가능한 매개 변수에 따라 쿼리 결과를 캐시합니다.
중요
$cachedLookup
let
또는 pipeline
필드를 지원 하지 않습니다.
자세히 알아보려면 $lookup 구문을 참조하세요.
다음 프로토타입 양식은 사용 가능한 모든 필드를 보여 줍니다.
{ "$lookup": { "ttl": { "size": <int>, "unit": "ms" | "second" | "minute" | "hour" | "day" }, "maxMemUsageBytes": <int>, "from": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>", "coll": "<atlas-collection-name>" }, "localField": "<field-in-source-messages>", "foreignField": "<field-in-from-collection>", "as": "<output-array-field>" } }
구문
$cachedLookup
는 $lookup
의 일반화된 버전과 동일한 필드 중 일부를 사용합니다. $cachedLookup
에는 쿼리 캐싱 동작을 구성하기 위한 필드가 포함되어 있으며 연결 레지스트리에서 연결을 통해 데이터를 쿼리하기 위한 from
필드 에 대한 수정된 구문을 제공합니다.
필드 | 유형 | 필요성 | 설명 |
---|---|---|---|
TTL | 문서 | 필수 사항 | 캐시된 쿼리의 TTL 지정하는 문서입니다. |
ttl.size | int | 필수 사항 |
|
ttl.unit | 문자열 | 필수 사항 | 캐시된 쿼리의 TTL 측정하는 시간 단위입니다. 다음 중 하나여야 합니다.
|
maxMemUsageBytes | int | 필수 사항 | 쿼리 캐싱에 할당할 최대 메모리(바이트)입니다. 캐시 크기가 이 값을 초과하면 Atlas Stream Processing 먼저 이전 결과를 여유 공간으로 제거합니다. 만료된 결과가 충분하지 않아 이 임계값 아래로 떨어질 경우, Atlas Stream Processing 캐시 크기가 임계값 미만이 될 때까지 캐시된 쿼리를 무작위로 제거합니다. 기본값은 스트림 처리 인스턴스 에서 사용 가능한 RAM 의 10%입니다. 스트림 처리 인스턴스 에서 사용 가능한 RAM 의 |
FROM | 문서 | 필수 사항 | 의 메시지에 결합할 Atlas 데이터베이스 의 컬렉션 지정하는 이 필드 지정하는 경우 이 문서 의 모든 필드에 대한 값을 지정해야 합니다. |
from.connectionName | 문자열 | 필수 사항 | 연결 레지스트리의 연결 이름입니다. |
from.db | 문자열 | 필수 사항 | 참여하려는 컬렉션이 포함된 Atlas 데이터베이스의 이름입니다. |
from.coll | 문자열 | 필수 사항 | 가입하려는 컬렉션의 이름입니다. |
localField | 문자열 | 필수 사항 | 참여할 |
foreignField | 문자열 | 필수 사항 | 조인할 |
방식 | 문자열 | 필수 사항 | 입력 문서에 추가할 새 배열 필드의 이름입니다. 새 배열 필드에는 |
행동
$cachedLookup
은(는) $source
의 메시지와 지정된 Atlas 컬렉션 의 문서를 왼쪽 외부 조인을 수행합니다. 이 버전은 표준 MongoDB database 에서 사용할 수 있는 $lookup
단계와 유사하게 작동합니다. 그러나 이 버전에서는 연결 레지스트리 의 Atlas 컬렉션을 from
필드의 값으로 지정해야 합니다.
또한 $cachedLookup
는 구성 가능한 시간 동안 쿼리 결과를 캐시합니다. 자주 변경되지 않는 데이터에 대한 쿼리에 이 기능을 사용하여 효율성 개선합니다. 캐시된 항목의 TTL 이 경과하면 Atlas Stream Processing 해당 항목을 제거합니다. 새 쿼리 만들 때 캐시된 항목의 총 크기가 maxMemoryUsageBytes
인 경우 Atlas Stream Processing 새 쿼리 캐시 할 공간이 있을 때까지 항목을 제거합니다.
예시
스트리밍 데이터 소스는 샘플 날씨 데이터셋의 스키마에 따라 다양한 위치에서 상세한 날씨 보고서를 생성합니다. humidity_descriptions
라는 이름의 컬렉션에는 다음과 같은 형식의 문서가 포함되어 있습니다.
{ 'dew_point': 16.2, 'relative_humidity': 79, 'condition': 'sticky, oppressive' }
여기서 relative_humidity
필드 상온(20 섭씨)의 상대습도를 설명하고, condition
는 해당 수준의습도에 적합한 구두 설명을 나열합니다. $cachedLookup 단계를 사용하면 기상학자가 날씨 방송에서 사용할 수 있는 추천 설명자로 스트리밍 일기 예보를 보강할 수 있습니다.
다음 집계에는 4단계가 있습니다.
$source
단계는 Apache Kafka 브로커와 연결을 설정하여my_weatherdata
라는 주제에서 이러한 보고서를 수집하므로 각 기록이 수집될 때 후속 집계 단계에 노출됩니다. 또한 이 단계는 프로젝션하는 타임스탬프 필드의 이름을 재정의하여ingestionTime
으로 설정합니다.$cachedLookup
단계에서는humidity_descriptions
데이터베이스 의 기록을dewPoint
필드 의 일기 예보에 결합합니다. 각 쿼리 에는5 minute
TTL 있으며, Atlas Stream Processing 최대 200 MB의 결과를 저장합니다.$match
단계에서는humidity_info
필드가 비어 있는 문서를 제외하고,humidity_info
필드가 채워진 문서를 다음 단계로 전달합니다.$merge
단계는sample_weatherstream
데이터베이스의enriched_stream
라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$cachedLookup': { "ttl": { "size": 5, "unit": "minute" }, "maxMemUsageBytes": 209715200, from: { connectionName: 'weatherStream', db: 'humidity', coll: 'humidity_descriptions' }, 'localField':'dewPoint.value', 'foreignField':'dewPoint', 'as': 'humidity_info' } }, { '$match': { 'humidity_info': { '$ne': [] } } }, { '$merge': { into: { connectionName: 'weatherStream', db: 'sample_weatherstream', coll: 'enriched_stream' } } }
결과 sample_weatherstream.enriched_stream
컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").enriched_stream.find()
{ st: 'x+55100+006100', position: { type: 'Point', coordinates: [ 92.7, -53.6 ] }, elevation: 9999, callLetters: 'UECN', qualityControlProcess: 'V020', dataSource: '4', type: 'FM-13', airTemperature: { value: -11, quality: '9' }, dewPoint: { value: 12.5, quality: '1' }, pressure: { value: 1032.7, quality: '9' }, wind: { direction: { angle: 300, quality: '9' }, type: '9', speed: { rate: 23.6, quality: '2' } }, visibility: { distance: { value: 14000, quality: '1' }, variability: { value: 'N', quality: '1' } }, skyCondition: { ceilingHeight: { value: 390, quality: '9', determination: 'C' }, cavok: 'N' }, sections: [ 'SA1', 'AA1', 'OA1', 'AY1', 'AG1' ], precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 21 }, atmosphericPressureChange: { tendency: { code: '1', quality: '1' }, quantity3Hours: { value: 5.5, quality: '1' }, quantity24Hours: { value: 99.9, quality: '9' } }, seaSurfaceTemperature: { value: 1.3, quality: '9' }, waveMeasurement: { method: 'M', waves: { period: 4, height: 2.5, quality: '9' }, seaState: { code: '00', quality: '9' } }, pastWeatherObservationManual: { atmosphericCondition: { value: '4', quality: '1' }, period: { value: 6, quality: '1' } }, skyConditionObservation: { totalCoverage: { value: '07', opaque: '99', quality: '1' }, lowestCloudCoverage: { value: '06', quality: '1' }, lowCloudGenus: { value: '07', quality: '9' }, lowestCloudBaseHeight: { value: 2250, quality: '9' }, midCloudGenus: { value: '07', quality: '9' }, highCloudGenus: { value: '00', quality: '1' } }, presentWeatherObservationManual: { condition: '75', quality: '1' }, atmosphericPressureObservation: { altimeterSetting: { value: 9999.9, quality: '9' }, stationPressure: { value: 1032.6, quality: '1' } }, skyCoverLayer: { coverage: { value: '09', quality: '1' }, baseHeight: { value: 240, quality: '9' }, cloudType: { value: '99', quality: '9' } }, liquidPrecipitation: { period: 6, depth: 3670, condition: '9', quality: '9' }, extremeAirTemperature: { period: 99.9, code: 'N', value: -30.9, quantity: '9' }, ingestionTime: ISODate('2024-09-19T20:04:34.346Z'), humidity_info: [ { _id: ObjectId('66ec805ad3cfbba767ebf7a5'), dewPoint: 12.5, relativeHumidity: 62, condition: 'humid, muggy' } ], }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.