이 튜토리얼에서는 Atlas Stream Processing을 설정하고 첫 번째 스트림 프로세서를 실행하는 단계를 안내합니다.
전제 조건
이 튜토리얼을 완료하려면 다음이 필요합니다.
빈 클러스터 가 있는 Atlas 프로젝트 입니다. 이 클러스터 스트림 프로세서의 데이터 싱크 역할을 합니다.
스트림 프로세서를 생성하고 실행 있는
atlasAdmin역할 있는 데이터베이스 사용자An Atlas user with the
Project Owneror theProject Stream Processing Ownerrole to manage a Stream Processing Workspace and Connection Registry참고
Project Owner역할이 있으면 데이터베이스 배포를 만들고, 프로젝트 액세스 및 프로젝트 설정을 관리하고, IP 액세스 목록 항목을 관리하는 등의 작업을 수행할 수 있습니다.The
Project Stream Processing Ownerrole enables Atlas Stream Processing actions such as viewing, creating, deleting, and editing stream processing workspaces, and viewing, adding, modifying, and deleting connections in the connection registry.두 역할 간의 차이에 대한 자세한 내용은 프로젝트 역할을 참조하세요.
절차
This tutorial guides you through creating an stream processing workspace, connecting it to an existing Atlas cluster, and setting up a stream processor to ingest sample data from solar streaming devices and write the data to your connected cluster.
연결 레지스트리에 싱크 연결을 추가합니다.
연결 레지스트리에 기존의 빈 Atlas cluster 에 대한 연결을 추가합니다. 스트림 프로세서는 이 연결을 스트리밍 데이터 싱크로 사용합니다.
In the pane for your stream processing workspace, click Configure.
Connection Registry 탭에서 오른쪽 상단의 + Add Connection 을 클릭합니다.
Connection Type 드롭다운 목록에서 Atlas Database을 클릭합니다.
Connection Name 필드에
mongodb1를 입력합니다.Atlas Cluster 드롭다운 목록에서 데이터가 저장되지 않은 Atlas cluster 선택합니다.
Execute as 드롭다운 목록에서 Read and write to any database을 선택합니다.
Add connection를 클릭합니다.
스트리밍 데이터 소스가 메시지를 전송하는지 확인합니다.
Your stream processing workspace comes preconfigured with a connection to a sample data source called sample_stream_solar. This source generates a stream of reports from various solar power devices. Each report describes the observed wattage and temperature of a single solar device at a specific point in time, as well as that device's maximum wattage.
다음 문서 이 데이터 소스 의 보고서를 나타냅니다.
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 } }
이 소스가 메시지를 내보내는지 확인하려면 을 사용하여 대화형으로 스트림 프로세서를 생성합니다.mongosh
Connect to your stream processing workspace.
Use the connection string associated with your stream processing workspace to connect using
mongosh.In the pane for your stream processing workspace, click Connect.
In the workspace connection dialog, click Choose a connection method, then select the Shell tab.
Copy the connection string displayed in the dialog. It has the following format, where
<atlas-stream-processing-url>is the URL of your stream processing workspace and<username>is the username of a database user with theatlasAdminrole:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 연결 문자열 터미널에 붙여넣고
<password>자리 표시자를 사용자의 자격 증명 으로 바꿉니다.Press Enter to run it and connect to your stream processing workspace.
mongosh프롬프트에서sp.process()메서드를 사용하여 대화형으로 스트림 프로세서를 생성합니다.sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) sample_stream_solar연결의 데이터가 콘솔에 표시되는지 확인하고 프로세스를 종료합니다.sp.process()으)로 생성한 스트림 프로세서는 종료한 후에는 유지되지 않습니다.
영구 스트림 프로세서를 생성합니다.
영구 스트림 프로세서는 프로세서를 삭제할 때까지 스트리밍 데이터를 지속적으로 수집, 처리 및 지정된 데이터 싱크에 씁니다. 다음 스트림 프로세서는 초 간격으로 각 태양광 발전 장치의 최대 온도와 평균, 최대, 최소 와트수를 10파생한 다음 결과를 연결된 빈 클러스터 에 쓰는 집계 파이프라인 입니다.
Atlas UI 또는 mongosh:를 사용하여 스트림 프로세서를 생성하려면 다음 탭 중 하나를 선택합니다.
To create a stream processor in the Atlas UI, go to the Stream Processing page for your Atlas project and click Configure in the pane for your stream processing workspace. Then choose between using the visual builder or the JSON editor to configure a stream processor named solarDemo:
Create with visual builder를 클릭합니다.
스트림 프로세서를 구성할 수 있는 양식이 포함된 Visual Builder가 열립니다.
Stream processor name 필드에
solarDemo를 입력합니다.Source 필드 의 Connection 드롭다운 목록에서
sample_stream_solar를 선택합니다.이렇게 하면 집계 파이프라인에 다음
$source단계가 추가됩니다.{ "$source": { "connectionName": "sample_stream_solar" } } $tumblingWindow단계를 구성합니다.Start building your pipeline 창에서 + Custom stage 을 클릭하고 다음 JSON 복사하여 표시되는 텍스트 상자에 붙여넣습니다. 이는
$tumblingWindow중첩된$group단계가 있는 단계를 정의하여 10초 간격으로 각 태양광 발전 장치의 최대 온도와 최대, 최소, 평균 와트수를 파생합니다.예시
$group단계에서max_watts값을 계산할 때 이전 10 초 동안 특정group_id가 수집된 모든 문서의obs.watts값에서 최대값을 추출합니다.{ "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } }] } } Sink 필드 의 Connection 드롭다운 목록에서
mongodb1를 선택합니다.표시되는 텍스트 상자에 다음 JSON 복사하여 붙여넣습니다. 이렇게 하면 처리된 스트리밍 데이터를 연결된 Atlas cluster 의
solarDb데이터베이스 에 있는solarColl컬렉션 에 쓰는$merge단계가 구성됩니다.{ "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } Create stream processor를 클릭합니다.
스트림 프로세서가 생성되어 Stream Processing 페이지의 Stream Processors 탭 에 나열됩니다.
Use JSON editor를 클릭합니다.
JSON 편집기가 열리고 스트림 프로세서를 JSON 형식으로 구성할 수 있는 텍스트 상자가 표시됩니다.
스트림 프로세서를 정의합니다.
다음 JSON 정의를 복사하여 JSON 편집기 텍스트 상자에 붙여넣어
solarDemo라는 스트림 프로세서를 정의합니다. 이 스트림 프로세서는 중첩된$group단계가 있는$tumblingWindow단계를 사용하여 10초 간격으로 각 태양광 발전 장치의 최대 온도와 최대, 최소, 평균 와트수를 파생한 다음 결과를 다음 컬렉션 에 씁니다. 연결된 Atlas cluster 의solarDb데이터베이스 에 있는solarColl.예시
$group단계에서max_watts값을 계산할 때 이전 10 초 동안 특정group_id가 수집된 모든 문서의obs.watts값에서 최대값을 추출합니다.{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] } [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "avg_watts": { "$avg": "$obs.watts" }, "max_temp": { "$avg": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "coll": "solarColl", "connectionName": "mongodb1", "db": "solarDb" } } } ]
mongosh 에서 다음 명령을 실행하여 solarDemo라는 영구 스트림 프로세서를 생성합니다.
Connect to your stream processing workspace.
Use the connection string associated with your stream processing workspace to connect using
mongosh.In the pane for your stream processing workspace, click Connect.
Connect to your workspace 대화 상자에서 Shell 탭 선택합니다.
Copy the connection string displayed in the dialog. It has the following format, where
<atlas-stream-processing-url>is the URL of your stream processing workspace and<username>is the username of a database user with theatlasAdminrole:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 연결 문자열 터미널에 붙여넣고
<password>자리 표시자를 사용자의 자격 증명 으로 바꿉니다.Press Enter to run it and connect to your stream processing workspace.
$source단계를 구성합니다.sample_stream_solar소스에서 데이터를 수집하는$source단계에 대한 변수를 정의합니다.let s = { source: { connectionName: "sample_stream_solar" } } $group단계를 구성합니다.group_id에 따라 각 태양광 발전 장치의 최대 온도와 평균, 최대 및 최소 와트수를 파생하는$group단계에 대한 변수를 정의합니다.let g = { group: { _id: "$group_id", max_temp: { $max: "$obs.temp" }, avg_watts: { $avg: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } $tumblingWindow단계를 구성합니다.스트리밍 데이터에 대해
$group와 같은 축적을 수행하기 위해 Atlas Stream Processing은 Windows 를 사용하여 데이터 세트를 바인딩합니다. 스트림 연속적인 10초 간격으로 분리하는$tumblingWindow단계에 대한 변수를 정의합니다.예시
$group단계에서max_watts값을 계산할 때 이전 10 초 동안 특정group_id가 수집된 모든 문서의obs.watts값에서 최대값을 추출합니다.let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } $merge 단계를 구성합니다.
연결된 Atlas cluster 의
solarDb데이터베이스 에 있는solarColl컬렉션 에 처리된 스트리밍 데이터를 쓰는$merge단계에 대한 변수를 정의합니다.let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } 스트림 프로세서를 생성합니다.
sp.createStreamProcessor()메서드를 사용하여 새 스트림 프로세서에 이름을 할당하고 해당 집계 파이프라인 선언합니다.$group단계는$tumblingWindow의 중첩된 파이프라인 에 속하며, 프로세서 파이프라인 정의에 포함해서는 안 됩니다.sp.createStreamProcessor("solarDemo", [s, t, m]) 이는
solarDemo라는 스트림 프로세서를 생성하여 이전에 정의한 쿼리를 적용하고 처리된 데이터를 연결된 클러스터의solarDb데이터베이스의solarColl컬렉션에 기록합니다. 이 프로세서는 태양광 장치로부터의 관측 데이터를 10초 간격으로 나누어 도출한 다양한 측정값을 반환합니다.Atlas Stream Processing 미사용 데이터 데이터베이스에 쓰는 방법에 대해 자세히 학습하려면
$merge(스트림 처리)를 참조하세요.
스트림 프로세서를 시작합니다.
In the list of stream processors for your stream processing workspace, click the Start icon for your stream processor.
sp.processor.start() 에서 메서드를 mongosh 사용합니다.
sp.solarDemo.start()
스트림 프로세서의 출력을 확인합니다.
스트림 프로세서가 Atlas 클러스터에 데이터를 쓰고 있는지 확인하려면 다음을 수행하세요.
Atlas에서 프로젝트의 Clusters 페이지로 이동합니다.
아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 원하는 프로젝트가 포함된 조직을 선택합니다.
아직 표시되지 않은 경우 탐색 표시줄의 Projects 메뉴에서 원하는 프로젝트를 선택합니다.
사이드바에서 Database 제목 아래의 Clusters를 클릭합니다.
Clusters(클러스터) 페이지가 표시됩니다.
Atlas에서 프로젝트의 Data Explorer 페이지로 이동합니다.
아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 프로젝트가 포함된 조직을 선택합니다.
아직 표시되지 않은 경우 내비게이션 바의 Projects 메뉴에서 프로젝트를 선택합니다.
사이드바에서 Database 제목 아래의 Data Explorer를 클릭합니다.
데이터 탐색기 가 표시됩니다.
참고
Clusters 페이지로 이동하여 Shortcuts 제목 아래의 Data Explorer 을 클릭할 수도 있습니다.
MySolar컬렉션을 확인하세요.
프로세서가 활성 상태인지 mongosh 확인하려면: sp.processor.stats()에서 메서드를 사용합니다.
sp.solarDemo.stats()
이 메서드는 solarDemo 스트림 프로세서의 작동 통계를 보고합니다.
의 메서드를 사용하여 터미널에서 처리된 sp.processor.sample() 문서의 샘플링을 반환할 수도 있습니다.mongosh
sp.solarDemo.sample()
{ _id: 10, max_temp: 16, avg_watts: 232, max_watts: 414, min_watts: 73 }
참고
앞의 출력은 대표적인 예시 입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 고유한 문서를 볼 수 있습니다.
스트림 프로세서를 삭제합니다.
In the list of stream processors for your stream processing workspace, click the Delete () icon for your stream processor.
표시되는 확인 대화 상자에서 스트림 프로세서(solarDemo)의 이름을 입력하여 삭제 것인지 확인한 다음 Delete을(를) 클릭합니다.
에서 메서드를 sp.processor.drop() 사용하여 mongosh 을(를) 삭제합니다.solarDemo
sp.solarDemo.drop()
solarDemo 삭제를 확인하려면 sp.listStreamProcessors() 메서드를 사용하여 사용 가능한 모든 스트림 프로세서를 나열합니다.
sp.listStreamProcessors()
다음 단계
방법 알아보기: