정의
$source 단계에서는 데이터를 스트리밍할 연결 레지스트리의 연결을 지정합니다. 지원되는 연결 유형은 다음과 같습니다.
Apache Kafka 브로커
MongoDB collection change stream
MongoDB database change stream
문서 배열
구문
Apache Kafka 브로커
Apache Kafka 브로커의 스트리밍 데이터로 작업하기 위해 $source 단계의 프로토타입 형식은 다음과 같습니다.
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
$source 단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 | |
|---|---|---|---|---|
| 문자열 | 필수 사항 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. | |
| 문자열 또는 문자열 배열 | 필수 사항 | 메시지를 스트리밍할 하나 이상의 Apache Kafka 주제 이름입니다. 두 개 이상의 주제에서 메시지를 스트리밍하려면 배열로 지정합니다. | |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
| |
| 문서 | 옵션 | 파티션이 워터마크 계산에서 무시되기 전에 유휴 상태로 허용되는 시간을 지정하는 문서입니다. 이 필드는 기본적으로 비활성화되어 있습니다. 유휴 상태로 인해 진행되지 않는 파티션을 처리하려면 이 필드에 값을 설정하세요. | |
| integer | 옵션 | 파티션 유휴 시간 초과 기간을 지정하는 숫자입니다. | |
| 문자열 | 옵션 | 파티션 유휴 시간 초과 기간의 시간 단위입니다.
| |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. | |
| 문자열 | 옵션 | 수집을 시작할 Apache Kafka 소스 주제 의 이벤트 지정합니다.
기본값은 | |
| 문자열 | 옵션 | 스트림 프로세서와 연결할 kafka 소비자 그룹 의 ID . 생략된 경우 Atlas Stream Processing 스트림 처리 작업 공간을 다음 형식으로 자동 생성된 ID 와 연결합니다. Atlas Stream Processing 모든 영구 스트림 프로세서에 대해 이 매개변수 값을 자동으로 생성합니다.SP로 정의된 임시 스트림 프로세서의 경우. 프로세스()에서 이 매개변수는 수동으로 정의한 경우에만 설정하다 됩니다. | |
| 부울 | 조건부 | Kafka 브로커 파티션 오프셋에 대한 커밋 정책을 결정하는 플래그입니다. Atlas Stream Processing 두 가지 커밋 정책을 지원합니다.
이 매개변수는 SP로 정의된 임시 스트림 프로세서의 경우. 프로세스()에서 | |
| 문자열 | 옵션 | Apache Kafka 키 데이터를 역직렬화하는 데 사용되는 데이터 유형입니다. 다음 값 중 하나여야 합니다.
기본값은 | |
| 문자열 | 옵션 |
참고
Atlas Stream Processing에서는 소스 데이터 스트림의 문서가 유효한 json 또는 ejson이어야 합니다. Atlas Stream Processing은 데드 레터 큐를 구성한 경우 이 요구 사항을 충족하지 않는 문서를 데드 레터 큐로 설정합니다.
MongoDB collection change stream
Atlas 컬렉션 변경 스트림을 통해 애플리케이션은 단일 컬렉션의 실시간 데이터 변경 사항에 액세스할 수 있습니다. 컬렉션에 대한 변경 스트림을 여는 방법을 알아보려면 변경 스트림을 참조하세요.
Atlas 컬렉션 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source 단계는 다음과 같은 프로토타입 형식을 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "coll" : ["<source-coll>",...], "initialSync": { "enable": <boolean>, "parallelism": <integer> }, "readPreference": "<read-preference>", "readPreferenceTags": [ {"<key>": "<value>"}, . . . ] "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }], "maxAwaitTimeMS": <time-ms>, } } }
$source 단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
|---|---|---|---|
| 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 필수 사항 |
|
| 문자열 또는 문자열 배열 | 필수 사항 |
|
| 문서 | 옵션 |
Atlas Stream Processing
중요
|
| 부울 | 조건부 |
|
| integer | 옵션 |
|
| 문서 | 옵션 | 작업에대한 읽기 설정. 기본값은 |
| 문서 | 옵션 | 작업에대한 기본 설정 태그를 읽습니다. |
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| token | 조건부 | 원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.
|
| timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
MongoDB 확장 JSON |
| 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다. |
| 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다. |
| 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다. |
| 문서 | 옵션 | 추가 처리를 위한 전달 전에 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 변경 스트림 출력 수정에 설명된 매개변수를 준수해야 합니다. 중요각 변경 이벤트에는 |
| integer | 옵션 | 빈 배치를 반환하기 전에 변경 스트림 커서에 보고할 새 데이터 변경 내용을 기다리는 최대 시간(밀리초)입니다. 기본값은 |
MongoDB database change stream
Atlas 데이터베이스 변경 스트림을 통해 애플리케이션은 단일 데이터베이스의 실시간 데이터 변경 사항에 액세스할 수 있습니다. 데이터베이스에 대한 변경 스트림을 여는 방법을 알아보려면 변경 스트림을 참조하세요.
Atlas 데이터베이스 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source 단계는 다음과 같은 프로토타입 형식을 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source 단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
|---|---|---|---|
| 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문자열 | 필수 사항 |
|
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| token | 조건부 | 원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.
|
| timestamp | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
MongoDB 확장 JSON |
| 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 변경 스트림 출력 수정에 설명된 매개변수를 준수해야 합니다. 중요각 변경 이벤트에는 |
| integer | 옵션 | 빈 배치를 반환하기 전에 변경 스트림 커서에 보고할 새 데이터 변경 내용을 기다리는 최대 시간(밀리초)입니다. 기본값은 |
MongoDB 클러스터 전체 변경 스트림 소스
전체 Atlas 클러스터 변경 스트림에서 스트리밍 데이터를 처리하기 위해 $source 단계는 다음과 같은 프로토타입 형태를 갖습니다.
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source 단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
|---|---|---|---|
| 문자열 | 조건부 | 데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다. |
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 문서 | 옵션 | 다양한 기본값을 재정의하는 필드가 포함된 문서입니다. |
| token | 조건부 | 원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.
|
| 날짜 | 타임스탬프 | 조건부 | 소스가 보고를 시작해야 하는 optime입니다.
MongoDB 확장 JSON |
| 문자열 | 조건부 | change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:
fullDocument에 값을 지정하지 않으면 기본값은 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 부울 | 조건부 | change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문자열 | 옵션 | change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:
이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다. |
| 문서 | 옵션 | 원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 변경 스트림 출력 수정에 설명된 매개변수를 준수해야 합니다. Atlas Stream Processing 수집된 각 변경 이벤트에서 |
| integer | 옵션 | 빈 배치를 반환하기 전에 변경 스트림 커서에 보고할 새 데이터 변경 내용을 기다리는 최대 시간(밀리초)입니다. 기본값은 |
문서 배열
문서 배열에서 작업하기 위해 $source 단계의 프로토타입 형식은 다음과 같습니다.
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "documents" : [{source-doc},...] | <expression> } }
$source 단계에서는 다음 필드가 있는 문서를 사용합니다.
필드 | 유형 | 필요성 | 설명 |
|---|---|---|---|
| 문서 | 옵션 | 수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.
|
| 배열 | 조건부 | Array of documents to use as a streaming data source. 이 필드의 값은 객체 배열이거나 객체 배열로 평가되는 표현식일 수 있습니다. |
행동
$source 는 표시되는 모든 파이프라인의 첫 번째 단계여야 합니다. 파이프라인당 $source 단계는 하나만 사용할 수 있습니다.
예시
Kafka 예시
스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.
단계는
$source라는 주제 에서 이러한 보고서를 수집하는 Apache Kafka 브로커와의 연결을my_weatherdata설정하여 수집되는 각 기록 후속 집계 단계에 노출합니다. 또한 이 단계에서는 프로젝션하는 타임스탬프 필드 의 이름을ingestionTime로 재정의하여 로 설정합니다.$match단계에서는dewPoint.value값이5.0미만인 문서를 제외하고dewPoint.value값이5.0보다 큰 문서를 다음 단계로 전달합니다.$merge단계는sample_weatherstream데이터베이스의stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
참고
위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.
변경 스트림 예시
다음 집계 샘플 데이터 cluster0-collection 세트가 로드된 Atlas cluster 에 연결되는 소스에서 데이터를 수집합니다. 스트림 처리 작업 공간을 만들고 연결 레지스트리에 Atlas cluster 에 대한 연결을 추가하는 방법을 학습 Atlas Stream Processing 시작하기를 참조하세요. 이 집계 두 단계를 실행하여 변경 스트림 열고 데이터베이스 의 컬렉션 에 대한 변경 사항을 기록 data sample_weatherdata.
$source단계는cluster0-collection소스에 연결하고sample_weatherdata데이터베이스의data컬렉션에 대한 변경 스트림을 엽니다.$merge단계는 필터링된 변경 스트림 문서를sample_weatherdata데이터베이스의data_changes라는 Atlas 컬렉션에 기록합니다. 해당 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.
{ $source: { connectionName: "cluster0-connection", db : "sample_weatherdata", coll : "data" }, $merge: { into: { connectionName: "cluster0-connection", db: "sample_weatherdata", coll: "data_changes" } } }
다음 mongosh 명령은 data 문서를 삭제합니다:
db.getSiblingDB("sample_weatherdata").data.deleteOne( { _id: ObjectId("5553a99ae4b02cf715120e4b") } )
data 문서가 삭제된 후, 스트림 프로세서는 변경 스트림 이벤트 문서를 sample_weatherdata.data_changes 컬렉션에 기록합니다. sample_weatherdata.data_changes 컬렉션 결과에 있는 문서를 보려면 mongosh를 사용하여 Atlas 클러스터에 연결한 후 다음 명령을 실행하세요.
db.getSiblingDB("sample_weatherdata").data_changes.find()
[ { _id: { _data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004' }, clusterTime: Timestamp({ t: 1738790819, i: 1 }), documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') }, ns: { db: 'sample_weatherdata', coll: 'data' }, operationType: 'delete', wallTime: ISODate('2025-02-05T21:26:59.313Z') } ]