Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs Menu
Docs Home
/
Atlas
/

스트림 프로세서 관리

Atlas Stream Processing 스트림 프로세서는 고유한 이름이 지정된 스트림 집계 파이프라인의 논리를 스트리밍 데이터에 적용합니다. Atlas Stream Processing은 각 스트림 프로세서 정의를 재사용할 수 있도록 영구 스토리지에 저장합니다. 정의가 저장된 스트림 처리 인스턴스에서만 지정된 스트림 프로세서를 사용할 수 있습니다. Atlas Stream Processing은 작업자당 최대 4개 스트림 프로세서를 지원합니다. 이 제한을 초과하는 추가 프로세서에 대해 Atlas Stream Processing은 새 리소스를 할당합니다.

스트림 프로세서를 생성하고 managed 다음이 필요합니다.

대부분의 스트림 프로세서 명령은 메서드 호출에서 관련 스트림 프로세서의 이름을 지정해야 합니다. 다음 섹션에서 설명하는 구문은 엄밀히 영숫자 이름을 가정합니다. 스트림 프로세서의 이름에 하이픈(-) 또는 마침표(.)와 같은 영숫자가 아닌 문자가 포함된 경우, 이름을 대괄호([])와 큰따옴표("")로 묶어야 합니다. 메서드 호출(예: sp.["special-name-stream"].stats().

에서 메서드를 사용하여 대화형으로 스트림 프로세서를 만들 수 sp.process() mongosh있습니다. 대화형으로 생성하는 스트림 프로세서는 다음과 같은 동작을 나타냅니다.

  • 출력 및 데드 레터 큐 문서를 shell에 쓰기

  • 생성 즉시 실행 시작

  • 10분 동안 실행하거나 사용자가 중지할 때까지 실행합니다.

  • 중단 후 계속 진행하지 마세요

대화형으로 생성된 스트림 프로세서는 프로토타이핑을 위한 것입니다. 지속적인 스트림 프로세서를 생성하려면 스트림 프로세서 생성을 참조하세요.

sp.process() 의 구문은 다음과 같습니다:

sp.process(<pipeline>)
필드
유형
필요성
설명

pipeline

배열

필수 사항

스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 .

스트림 프로세서를 대화형으로 생성하려면 다음을 수행합니다.

1

Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh을(를) 사용하여 연결합니다.

예시

다음 명령은 x.059 인증을 사용하여 streamOwner라는 사용자로 스트림 처리 인스턴스에 연결합니다.

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

메시지가 표시되면 사용자 비밀번호를 입력합니다.

2

mongosh 프롬프트에서 적용하려는 집계 단계가 포함된 배열을 pipeline이라는 변수에 할당합니다.

다음 예제에서는 연결 레지스트리에서 myKafka 연결의 stuff 주제를 $source 로 사용하여 temperature 필드의 값이 46 인 레코드와 일치시키고, 처리된 메시지를 output 로 내보냅니다. 연결 레지스트리의 mySink 연결 주제:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

다음 명령은 pipeline 에 정의된 로직을 적용하는 스트림 프로세서를 만듭니다.

sp.process(pipeline)

삭제할 때까지 유지되는 스트림 프로세서를 생성하려면 다음을 수행합니다.

Atlas Administration API는 스트림 프로세서를 생성하기 위한 엔드포인트를 제공합니다.

단일 스트림 프로세서 생성

Atlas UI 에서 스트림 프로세서를 만들려면 Atlas 프로젝트 의 Stream Processing 페이지로 이동하여 스트림 처리 인스턴스 창에서 Configure 를 클릭합니다.

비주얼 빌더 또는 JSON 편집기 중 하나를 선택하여 스트림 프로세서를 구성할 수 있습니다.

1

스트림 처리 인스턴스 에 기존 스트림 프로세서가 있는 경우 + Create stream processor 버튼을 클릭한 다음 드롭다운 옵션에서 Visual Builder 를 선택합니다.

스트림 프로세서를 구성할 수 있는 양식이 포함된 Visual Builder가 열립니다.

2
3

Source 필드 의 Connection 드롭다운 목록에서 스트림 프로세서의 소스로 사용할 연결을 선택합니다.

그러면 스트림 프로세서의 단계를 구성할 수 있는 JSON 텍스트 상자가 source 열립니다.source 단계 구문에 대해 자세히 학습 $source 를 참조하세요.

예시

다음 source 단계는 사전 구성된 sample_stream_solar 연결의 실시간 데이터에 대해 작동합니다.

{
"$source": {
"connectionName": "sample_stream_solar"
}
}
4

창에서 Start building your pipeline 파이프라인 에 추가하려는 집계 단계의 버튼을 클릭합니다. 그러면 선택한 집계 단계를 JSON 형식으로 구성할 수 있는 텍스트 상자가 열립니다.

집계 단계가 목록에 없는 경우 을 + Custom stage 클릭하여 지원되는 집계 단계를 JSON 형식으로 정의합니다. 스트림 처리 집계 단계와 해당 구문에 대해 자세히 학습 집계 파이프라인 단계를 참조하세요.

예시

다음 단계는 필드 $match 보다 sample_stream_solar obs.watts 큰 사전 구성된 스트림 의 모든 문서와 300 일치합니다.

{
"$match": {
"obs.watts": { "$gt": 300 }
}
}
5

파이프라인 에 애그리 집계 단계를 추가하려면 + Add stage below 파이프라인 의 마지막 단계 아래에 있는 버튼을 클릭하고 추가하려는 집계 단계를 선택하거나 Custom stage 를 클릭하여 지원되는 다른 집계 단계를 정의합니다. 그러면 새 단계를 JSON 형식으로 구성할 수 있는 텍스트 상자가 열립니다.

6

Sink 필드 의 Connection 드롭다운 목록에서 대상 연결을 선택합니다.

Sink 필드 의 Connection 드롭다운 목록에서 처리된 데이터를 쓰기 (write) 연결을 선택합니다.

그러면 스트림 프로세서의 단계를 구성할 수 있는 JSON 텍스트 상자가 merge 열립니다.merge 단계 구문에 대해 자세히 학습 $merge 를 참조하세요.

예시

다음 sink 단계에서는 demoConnection 연결이라는 이름의 연결에서 처리된 데이터를 demoDb.demoColl 컬렉션 에 쓰기 (write) .

{
"$merge": {
"into": {
"connectionName": "demoConnection",
"db": "demoDb",
"coll": "demoColl"
}
}
}
7

스트림 프로세서가 생성되어 Stream Processing 페이지의 Stream Processors 탭 에 나열됩니다.

1

스트림 처리 인스턴스 에 기존 스트림 프로세서가 있는 경우 + Create stream processor 버튼을 클릭한 다음 드롭다운 옵션에서 Visual Builder 를 선택합니다.

JSON 편집기가 열리고 스트림 프로세서를 JSON 형식으로 구성할 수 있는 텍스트 상자가 표시됩니다.

2

JSON 편집기 텍스트 상자에 스트림 프로세서에 대한 JSON 정의를 지정합니다. 이 정의에는 스트림 프로세서의 이름과 단계로 $source 시작하여 단계로 $merge 끝나는 집계 파이프라인 포함되어야 합니다. 단계와 $source $merge 단계 사이에 추가 집계 단계를 원하는 수만큼 포함할 수 있습니다.

스트림처리 집계 단계와 해당 구문에 대해 자세히 학습 집계 파이프라인 단계를 참조하세요.

예시

다음 JSON 정의는 solarDemo $tumblingWindow 중첩된 단계가 $group 있는 단계를 sample_stream_solar 사용하여 10초 간격으로 사전 구성된 연결의 실시간 데이터를 집계하는 라는 스트림 프로세서를 mongodb1 생성합니다. 처리된 데이터를 라는 연결의 컬렉션 에 씁니다.

{
"name": "solarDemo",
"pipeline": [
{
"$source": {
"connectionName": "sample_stream_solar"
}
},
{
"$tumblingWindow": {
"interval": {
"size": 10,
"unit": "second"
},
"pipeline": [
{
"$group": {
"_id": "$group_id",
"max_watts": { "$max": "$obs.watts" },
"min_watts": { "$min": "$obs.watts" }
}
}
]
}
},
{
"$merge": {
"into": {
"connectionName": "mongodb1",
"db": "solarDb",
"coll": "solarColl"
}
}
}
]
}

을 사용하여 새 스트림 프로세서를 만들려면 메서드를 mongosh sp.createStreamProcessor() 사용합니다. 다음과 같은 구문을 가집니다.

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
유형
필요성
설명

name

문자열

필수 사항

스트림 프로세서의 논리적 이름입니다. 이름은 스트림 처리 인스턴스 내에서 고유해야 합니다. 이 이름에는 영숫자만 포함되어야 합니다.

pipeline

배열

필수 사항

스트리밍 데이터에 적용하려는 스트림 집계 파이프라인 .

options

객체

옵션

스트림 프로세서에 대한 다양한 선택적 설정을 정의하는 객체입니다.

options.dlq

객체

조건부

Atlas Stream Processing 인스턴스에 데드 레터 큐 를 할당하는 객체입니다. 이 필드는 options 필드를 정의하는 경우 필요합니다.

options.dlq.connectionName

문자열

조건부

연결 레지스트리에서 연결을 식별하는 사람이 읽을 수 있는 레이블입니다. 이 연결은 Atlas cluster를 참고해야 합니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.

options.dlq.db

문자열

조건부

options.dlq.connectionName에 지정된 cluster의 Atlas 데이터베이스 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.

options.dlq.coll

문자열

조건부

options.dlq.db 에 지정된 데이터베이스의 collection 이름입니다. 이 필드는 options.dlq 필드를 정의하는 경우 필요합니다.

1

Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh을(를) 사용하여 연결합니다.

  1. Atlas Stream Processing 인스턴스의 창에서 Connect 을(를) 클릭합니다.

  2. Connect to your instance 대화 상자에서 Shell 탭 선택합니다.

  3. 대화 상자에 표시된 연결 문자열 복사합니다. 형식은 다음과 같으며, 여기서 <atlas-stream-processing-url> 은 스트림 처리 인스턴스 의 URL 이고 은 역할 가진 데이터베이스 <username> 사용자의 사용자 이름 atlasAdmin 입니다.

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>
    --password <password>
  4. 연결 문자열 터미널에 붙여넣고 <password> 자리 표시자를 사용자의 자격 증명 으로 바꿉니다. Enter 키를 눌러 실행 스트림 처리 인스턴스 에 연결합니다.

예시

다음 명령은 x.059 인증을 사용하여 streamOwner라는 사용자로 스트림 처리 인스턴스에 연결합니다.

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

메시지가 표시되면 사용자 비밀번호를 입력합니다.

2

mongosh 프롬프트에서 적용하려는 집계 단계가 포함된 배열을 pipeline이라는 변수에 할당합니다.

다음 예시 stuff myKafka 파이프라인 연결 레지스트리의 연결에서 $source 주제 로 사용하고,temperature 필드 값이 인 레코드와 일치하고,46 처리된 메시지를 output mySink 연결 레지스트리에서 연결의 주제 :

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongosh 프롬프트에서 DLQ의 다음 속성이 포함된 객체를 할당합니다.

  • connectionName

  • 데이터베이스 이름

  • 컬렉션 이름

다음 예제에서는 metadata.dlq 데이터베이스 collection에서 cluster01 연결을 통한 DLQ를 정의합니다.

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

다음 명령어는 pipeline 에 정의된 로직을 적용하는 proc01 라는 이름의 스트림 프로세서를 만듭니다. 처리 중 오류가 발생한 문서는 deadLetter 에 정의된 DLQ에 기록됩니다.

sp.createStreamProcessor("proc01", pipeline, deadLetter)

참고

Atlas Stream Processing 45 일 이상 stopped 인 스트림 프로세서의 내부 상태 삭제합니다. 이러한 프로세서를 시작하면 초기 실행 과 동일하게 작동하고 통계를 보고합니다.

스트림 프로세서를 시작하려면:

Atlas Administration API는 스트림 프로세서를 시작하기 위한 엔드포인트를 제공합니다.

단일 스트림 프로세서 시작

Atlas UI 에서 스트림 프로세서를 시작하려면 Atlas 프로젝트 의 Stream Processing 페이지로 이동한 다음 스트림 처리 인스턴스 의 창에서 Configure 를 클릭하여 인스턴스에 대해 정의된 스트림 프로세서 목록을 확인합니다.

그런 다음 스트림 프로세서의 Start 아이콘을 클릭합니다.

으로 mongosh sp.processor.start() 기존 스트림 프로세서를 시작하려면 메서드를 사용합니다.

예를 들어 proc01 이라는 스트림 프로세서를 시작하려면 다음 명령을 실행합니다.

sp.proc01.start()
{ "ok" : 1 }

스트림 프로세서가 존재하지만 현재 실행 아닌 경우 이 메서드는 를 반환합니다.{ "ok": 1 } sp.processor.start() 이(가) 아닌 스트림 프로세서에 대해 을(를) STOPPED 호출하면mongosh 이(가) 오류를 반환합니다.

참고

Atlas Stream Processing 45 일 이상 stopped 인 스트림 프로세서의 내부 상태 삭제합니다. 이러한 프로세서를 시작하면 초기 실행 과 동일하게 작동하고 통계를 보고합니다.

스트림 프로세서를 중지하려면:

Atlas Administration API는 스트림 프로세서를 중지하기 위한 엔드포인트를 제공합니다.

단일 스트림 프로세서 중지

Atlas UI 에서 스트림 프로세서를 일시 중지하려면 Atlas 프로젝트 의 Stream Processing 페이지로 이동한 다음 스트림 처리 인스턴스 의 창에서 Configure 를 클릭하여 인스턴스에 정의된 스트림 프로세서 목록을 확인합니다.

그런 다음 스트림 프로세서의 Pause 아이콘을 클릭합니다.

으로 mongosh sp.processor.stop() 기존 스트림 프로세서를 중지하려면 메서드를 사용합니다.

예를 들어 proc01 이라는 스트림 프로세서를 중지하려면 다음 명령을 실행합니다.

sp.proc01.stop()
{ "ok" : 1 }

스트림 프로세서가 존재하고 현재 실행 경우 이 메서드는 를 반환합니다. 이(가) { "ok": 1 } sp.processor.stop() 아닌 스트림 프로세서에 대해 을(를) running 호출하면mongosh 이(가) 오류를 반환합니다.

기존 스트림 프로세서의 다음 요소를 수정할 수 있습니다.

스트림 프로세서를 수정하려면 다음을 수행합니다.

  1. 스트림 프로세서를 중지합니다.

  2. 스트림 프로세서를 수정합니다.

  3. 스트림 프로세서를 다시 시작합니다.

기본적으로 수정된 프로세서는 마지막 체크포인트에서 복원됩니다. 또는 resumeFromCheckpoint=false를 설정할 수 있으며, 이 경우 프로세서는 요약 통계만 유지합니다. 열려 있는 창이 있는 프로세서를 수정하면, 업데이트된 파이프라인에서 창이 완전히 다시 계산됩니다.

참고

Operator(is, contains와 같은 매처 표현식 포함)를 사용하여 스트림 프로세서 상태 실패 경고를 구성한 스트림 프로세서의 이름을 변경하면, Atlas는 새 이름과 매처 표현식이 일치하지 않을 경우 이름이 변경된 스트림 프로세서에 대한 경고를 트리거하지 않습니다. 이름이 변경된 스트림 프로세서를 모니터링하려면 경고를 재구성하세요. 이름이 변경된 스트림 프로세서를 모니터링하려면 경고를 재구성하세요.

기본 설정 resumeFromCheckpoint=true 가 활성화되면 다음 제한 사항이 적용됩니다.

  • $source 단계를 수정할 수 없습니다.

  • 창의 간격을 수정할 수 없습니다.

  • 창을 제거할 수 없습니다.

  • 창의 내부 파이프라인에 $group 또는 $sort 단계가 있으면 창을 사용하여 파이프라인을 수정할 수 있습니다.

  • 기존 창 유형은 변경할 수 없습니다. 예를 들어, $tumblingWindow에서 $hoppingWindow로 변경할 수 없으며 그 반대도 마찬가지입니다.

  • 창이 있는 프로세서는 창 재계산의 결과로 일부 데이터를 재처리할 수 있습니다.

Atlas 관리 API는 스트림 프로세서를 수정하기 위한 엔드포인트를 제공합니다.

1개의 스트림 프로세서 수정

mongosh v2.3.4 이상이 필요합니다.

sp.<streamprocessor>.modify() 명령을 사용하여 기존 스트림 프로세서를 수정합니다. <streamprocessor>는 현재 스트림 처리 인스턴스에 대해 정의된 중지된 스트림 프로세서의 이름이어야 합니다.

예를 들어 proc01이라는 스트림 프로세서를 수정하려면 다음 명령을 실행하세요.

sp.proc1.modify(<pipeline>, {
resumeFromCheckpoint: bool, // optional
name: string, // optional
dlq: string, // optional
}})
sp.createStreamProcessor("foo", [
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
])
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
]);
sp.foo.start();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test",
config: {
startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000)
}
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.stop();
sp.foo.modify({dlq: {}})
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$replaceRoot: {newRoot: "$fullDocument"}},
{$match: {cost: {$gt: 500}}},
{$tumblingWindow: {
interval: {unit: "day", size: 1},
pipeline: [
{$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
]
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.start();

스트림 프로세서를 삭제하려면 다음을 수행합니다.

Atlas Administration API는 스트림 프로세서를 삭제하기 위한 엔드포인트를 제공합니다.

단일 스트림 프로세서 삭제

Atlas UI 에서 스트림 프로세서를 삭제 하려면 Atlas 프로젝트 의 Stream Processing 페이지로 이동한 다음 스트림 처리 인스턴스 의 창에서 Configure 를 클릭하여 인스턴스에 정의된 스트림 프로세서 목록을 확인합니다.

그런 다음 스트림 Delete 프로세서의 () 아이콘을 클릭합니다. 표시되는 확인 대화 상자에서 스트림solarDemo 프로세서()의 이름을 입력하여 삭제 것인지 확인한 다음 Delete 를 클릭합니다.

으로 기존 스트림 sp.processor.drop() mongosh 프로세서를 삭제 하려면 메서드를 사용합니다.

예를 들어 proc01 이라는 스트림 프로세서를 삭제하려면 다음 명령을 실행합니다.

sp.proc01.drop()

이 메서드는 다음을 반환합니다.

  • true 스트림 프로세서가 존재하는 경우.

  • false 스트림 프로세서가 존재하지 않는 경우.

스트림 프로세서를 삭제하면 Atlas Stream Processing이 이에 대해 프로비저닝한 모든 리소스와 저장된 모든 상태가 함께 폐기됩니다.

사용 가능한 모든 스트림 프로세서를 나열하려면 다음을 수행합니다.

Atlas Administration API는 사용 가능한 모든 스트림 프로세서를 나열하는 엔드포인트를 제공합니다.

스트림 프로세서 나열

Atlas UI 에서 스트림 처리 인스턴스 에 대해 정의된 스트림 프로세서 목록을 보려면 Atlas 프로젝트 의 Stream Processing 페이지로 이동하여 스트림 처리 인스턴스 창에서 Configure 를 클릭합니다.

스트림 프로세서 목록과 해당 상태가 표시됩니다.

현재 스트림 처리 인스턴스 에서 사용 가능한 모든 스트림 프로세서를 으로 나열하려면 mongosh sp.listStreamProcessors() 메서드를 사용합니다. 각 스트림 프로세서와 연결된 이름, 시작 시간, 현재 상태, 파이프라인 포함된 문서 목록을 반환합니다. 다음과 같은 구문을 가집니다.

sp.listStreamProcessors(<filter>)

<filter> 목록을 필터링할 필드를 지정하는 문서입니다.

예시

다음 예는 필터링되지 않은 요청의 반환 값을 보여줍니다.

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

동일한 Atlas Stream Processing 인스턴스에서 명령을 다시 실행하여 "running""state" 에 대해 필터링하면 다음 출력이 표시됩니다.

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

기존 스트림 프로세서에서 샘플링된 결과의 배열 STDOUT mongosh사용하여(으)로 반환하려면 메서드를 사용합니다. 예시 들어, 다음 명령은 라는 스트림 sp.processor.sample() 프로세서에서 proc01 샘플링합니다.

sp.proc01.sample()

이 명령은 CTRL-C를 사용하여 취소하거나 반환된 샘플의 크기가 누적 40MB에 도달할 때까지 계속 실행됩니다. 스트림 프로세서는 샘플의 잘못된 문서를 다음 형식의 _dlqMessage 문서로 보고합니다.

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
instanceName: '<instanceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

데드 레터 큐 컬렉션을 정의하지 않고도 이러한 메시지를 사용하여 데이터 위생 문제를 진단할 수 있습니다.

참고

Atlas Stream Processing 45 일 이상 stopped 인 스트림 프로세서의 내부 상태 삭제합니다. 이러한 프로세서를 시작하면 초기 실행 과 동일하게 작동하고 통계를 보고합니다.

스트림 프로세서의 통계를 보려면:

Atlas Administration API는 스트림 프로세서의 통계를 볼 수 있는 엔드포인트를 제공합니다.

단일 스트림 프로세서 가져오기

스트림 프로세서에 대한 모니터링 보려면 Atlas 프로젝트 의 Stream Processing 페이지로 이동하여 Monitoring 탭 엽니다. 그런 다음 페이지 왼쪽 상단의 Stream processor 드롭다운 목록에서 스트림 프로세서를 선택합니다.

을 사용하여 기존 스트림 프로세서의 현재 상태를 요약하는 문서 반환하려면 mongosh sp.processor.stats() 메서드를 사용합니다. 다음과 같은 구문을 가집니다.

sp.<streamprocessor>.stats({options: {<options>}})

여기서 options 은 다음 필드가 있는 선택적 문서입니다.

필드
유형
설명

scale

integer

출력의 항목 크기에 사용할 단위입니다. 기본적으로 Atlas Stream Processing은 항목 크기를 바이트 단위로 표시합니다. KB 단위로 표시하려면 1024scale 을 지정합니다.

verbose

부울

출력 문서의 상세 수준을 지정하는 플래그입니다. true 로 설정하면 출력 문서에는 파이프라인의 각 개별 연산자에 대한 통계를 보고하는 하위 문서가 포함됩니다. 기본값은 false 입니다.

출력 문서에는 다음과 같은 필드가 있습니다.

필드
유형
설명

ns

문자열

스트림 프로세서가 정의된 네임스페이스입니다.

stats

객체

스트림 프로세서의 작동 상태를 설명하는 문서입니다.

stats.name

문자열

스트림 프로세서의 이름입니다.

stats.status

문자열

스트림 프로세서의 상태입니다. 이 필드는 다음과 같은 값을 가질 수 있습니다:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor

integer

크기 필드가 표시되는 배율입니다. 1 로 설정하면 크기가 바이트 단위로 표시됩니다. 1024 로 설정하면 크기가 킬로바이트 단위로 표시됩니다.

stats.inputMessageCount

integer

스트림에 게시된 문서 수입니다. 문서가 전체 파이프라인을 통과할 때가 아니라 $source 단계를 통과하면 스트림에 '게시된' 것으로 간주됩니다.

stats.inputMessageSize

integer

스트림에 게시된 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과할 때가 아니라 $source 단계를 통과한 후에 스트림에 '게시된' 것으로 간주됩니다.

stats.outputMessageCount

integer

스트림에서 처리한 문서 수입니다. 문서가 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다.

stats.outputMessageSize

integer

스트림에서 처리한 바이트 또는 킬로바이트의 수입니다. 바이트는 전체 파이프라인을 통과하면 스트림에서 '처리'된 것으로 간주됩니다.

stats.dlqMessageCount

integer

stats.dlqMessageSize

integer

stats.changeStreamTimeDifferenceSecs

integer

가장 최근의 변경 스트림 재개 토큰이 나타내는 이벤트 시간과 oplog의 최신 이벤트 간의 차이(초)입니다.

stats.changeStreamState

token

가장 최근의 변경 스트림 재개 토큰입니다. 변경 스트림 소스가 있는 스트림 프로세서에만 적용됩니다.

stats.latency

문서

스트림 프로세서 전체에 대한 지연 시간 통계입니다. Atlas Stream Processing verbose 옵션을 전달한 경우에만 이 필드 반환합니다.

stats.latency.p50

integer

지난 30 초 동안 처리된 모든 문서의 예상 50번째 백분위 지연 시간 . 파이프라인 에 창 단계가 포함된 경우 지연 시간 시간 측정에는 창 간격이 포함됩니다.

예시 를 들어 $tumblingWindow 단계의 간격이 5 분인 경우 지연 시간 시간 측정에는 해당 5 분이 포함됩니다.

stats.latency.p99

integer

지난 30 초 동안 처리된 모든 문서의 예상 99번째 백분위 지연 시간 . 파이프라인 에 창 단계가 포함된 경우 지연 시간 시간 측정에는 창 간격이 포함됩니다.

예시 를 들어 $tumblingWindow 단계의 간격이 5 분인 경우 지연 시간 시간 측정에는 해당 5 분이 포함됩니다.

stats.latency.start

datetime

가장 최근의 30 초 측정 창 시작된 실제 시간입니다.

stats.latency.end

datetime

가장 최근의 30 초 측정 창 종료된 실제 시간입니다.

stats.latency.unit

문자열

지연 시간 시간이 계산되는 시간 단위입니다. 이 값은 항상 microseconds입니다.

stats.latency.count

integer

가장 최근의 30 초 측정 창 에서 스트림 프로세서가 처리한 문서 수입니다.

stats.latency.sum

integer

가장 최근의 30 초 측정 창 에서 수행된 모든 개별 지연 시간 측정의 합계(마이크로초)입니다.

stats.stateSize

integer

Windows에서 프로세서 상태를 저장하는 데 사용하는 바이트 수입니다.

stats.watermark

integer

현재 워터마크의 타임스탬프입니다.

stats.operatorStats

배열

프로세서 파이프라인의 각 연산자에 대한 통계입니다. Atlas Stream Processing은 verbose 옵션을 전달한 경우에만 이 필드를 반환합니다.

stats.operatorStats 다양한 핵심 stats 필드의 연산자별 버전을 제공합니다.

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.latency

  • stats.operatorStats.stateSize

stats.operatorStats 에는 고유 필드가 포함되어 있습니다:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTimeMillis

stats.operatorStats 에는 verbose 옵션을 전달하고 프로세서에 창 단계가 포함된 경우 다음 필드도 포함되어 있습니다.

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats.maxMemoryUsage

integer

연산자의 최대 메모리 사용량(바이트 또는 킬로바이트)입니다.

stats.operatorStats.executionTimeSecs

integer

연산자의 총 실행 시간(초)입니다.

stats.minOpenWindowStartTime

날짜

최소 열린 창의 시작 시간입니다. 이 값은 선택 사항입니다.

stats.maxOpenWindowStartTime

날짜

최대 열린 창의 시작 시간입니다. 이 값은 선택 사항입니다.

stats.kafkaPartitions

배열

stats.kafkaPartitions.partition

integer

Apache Kafka 주제 파티션 번호입니다.

stats.kafkaPartitions.currentOffset

integer

지정된 파티션에 대해 스트림 프로세서가 있는 오프셋입니다. 이 값은 스트림 프로세서가 처리한 이전 오프셋에 1 을 더한 것과 같습니다.

stats.kafkaPartitions.checkpointOffset

integer

스트림 프로세서가 Apache Kafka 브로커에 마지막으로 커밋한 오프셋과 지정된 파티션의 체크포인트 . 이 오프셋을 통한 모든 메시지는 마지막 체크포인트 에 기록됩니다.

stats.kafkaPartitions.isIdle

부울

파티션이 유휴 상태인지 여부를 나타내는 플래그입니다. 이 값의 기본값은 false입니다.

예를 들어 다음은 항목 크기가 KB 단위로 표시되는 inst01 Atlas Stream Processing 인스턴스에서 proc01 스트림 프로세서의 상태를 보여줍니다.

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

돌아가기

VPC 연결 관리

이 페이지의 내용