이 튜토리얼에서는 Atlas Stream Processing을 설정하고 첫 번째 스트림 프로세서를 실행하는 단계를 안내합니다.
전제 조건
이 튜토리얼을 완료하려면 다음이 필요합니다.
An Atlas project with an empty cluster. This cluster serves as the data sink for your stream processor.
A database user with the
atlasAdmin
role to create and run stream processorsmongosh
버전 2.0 이상스트림 처리 인스턴스 및 연결 레지스트리를 관리하기 위한
Project Owner
또는Project Stream Processing Owner
역할이 있는 Atlas 사용자참고
Project Owner
역할이 있으면 데이터베이스 배포를 만들고, 프로젝트 액세스 및 프로젝트 설정을 관리하고, IP 액세스 목록 항목을 관리하는 등의 작업을 수행할 수 있습니다.Project Stream Processing Owner
역할은 스트림 처리 인스턴스 보기, 생성, 삭제 및 편집, 연결 레지스트리에 연결 보기, 추가, 수정 및 삭제와 같은 Atlas Stream Processing 작업을 활성화합니다.두 역할 간의 차이에 대한 자세한 내용은 프로젝트 역할을 참조하세요.
절차
This tutorial guides you through creating an stream processing instance, connecting it to an existing Atlas cluster, and setting up a stream processor to ingest sample data from solar streaming devices and write the data to your connected cluster.
Atlas Stream Processing 인스턴스를 생성합니다.
Atlas 에서 프로젝트의 Stream Processing 페이지로 Go 합니다.
경고: 탐색 기능 개선 작업 진행 중
현재 새롭고 향상된 탐색 환경을 출시하고 있습니다. 다음 단계가 Atlas UI 의 보기와 일치하지 않는 경우 미리 보기 설명서를 참조하세요.
아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 프로젝트가 포함된 조직을 선택합니다.
아직 표시되지 않은 경우 내비게이션 바의 Projects 메뉴에서 프로젝트를 선택합니다.
사이드바에서 Services 제목 아래의 Stream Processing를 클릭합니다.
스트림 처리 페이지가 표시됩니다.
Create a workspace를 클릭합니다.
Create a stream processing instance 페이지에서 다음과 같이 인스턴스를 구성합니다.
Tier:
SP30
Provider:
AWS
Region:
us-east-1
Instance Name:
tutorialInstance
Create를 클릭합니다.
Add a sink connection to the connection registry.
Add a connection to an existing empty Atlas cluster to your connection registry. Your stream processor will use this connection as a streaming data sink.
Atlas Stream Processing 인스턴스의 창에서 Configure 을(를) 클릭합니다.
Connection Registry 탭에서 오른쪽 상단의 + Add Connection 을 클릭합니다.
From the Connection Type drop-down list, click Atlas Database.
Connection Name 필드에
mongodb1
를 입력합니다.From the Atlas Cluster drop-down list, select an Atlas cluster without any data stored on it.
From the Execute as drop-down list, select Read and write to any database.
Add connection를 클릭합니다.
스트리밍 데이터 소스가 메시지를 전송하는지 확인합니다.
스트림 처리 인스턴스는 sample_stream_solar
라는 샘플 데이터 소스에 대한 연결로 미리 구성되어 제공됩니다. 이 소스는 다양한 태양광 발전 장치에서 보고서 스트림을 생성합니다. 각 보고서는 특정 시점에서 단일 태양광 장치의 관찰된 전력량과 온도, 해당 장치의 최대 전력량을 설명합니다.
The following document represents a report from this data source:
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 } }
To verify that this source emits messages, create a stream processor interactively using mongosh
:
Atlas Stream Processing 인스턴스에 연결합니다.
Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여
mongosh
을(를) 사용하여 연결합니다.Atlas Stream Processing 인스턴스의 창에서 Connect 을(를) 클릭합니다.
In the Connect to your instance dialog, select the Shell tab.
Copy the connection string displayed in the dialog. It has the following format, where
<atlas-stream-processing-url>
is the URL of your stream processing instance and<username>
is the username of a database user with theatlasAdmin
role:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Paste the connection string into your terminal and replace the
<password>
placeholder with the credentials for the user.Press Enter to run it and connect to your stream processing instance.
In the
mongosh
prompt, use thesp.process()
method to create the stream processor interactively.sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) sample_stream_solar
연결의 데이터가 콘솔에 표시되는지 확인하고 프로세스를 종료합니다.sp.process()
으)로 생성한 스트림 프로세서는 종료한 후에는 유지되지 않습니다.
영구 스트림 프로세서를 생성합니다.
A persistent stream processor continuously ingests, processes, and writes streaming data to a specified data sink until you drop the processor. The following stream processor is an aggregation pipeline that derives the maximum temperature and the average, maximum, and minimum wattages of each solar device across 10-second intervals, then writes the results to your connected empty cluster.
Select one of the following tabs to create a stream processor using the Atlas UI or mongosh
:
To create a stream processor in the Atlas UI, go to the Stream Processing page for your Atlas project and click Configure in the pane for your stream processing instance. Then choose between using the visual builder or the JSON editor to configure a stream processor named solarDemo
:
Create with visual builder를 클릭합니다.
The Visual Builder opens with a form where you can configure your stream processor.
Stream processor name 필드에
solarDemo
를 입력합니다.In the Source field, select
sample_stream_solar
from the Connection drop-down list.This adds the following
$source
stage to your aggregation pipeline:{ "$source": { "connectionName": "sample_stream_solar" } } $tumblingWindow
단계를 구성합니다.In the Start building your pipeline pane, click + Custom stage and copy and paste the following JSON into the text box that appears. This defines a
$tumblingWindow
stage with a nested$group
stage that derives the maximum temperature and the maximum, minimum, and average wattages of each solar device over 10-second intervals.This means, for example, that when the
$group
stage computes a value formax_watts
, it extracts the maximum value from theobs.watts
values for all documents with a givengroup_id
ingested in the previous 10 seconds.{ "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } }] } } In the Sink field, select
mongodb1
from the Connection drop-down list.In the text box that appears, copy and paste the following JSON. This configures a
$merge
stage that writes the processed streaming data to a collection namedsolarColl
in thesolarDb
database of your connected Atlas cluster:{ "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } Create stream processor를 클릭합니다.
The stream processor is created and listed on the Stream Processors tab of the Stream Processing page.
Use JSON editor를 클릭합니다.
The JSON editor opens with a text box where you can configure your stream processor in JSON format.
Define the stream processor.
Copy and paste the following JSON definition into the JSON editor text box to define a stream processor named
solarDemo
. This stream processor uses a$tumblingWindow
stage with a nested$group
stage to derive the maximum temperature and the maximum, minimum, and average wattages of each solar device over 10-second intervals, then writes the results to a collection namedsolarColl
in thesolarDb
database of your connected Atlas cluster.This means, for example, that when the
$group
stage computes a value formax_watts
, it extracts the maximum value from theobs.watts
values for all documents with a givengroup_id
ingested in the previous 10 seconds.{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] } [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "avg_watts": { "$avg": "$obs.watts" }, "max_temp": { "$avg": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "coll": "solarColl", "connectionName": "mongodb1", "db": "solarDb" } } } ]
Run the following commands in mongosh
to create a persistent stream processor named solarDemo
:
Atlas Stream Processing 인스턴스에 연결합니다.
Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여
mongosh
을(를) 사용하여 연결합니다.Atlas Stream Processing 인스턴스의 창에서 Connect 을(를) 클릭합니다.
In the Connect to your instance dialog, select the Shell tab.
Copy the connection string displayed in the dialog. It has the following format, where
<atlas-stream-processing-url>
is the URL of your stream processing instance and<username>
is the username of a database user with theatlasAdmin
role:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Paste the connection string into your terminal and replace the
<password>
placeholder with the credentials for the user.Press Enter to run it and connect to your stream processing instance.
$source
단계를 구성합니다.Define a variable for a
$source
stage that ingests data from thesample_stream_solar
source.let s = { source: { connectionName: "sample_stream_solar" } } $group
단계를 구성합니다.Define a variable for a
$group
stage that derives the maximum temperature and the average, maximum, and minimum wattages of each solar device according to itsgroup_id
.let g = { group: { _id: "$group_id", max_temp: { $max: "$obs.temp" }, avg_watts: { $avg: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } $tumblingWindow
단계를 구성합니다.In order to perform accumulations such as
$group
on streaming data, Atlas Stream Processing uses windows to bound the data set. Define a variable for a$tumblingWindow
stage that separates the stream into consecutive 10-second intervals.This means, for example, that when the
$group
stage computes a value formax_watts
, it extracts the maximum value from theobs.watts
values for all documents with a givengroup_id
ingested in the previous 10 seconds.let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } $merge 단계를 구성합니다.
Define a variable for a
$merge
stage that writes the processed streaming data to a collection namedsolarColl
in thesolarDb
database of your connected Atlas cluster.let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } 스트림 프로세서를 생성합니다.
Use the
sp.createStreamProcessor()
method to assign a name to your new stream processor and declare its aggregation pipeline. The$group
stage belongs to the nested pipeline of the$tumblingWindow
, and you must not include it in the processor pipeline definition.sp.createStreamProcessor("solarDemo", [s, t, m]) 이는
solarDemo
라는 스트림 프로세서를 생성하여 이전에 정의한 쿼리를 적용하고 처리된 데이터를 연결된 클러스터의solarDb
데이터베이스의solarColl
컬렉션에 기록합니다. 이 프로세서는 태양광 장치로부터의 관측 데이터를 10초 간격으로 나누어 도출한 다양한 측정값을 반환합니다.Atlas Stream Processing 미사용 데이터 데이터베이스에 쓰는 방법에 대해 자세히 학습하려면
$merge
(스트림 처리)를 참조하세요.
스트림 프로세서를 시작합니다.
In the list of stream processors for your stream processing instance, click the Start icon for your stream processor.
Use the sp.processor.start()
method in mongosh
:
sp.solarDemo.start()
스트림 프로세서의 출력을 확인합니다.
스트림 프로세서가 Atlas 클러스터에 데이터를 쓰고 있는지 확인하려면 다음을 수행하세요.
Atlas에서 프로젝트의 Clusters 페이지로 이동합니다.
경고: 탐색 기능 개선 작업 진행 중
현재 새롭고 향상된 탐색 환경을 출시하고 있습니다. 다음 단계가 Atlas UI 의 보기와 일치하지 않는 경우 미리 보기 설명서를 참조하세요.
아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 원하는 프로젝트가 포함된 조직을 선택합니다.
아직 표시되지 않은 경우 탐색 표시줄의 Projects 메뉴에서 원하는 프로젝트를 선택합니다.
아직 표시되지 않은 경우 사이드바에서 Clusters를 클릭합니다.
Clusters(클러스터) 페이지가 표시됩니다.
cluster의 Browse Collections 버튼을 클릭합니다.
데이터 탐색기 가 표시됩니다.
MySolar
컬렉션을 확인하세요.
To verify that the processor is active, use the sp.processor.stats()
method in mongosh
:
sp.solarDemo.stats()
This method reports operational statistics of the solarDemo
stream processor.
You can also use the sp.processor.sample()
method in mongosh
to return a sampling of processed documents in the terminal.
sp.solarDemo.sample()
{ _id: 10, max_temp: 16, avg_watts: 232, max_watts: 414, min_watts: 73 }
참고
The preceding output is a representative example. Streaming data are not static, and each user sees distinct documents.
스트림 프로세서를 삭제합니다.
In the list of stream processors for your stream processing instance, click the Delete () icon for your stream processor.
In the confirmation dialog that appears, type the name of the stream processor (solarDemo
) to confirm that you want to delete it, and then click Delete.
Use the sp.processor.drop()
method in mongosh
to drop solarDemo
:
sp.solarDemo.drop()
To confirm that you have dropped solarDemo
, use the sp.listStreamProcessors()
method to list all your available stream processors:
sp.listStreamProcessors()
다음 단계
방법 알아보기: