MongoDB.local SF, Jan 15: See the speaker lineup & ship your AI vision faster. Use WEB50 to save 50%
Find out more >
Docs Menu
Docs Home
/ /
/ / /

$source 단계(Stream Processing)

$source

$source 단계에서는 데이터를 스트리밍할 연결 레지스트리의 연결을 지정합니다. 지원되는 연결 유형은 다음과 같습니다.

  • Apache Kafka 브로커

  • MongoDB collection change stream

  • MongoDB database change stream

  • MongoDB 클러스터 변경 스트림

  • AWS Kinesis 데이터 스트림

  • 문서 배열

Apache Kafka 브로커의 스트리밍 데이터로 작업하기 위해 $source 단계의 프로토타입 형식은 다음과 같습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

필수 사항

데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.

topic

문자열 또는 문자열 배열

필수 사항

메시지를 스트림할 하나 이상의 Apache Kafka 주제 이름입니다. 둘 이상의 주제 에서 메시지를 스트림 하려면 해당 주제를 배열 로 지정합니다.

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드 인수로 사용하는 a : 표현식 :$toDate 입니다.

  • 소스 메시지 필드 인수로 사용하는 a : 표현식 :$dateFromString 입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

partitionIdleTimeout

문서

옵션

파티션이 워터마크 계산에서 무시되기 전에 유휴 상태로 허용되는 시간을 지정하는 문서입니다.

이 필드는 기본적으로 비활성화되어 있습니다. 유휴 상태로 인해 진행되지 않는 파티션을 처리하려면 이 필드에 값을 설정하세요.

partitionIdleTimeout.size

integer

옵션

파티션 유휴 시간 초과 기간을 지정하는 숫자입니다.

partitionIdleTimeout.unit

문자열

옵션

파티션 유휴 시간 초과 기간의 시간 단위입니다.

unit 의 값은 다음 중 하나일 수 있습니다.

  • "ms" (밀리초)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.auto_offset_reset

문자열

옵션

수집을 시작할 Apache Kafka 소스 주제 의 이벤트 지정합니다. auto_offset_reset 는 다음 값을 사용합니다.

  • end, latest 또는 largest 을 사용하여 애그리게이션이 초기화될 때 주제의 최신 이벤트로부터 수집을 시작합니다.

  • earliest, beginning 또는 smallest : 주제에서 가장 오래된 이벤트부터 수집을 시작합니다.

기본값은 latest입니다.

config.group_id

문자열

옵션

스트림 프로세서와 연결할 kafka 소비자 그룹의 ID입니다. 생략할 경우, Atlas Stream Processing은 스트림 처리 작업 공간을 다음 형식의 자동 생성 ID와 연결합니다.

asp-${streamProcessorId}-consumer

Atlas Stream Processing 모든 영구 스트림 프로세서에 대해 이 매개변수 값을 자동으로 생성합니다. SP로 정의된 임시 스트림 프로세서의 경우. 프로세스()에서 이 매개변수는 수동으로 정의한 경우에만 설정하다 됩니다.

config.enable_auto_commit

부울

조건부

Kafka 브로커 파티션 오프셋에 대한 커밋 정책을 결정하는 플래그입니다. Atlas Stream Processing 두 가지 커밋 정책을 지원합니다.

  • 이 매개 변수를 true로 설정하다 Atlas Stream Processing $source 단계에서 다음 연산자 로 데이터를 전달할 때마다 오프셋을 커밋합니다.

  • 이 매개 변수를 false로 설정하다 하면 Atlas Stream Processing 체크포인트 취할 때 스트림 프로세서가 파티션 오프셋을 커밋 .

이 매개변수는 config.group_id 이 설정하다 경우에만 설정하다 수 있습니다.

SP로 정의된 임시 스트림 프로세서의 경우. 프로세스()에서 false 을 설정하다 하지 않는 한 이 매개 변수의 group_id 기본값은 입니다. 그렇지 않으면 기본값은 true 입니다.

config.keyFormat

문자열

옵션

Apache Kafka 키 데이터를 역직렬화하는 데 사용되는 데이터 유형입니다. 다음 값 중 하나여야 합니다.

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

기본값은 binData입니다.

config.keyFormatError

문자열

옵션

Apache Kafka 키 데이터를 역직렬화할 때 발생하는 오류를 처리하다 방법. 다음 값 중 하나여야 합니다.

  • dlq는, 해당 문서를 데드 레터 큐에 기록합니다.

  • passThrough- 키 데이터 없이 문서를 다음 단계로 보냅니다.

참고

Atlas Stream Processing에서는 소스 데이터 스트림의 문서가 유효한 json 또는 ejson이어야 합니다. Atlas Stream Processing은 데드 레터 큐를 구성한 경우 이 요구 사항을 충족하지 않는 문서를 데드 레터 큐로 설정합니다.

Atlas 컬렉션 변경 스트림을 통해 애플리케이션은 단일 컬렉션의 실시간 데이터 변경 사항에 액세스할 수 있습니다. 컬렉션에 대한 변경 스트림을 여는 방법을 알아보려면 변경 스트림을 참조하세요.

Atlas 컬렉션 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source 단계는 다음과 같은 프로토타입 형식을 갖습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"initialSync": {
"enable": <boolean>,
"parallelism": <integer>
},
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
]
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}],
"maxAwaitTimeMS": <time-ms>,
}
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

조건부

데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

db

문자열

필수 사항

connectionName 으로 지정된 Atlas 인스턴스에 호스팅된 MongoDB database의 이름입니다. 이 데이터베이스의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다.

coll

문자열 또는 문자열 배열

필수 사항

connectionName에서 지정한 Atlas 인스턴스 에서 호스팅되는 하나 이상의 MongoDB 컬렉션 이름입니다. 이러한 컬렉션의 변경 스트림 스트리밍 데이터 소스 역할을 합니다. 이 필드 생략하면 스트림 프로세서가 MongoDB 데이터베이스 변경 스트림에서 가져옵니다.

initialSync

문서

옵션

initialSync 기능과 관련된 필드가 포함된 문서입니다.

Atlas Stream Processing initialSync 를 사용하면 Atlas 컬렉션 의 기존 문서를 삽입 changeEvent 문서처럼 수집할 수 있습니다. initialSync를 활성화 스트림 프로세서를 시작할 때 새 수신 changeEvent 문서 수집 및 프로세스 진행하기 전에 먼저 컬렉션 의 모든 기존 문서를 수집하고 프로세스 . initialSync 가 완료되면 반복되지 않습니다.

initialSync를 활성화 하면 파이프라인 에서 $hoppingWindow, $sessionWindow 또는 $tumblingWindow 단계를 사용할 수 없습니다.

중요한: 수신 문서의 _id 값이 기본값 생성된 ObjectId 값 또는 정렬된 int/long 값인 컬렉션에서만 initialSync 를 사용할 수 있습니다. 모든 _id 값은 동일한 유형이어야 합니다.

initialSync.enable

부울

조건부

initialSync 활성화 여부를 결정합니다. initialSync 필드 선언하는 경우 이 필드 설정하다 해야 합니다.

initialSync.parallelism

integer

옵션

initialSync 작업을 프로세스 할 병렬 처리 수준을 결정합니다. 값을 지정하지 않으면 기본값은 1입니다.

각 스트림 프로세서에는 해당 계층 에 따라 결정되는 최대 누적 병렬 처리 수 값이 있습니다. 스트림 프로세서의 누적 병렬 처리 수는 다음과 같이 계산됩니다.

parallelism total - parallelized stages

여기서 parallelism total 은(는) $source, $lookup$merge 단계에서 1 보다 큰 모든 parallelism 값의 합계이며, parallelized stagesparallelism 값이 1보다 큰 이러한 단계의 수입니다.

예시 를 들어 $source 단계에서 parallelism 값을 4로 설정하고, $lookup 단계에서 parallelism 값을 설정하지 않으며(따라서 기본값은 1), $merge 단계는 parallelism 값이 2인 경우 두 개의 parallelized stages이(가) 있고 스트림 프로세서의 누적 병렬 처리 수는 (4 + 2) - 2(으)로 계산됩니다.

스트림 프로세서가 해당 계층 의 최대 누적 병렬 처리 수를 초과하는 경우 Atlas Stream Processing 오류가 발생하고 의도한 병렬 처리 수준에 필요한 최소 프로세서 계층 알려줍니다. 오류를 해결하려면 프로세서를 더 높은 계층 으로 확장하다 하거나 단계의 병렬 처리 값을 줄여야 합니다. 자세한 학습은 Stream Processing을 참조하세요.

readPreference

문서

옵션

작업에대한 읽기 설정. initialSync

기본값은 primary입니다.

readPreferenceTags

문서

옵션

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.startAfter

token

조건부

원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.startAtOperationTime

timestamp

조건부

소스가 보고를 시작해야 하는 optime입니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

MongoDB 확장 JSON $date 또는 $timestamp 값을 허용합니다.

config.fullDocument

문자열

조건부

change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:

  • updateLookup : 업데이트 시 변경 사항만 반환합니다.

  • required : 전체 문서를 반환해야 합니다. 전체 문서를 사용할 수 없는 경우 아무것도 반환하지 않습니다.

  • whenAvailable : 사용 가능한 문서가 있을 때마다 전체 문서를 반환하고, 그렇지 않으면 변경 사항을 반환합니다.

fullDocument에 값을 지정하지 않으면 기본값은 updateLookup 입니다.

이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다.

config.fullDocumentOnly

부울

조건부

change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 fullDocument 의 내용만 반환할지를 제어하는 설정입니다. true 로 설정하면 소스는 fullDocument 의 내용만 반환합니다.

이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다.

config.fullDocumentBeforeChange

문자열

옵션

change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:

  • off : fullDocumentBeforeChange 필드를 생략합니다.

  • required : 변경 전 상태의 전체 문서를 반환해야 합니다. 변경 전 상태의 전체 문서를 사용할 수 없는 경우 스트림 프로세서가 실패합니다.

  • whenAvailable : 전체 문서가 있는 경우 변경 전 상태로 전체 문서를 반환하고, 그렇지 않으면 fullDocumentBeforeChange 필드를 생략합니다.

fullDocumentBeforeChange 에 값을 지정하지 않으면 기본값은 off 입니다.

이 필드를 컬렉션 변경 스트림과 함께 사용하려면 해당 컬렉션에서 변경 스트림 사전 및 사후 이미지를 활성화해야 합니다.

config.pipeline

문서

옵션

추가 처리를 위한 전달 전에 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 변경 스트림 출력 수정에 설명된 매개변수를 준수해야 합니다.

중요한: 각 변경 이벤트에는 wallTimeclusterTime 필드가 포함되어 있습니다. $source 이후의 Atlas Stream Processing 단계에서는 프로세서가 이러한 필드를 수집할 때 이러한 필드를 수신할 것으로 예상합니다. 변경 스트림 데이터가 올바르게 처리 되도록 하려면 $source.config.pipeline에서 이러한 필드를 수정하지 마세요.

config.maxAwaitTimeMS

integer

옵션

빈 배치를 반환하기 전에 변경 스트림 커서에 보고할 새 데이터 변경 내용을 기다리는 최대 시간(밀리초)입니다.

기본값은 1000입니다.

Atlas 데이터베이스 변경 스트림을 통해 애플리케이션은 단일 데이터베이스의 실시간 데이터 변경 사항에 액세스할 수 있습니다. 데이터베이스에 대한 변경 스트림을 여는 방법을 알아보려면 변경 스트림을 참조하세요.

Atlas 데이터베이스 변경 스트림의 스트리밍 데이터로 작업하기 위해 $source 단계는 다음과 같은 프로토타입 형식을 갖습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

조건부

데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

db

문자열

필수 사항

connectionName 으로 지정된 Atlas 인스턴스에 호스팅된 MongoDB database의 이름입니다. 이 데이터베이스의 변경 스트림은 스트리밍 데이터 소스 역할을 합니다.

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.startAfter

token

조건부

원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.startAtOperationTime

timestamp

조건부

소스가 보고를 시작해야 하는 optime입니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

MongoDB 확장 JSON $date 또는 $timestamp 값을 허용합니다.

config.fullDocument

문자열

조건부

change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:

  • updateLookup : 업데이트 시 변경 사항만 반환합니다.

  • required : 전체 문서를 반환해야 합니다. 전체 문서를 사용할 수 없는 경우 아무것도 반환하지 않습니다.

  • whenAvailable : 사용 가능한 문서가 있을 때마다 전체 문서를 반환하고, 그렇지 않으면 변경 사항을 반환합니다.

fullDocument에 값을 지정하지 않으면 기본값은 updateLookup 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentOnly

부울

조건부

change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 fullDocument 의 내용만 반환할지를 제어하는 설정입니다. true 로 설정하면 소스는 fullDocument 의 내용만 반환합니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentBeforeChange

문자열

옵션

change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:

  • off : fullDocumentBeforeChange 필드를 생략합니다.

  • required : 변경 전 상태의 전체 문서를 반환해야 합니다. 변경 전 상태의 전체 문서를 사용할 수 없는 경우 스트림 프로세서가 실패합니다.

  • whenAvailable : 전체 문서가 있는 경우 변경 전 상태로 전체 문서를 반환하고, 그렇지 않으면 fullDocumentBeforeChange 필드를 생략합니다.

fullDocumentBeforeChange 에 값을 지정하지 않으면 기본값은 off 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.pipeline

문서

옵션

원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 변경 스트림 출력 수정에 설명된 매개변수를 준수해야 합니다.

중요한: 각 변경 이벤트에는 wallTimeclusterTime 필드가 포함되어 있습니다. $source 이후의 Atlas Stream Processing 단계에서는 프로세서가 이러한 필드를 수집할 때 이러한 필드를 수신할 것으로 예상합니다. 변경 스트림 데이터가 올바르게 처리 되도록 하려면 $source.config.pipeline에서 이러한 필드를 수정하지 마세요.

config.maxAwaitTimeMS

integer

옵션

빈 배치를 반환하기 전에 변경 스트림 커서에 보고할 새 데이터 변경 내용을 기다리는 최대 시간(밀리초)입니다.

기본값은 1000입니다.

전체 Atlas 클러스터 변경 스트림에서 스트리밍 데이터를 처리하기 위해 $source 단계는 다음과 같은 프로토타입 형태를 갖습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

조건부

데이터를 수집할 연결 레지스트리 에서 연결을 식별하는 레이블입니다.

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.startAfter

token

조건부

원본이 보고를 시작한 후 변경 이벤트입니다. 재개 토큰의 형태를 취합니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

config.startAtOperationTime

날짜 | 타임스탬프

조건부

소스가 보고를 시작해야 하는 optime입니다.

config.startAfter 또는 config.StartAtOperationTime 중 하나만 사용할 수 있습니다.

MongoDB 확장 JSON $date 또는 $timestamp 값을 허용합니다.

config.fullDocument

문자열

조건부

change stream 소스에서 문서를 반환할지, 아니면 업데이트 발생 시 변경 사항만 반환할지를 제어하는 설정입니다. 다음 중 하나여야 합니다:

  • updateLookup : 업데이트 시 변경 사항만 반환합니다.

  • required : 전체 문서를 반환해야 합니다. 전체 문서를 사용할 수 없는 경우 아무것도 반환하지 않습니다.

  • whenAvailable : 사용 가능한 문서가 있을 때마다 전체 문서를 반환하고, 그렇지 않으면 변경 사항을 반환합니다.

fullDocument에 값을 지정하지 않으면 기본값은 updateLookup 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentOnly

부울

조건부

change stream 소스가 모든 메타데이터를 포함한 전체 이벤트 문서를 반환할지, 또는 fullDocument 의 내용만 반환할지를 제어하는 설정입니다. true 로 설정하면 소스는 fullDocument 의 내용만 반환합니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.fullDocumentBeforeChange

문자열

옵션

change stream 소스가 출력에 원래 '변경 전' 상태의 문서를 포함할지 여부를 지정합니다. 다음 중 하나여야 합니다:

  • off : fullDocumentBeforeChange 필드를 생략합니다.

  • required : 변경 전 상태의 전체 문서를 반환해야 합니다. 변경 전 상태의 전체 문서를 사용할 수 없는 경우 스트림 프로세서가 실패합니다.

  • whenAvailable : 전체 문서가 있는 경우 변경 전 상태로 전체 문서를 반환하고, 그렇지 않으면 fullDocumentBeforeChange 필드를 생략합니다.

fullDocumentBeforeChange 에 값을 지정하지 않으면 기본값은 off 입니다.

이 필드를 데이터베이스 변경 스트림과 함께 사용하려면 해당 데이터베이스의 모든 컬렉션에서 변경 스트림 사전 및 사후 이미지 를 활성화해야 합니다.

config.pipeline

문서

옵션

원점에서 변경 스트림 출력을 필터링하기 위한 집계 파이프라인을 지정합니다. 이 파이프라인은 변경 스트림 출력 수정에 설명된 매개변수를 준수해야 합니다.

Atlas Stream Processing 수집된 각 변경 이벤트에서 wallTimeclusterTime 필드를 수신할 것으로 예상합니다. 변경 스트림 데이터를 올바르게 처리 하려면 $source.config.pipeline에서 이러한 필드를 수정하지 마세요.

config.maxAwaitTimeMS

integer

옵션

빈 배치를 반환하기 전에 변경 스트림 커서에 보고할 새 데이터 변경 내용을 기다리는 최대 시간(밀리초)입니다.

기본값은 1000입니다.

AWS Kinesis 데이터 스트림의 데이터로 작업하기 위해 $source 단계의 프로토타입 형식은 다음과 같습니다.

{
"$source": {
"connectionName": "<registered-connection>",
"stream": "<stream-name>",
"region": "<aws-region>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<field-name>",
"shardIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"consumerARN": "<aws-arn>",
"initialPosition": <initial-position>,
reshardDetectionIntervalSecs: <interval>
}
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

connectionName

문자열

필수 사항

연결 레지스트리에서 데이터를 수집할 연결을 식별하는 레이블입니다.

stream

문자열

필수 사항

메시지를 스트림 할AWS Kinesis 데이터 스트림 입니다.

region

문자열

조건부

지정된스트림 있는 AWS 리전 입니다. Kinesis 서로 다른 리전에서 동일한 이름을 가진 여러 데이터 스트림을 지원합니다. 동일한 연결 내 둘 이상의 리전에서 데이터 스트림에 동일한 이름을 사용하는 경우 이 필드 사용하여 사용할 이름과 리전 의 조합을 지정해야 합니다.

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

shardIdleTimeout

문서

옵션

워터마크 계산에서 샤드가 무시되기 전에 샤드 유휴 상태로 허용되는 시간을 지정하는 문서입니다.

이 필드 기본값 으로 비활성화되어 있습니다. 유휴 상태로 인해 앞으로 이동하지 않는 샤드를 처리하다 하려면 이 필드 의 값을 설정하다 .

shardIdleTimeout.size

문서

옵션

샤드 유휴 시간 초과 기간을 지정하는 숫자입니다.

shardIdleTimeout.unit

문서

옵션

샤드 유휴 시간 초과 기간의 시간 단위입니다.

unit 의 값은 다음 중 하나일 수 있습니다.

  • "ms" (밀리초)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

문서

옵션

다양한 기본값을 재정의하는 필드가 포함된 문서입니다.

config.consumerARN

문자열

옵션

Kinesis 소비자에 해당하는ARN입니다. 이 필드 지정하면 소비자는 향상된 팬아웃을 사용합니다. 그렇지 않으면 Kinesis 표준 소비자를 사용합니다.

config.initialPosition

문자열

옵션

메시지 수집을 시작할 Kinesis 데이터 스트림 기록의 위치입니다. 다음 중 하나여야 합니다.

  • "TRIM_HORIZON": 샤드 에서 가장 오래된 메시지부터 수집을 시작합니다.

  • "LATEST": 샤드 의 최신 메시지 에서 수집 을 시작 합니다 .

기본값은 'LAtest'입니다.

reshardDetectionIntervalSecs

integer

옵션

리샤딩을 위해Kinesis 스트림 통한 데이터 흐름 속도에 대한 검사 간격(초)입니다.

문서 배열에서 작업하기 위해 $source 단계의 프로토타입 형식은 다음과 같습니다.

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"documents" : [{source-doc},...] | <expression>
}
}

$source 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명

timeField

문서

옵션

수신 메시지에 대한 신뢰할 수 있는 타임스탬프를 정의하는 문서입니다.

timeField 을(를) 사용하는 경우 다음 중 하나로 정의해야 합니다.

  • 소스 메시지 필드를 인수로 사용하는 $toDate 표현식

  • 소스 메시지 필드를 인수로 사용하는 $dateFromString 표현식입니다.

timeField 을 선언하지 않으면 Atlas Stream Processing은 소스에서 제공한 메시지 타임스탬프에서 타임스탬프를 생성합니다.

documents

배열

조건부

Array of documents to use as a streaming data source. 이 필드의 값은 객체 배열이거나 객체 배열로 평가되는 표현식일 수 있습니다. connectionName 필드를 사용할 때는 이 필드를 사용하지 마세요.

$source 는 표시되는 모든 파이프라인의 첫 번째 단계여야 합니다. 파이프라인당 $source 단계는 하나만 사용할 수 있습니다.

Kafka $source 단계의 경우 Atlas Stream Processing 소스 주제 내의 여러 파티션에서 병렬로 읽습니다. 파티션 제한은 프로세서 계층 에 따라 결정됩니다. 자세한 학습은 Stream Processing 청구 참조를참조하세요.

스트리밍 데이터 소스는 샘플 날씨 데이터 세트의 스키마에 따라 다양한 위치의 자세한 날씨 보고서를 생성합니다. 다음 집계 작업은 세 단계로 구성됩니다.

  1. 단계는 $source 라는 주제 에서 이러한 보고서를 수집하는 Apache Kafka 브로커와의 연결을 my_weatherdata 설정하여 수집되는 각 기록 후속 집계 단계에 노출합니다. 또한 이 단계에서는 프로젝션하는 타임스탬프 필드 의 이름을 ingestionTime로 재정의하여 로 설정합니다.

  2. $match 단계에서는 dewPoint.value 값이 5.0 미만인 문서를 제외하고 dewPoint.value 값이 5.0 보다 큰 문서를 다음 단계로 전달합니다.

  3. $merge 단계는 sample_weatherstream 데이터베이스의 stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

결과 sample_weatherstream.stream 컬렉션의 문서를 보려면 Atlas 클러스터에 연결하고 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

참고

위 사례는 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 보게 됩니다.

다음 집계는 샘플 데이터세트가 로드된 Atlas 클러스터에 연결되는 cluster0-collection 소스에서 데이터를 수집합니다. 스트림 처리 작업 공간을 생성하고 연결 레지스트리에 Atlas 클러스터에 대한 연결을 추가하는 방법을 알아보려면 Atlas Stream Processing 시작하기를 참조하세요. 이 집계는 두 단계를 실행하여 변경 스트림을 열고 sample_weatherdata 데이터베이스의 data 컬렉션에 대한 변경 사항을 기록합니다.

  1. $source 단계는 cluster0-collection 소스에 연결하고 sample_weatherdata 데이터베이스의 data 컬렉션에 대한 변경 스트림을 엽니다.

  2. $merge 단계는 필터링된 변경 스트림 문서를 sample_weatherdata 데이터베이스의 data_changes라는 Atlas 컬렉션에 기록합니다. 해당 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
$source: {
connectionName: "cluster0-connection",
db : "sample_weatherdata",
coll : "data"
},
$merge: {
into: {
connectionName: "cluster0-connection",
db: "sample_weatherdata",
coll: "data_changes"
}
}
}

다음 mongosh 명령은 data 문서를 삭제합니다:

db.getSiblingDB("sample_weatherdata").data.deleteOne(
{ _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

data 문서가 삭제된 후, 스트림 프로세서는 변경 스트림 이벤트 문서를 sample_weatherdata.data_changes 컬렉션에 기록합니다. sample_weatherdata.data_changes 컬렉션 결과에 있는 문서를 보려면 mongosh를 사용하여 Atlas 클러스터에 연결한 후 다음 명령을 실행하세요.

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
{
_id: {
_data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
},
clusterTime: Timestamp({ t: 1738790819, i: 1 }),
documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
ns: { db: 'sample_weatherdata', coll: 'data' },
operationType: 'delete',
wallTime: ISODate('2025-02-05T21:26:59.313Z')
}
]

돌아가기

집계 단계

이 페이지의 내용