Docs Menu
Docs Home
/
Atlas
/

Atlas Stream Processing 시작하기

이 튜토리얼에서는 Atlas Stream Processing을 설정하고 첫 번째 스트림 프로세서를 실행하는 단계를 안내합니다.

이 튜토리얼을 완료하려면 다음이 필요합니다.

  • 빈 클러스터 가 있는 Atlas 프로젝트 입니다. 이 클러스터 스트림 프로세서의 데이터 싱크 역할을 합니다.

  • 스트림 프로세서를 생성하고 실행 있는 atlasAdmin 역할 있는 데이터베이스 사용자

  • mongosh 버전 2.0 이상

  • 스트림 처리 인스턴스 및 연결 레지스트리를 관리하기 위한 Project Owner 또는 Project Stream Processing Owner 역할이 있는 Atlas 사용자

    참고

    Project Owner 역할이 있으면 데이터베이스 배포를 만들고, 프로젝트 액세스 및 프로젝트 설정을 관리하고, IP 액세스 목록 항목을 관리하는 등의 작업을 수행할 수 있습니다.

    Project Stream Processing Owner 역할은 스트림 처리 인스턴스 보기, 생성, 삭제 및 편집, 연결 레지스트리에 연결 보기, 추가, 수정 및 삭제와 같은 Atlas Stream Processing 작업을 활성화합니다.

    두 역할 간의 차이에 대한 자세한 내용은 프로젝트 역할을 참조하세요.

이 튜토리얼에서는 스트림 처리 인스턴스 만들고, 기존 Atlas cluster 에 연결하고, 태양열 스트리밍 장치에서 샘플 데이터를 수집하고 연결된 클러스터 에 데이터를 쓰기 (write) 스트림 프로세서를 설정하는 방법을 안내합니다.

1
  1. Atlas 에서 프로젝트의 Stream Processing 페이지로 Go 합니다.

    경고: 탐색 기능 개선 작업 진행 중

    현재 새롭고 향상된 탐색 환경을 출시하고 있습니다. 다음 단계가 Atlas UI 의 보기와 일치하지 않는 경우 미리 보기 설명서를 참조하세요.

    1. 아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 프로젝트가 포함된 조직을 선택합니다.

    2. 아직 표시되지 않은 경우 내비게이션 바의 Projects 메뉴에서 프로젝트를 선택합니다.

    3. 사이드바에서 Services 제목 아래의 Stream Processing를 클릭합니다.

      스트림 처리 페이지가 표시됩니다.

  2. Create a workspace를 클릭합니다.

  3. Create a stream processing instance 페이지에서 다음과 같이 인스턴스를 구성합니다.

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. Create를 클릭합니다.

2

연결 레지스트리에 기존의 빈 Atlas cluster 에 대한 연결을 추가합니다. 스트림 프로세서는 이 연결을 스트리밍 데이터 싱크로 사용합니다.

  1. Atlas Stream Processing 인스턴스의 창에서 Configure 을(를) 클릭합니다.

  2. Connection Registry 탭에서 오른쪽 상단의 + Add Connection 을 클릭합니다.

  3. Connection Type 드롭다운 목록에서 Atlas Database을 클릭합니다.

  4. Connection Name 필드에 mongodb1를 입력합니다.

  5. Atlas Cluster 드롭다운 목록에서 데이터가 저장되지 않은 Atlas cluster 선택합니다.

  6. Execute as 드롭다운 목록에서 Read and write to any database을 선택합니다.

  7. Add connection를 클릭합니다.

3

스트림 처리 인스턴스는 sample_stream_solar라는 샘플 데이터 소스에 대한 연결로 미리 구성되어 제공됩니다. 이 소스는 다양한 태양광 발전 장치에서 보고서 스트림을 생성합니다. 각 보고서는 특정 시점에서 단일 태양광 장치의 관찰된 전력량과 온도, 해당 장치의 최대 전력량을 설명합니다.

다음 문서 이 데이터 소스 의 보고서를 나타냅니다.

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
}
}

이 소스가 메시지를 내보내는지 확인하려면 을 사용하여 대화형으로 스트림 프로세서를 생성합니다.mongosh

  1. Atlas Stream Processing 인스턴스에 연결합니다.

    Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh을(를) 사용하여 연결합니다.

    1. Atlas Stream Processing 인스턴스의 창에서 Connect 을(를) 클릭합니다.

    2. Connect to your instance 대화 상자에서 Shell 탭 선택합니다.

    3. 대화 상자에 표시된 연결 문자열 복사합니다. 형식은 다음과 같으며, 여기서 <atlas-stream-processing-url> 은 스트림 처리 인스턴스 의 URL 이고 <username>atlasAdmin 역할 가진 데이터베이스 사용자의 사용자 이름 입니다.

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 연결 문자열 터미널에 붙여넣고 <password> 자리 표시자를 사용자의 자격 증명 으로 바꿉니다.

      Enter 키를 눌러 실행 하고 Stream Processing 인스턴스 에 연결합니다.

  2. mongosh 프롬프트에서 sp.process() 메서드를 사용하여 대화형으로 스트림 프로세서를 생성합니다.

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    sample_stream_solar 연결의 데이터가 콘솔에 표시되는지 확인하고 프로세스를 종료합니다.

    sp.process() 으)로 생성한 스트림 프로세서는 종료한 후에는 유지되지 않습니다.

4

영구 스트림 프로세서는 프로세서를 삭제할 때까지 스트리밍 데이터를 지속적으로 수집, 처리 및 지정된 데이터 싱크에 씁니다. 다음 스트림 프로세서는 초 간격으로 각 태양광 발전 장치의 최대 온도와 평균, 최대, 최소 와트수를 10파생한 다음 결과를 연결된 빈 클러스터 에 쓰는 집계 파이프라인 입니다.

Atlas UI 또는 mongosh:를 사용하여 스트림 프로세서를 생성하려면 다음 탭 중 하나를 선택합니다.

Atlas UI 에서 스트림 프로세서를 만들려면 Atlas 프로젝트 의 Stream Processing 페이지로 고 (Go) Stream Processing 인스턴스 창에서 Configure 를 클릭합니다. 그런 다음 시각적 빌더 또는 JSON 편집기 중 하나를 사용하여 solarDemo라는 스트림 프로세서를 구성합니다.

  1. Create with visual builder를 클릭합니다.

    스트림 프로세서를 구성할 수 있는 양식이 포함된 Visual Builder가 열립니다.

  2. Stream processor name 필드에 solarDemo를 입력합니다.

  3. Source 필드 의 Connection 드롭다운 목록에서 sample_stream_solar 를 선택합니다.

    이렇게 하면 집계 파이프라인에 다음 $source 단계가 추가됩니다.

    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    }
  4. $tumblingWindow 단계를 구성합니다.

    Start building your pipeline 창에서 + Custom stage 을 클릭하고 다음 JSON 복사하여 표시되는 텍스트 상자에 붙여넣습니다. 이는 $tumblingWindow 중첩된 $group 단계가 있는 단계를 정의하여 10초 간격으로 각 태양광 발전 장치의 최대 온도와 최대, 최소, 평균 와트수를 파생합니다.

    예시 $group 단계에서 max_watts 값을 계산할 때 이전 10 초 동안 특정 group_id 가 수집된 모든 문서의 obs.watts 값에서 최대값을 추출합니다.

    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [ {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }]
    }
    }
  5. Sink 필드 의 Connection 드롭다운 목록에서 mongodb1 를 선택합니다.

    표시되는 텍스트 상자에 다음 JSON 복사하여 붙여넣습니다. 이렇게 하면 처리된 스트리밍 데이터를 연결된 Atlas cluster 의 solarDb 데이터베이스 에 있는 solarColl 컬렉션 에 쓰는 $merge 단계가 구성됩니다.

    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
  6. Create stream processor를 클릭합니다.

    스트림 프로세서가 생성되어 Stream Processing 페이지의 Stream Processors 탭 에 나열됩니다.

  1. Use JSON editor를 클릭합니다.

    JSON 편집기가 열리고 스트림 프로세서를 JSON 형식으로 구성할 수 있는 텍스트 상자가 표시됩니다.

  2. 스트림 프로세서를 정의합니다.

    다음 JSON 정의를 복사하여 JSON 편집기 텍스트 상자에 붙여넣어 solarDemo라는 스트림 프로세서를 정의합니다. 이 스트림 프로세서는 중첩된$group단계가 있는$tumblingWindow단계를 사용하여 10초 간격으로 각 태양광 발전 장치의 최대 온도와 최대, 최소, 평균 와트수를 파생한 다음 결과를 다음 컬렉션 에 씁니다. 연결된 Atlas cluster 의 solarDb 데이터베이스 에 있는 solarColl.

    예시 $group 단계에서 max_watts 값을 계산할 때 이전 10 초 동안 특정 group_id 가 수집된 모든 문서의 obs.watts 값에서 최대값을 추출합니다.

    {
    "name": "solarDemo",
    "pipeline": [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
    ]
    }
    [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "avg_watts": {
    "$avg": "$obs.watts"
    },
    "max_temp": {
    "$avg": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "coll": "solarColl",
    "connectionName": "mongodb1",
    "db": "solarDb"
    }
    }
    }
    ]

mongosh 에서 다음 명령을 실행하여 solarDemo라는 영구 스트림 프로세서를 생성합니다.

  1. Atlas Stream Processing 인스턴스에 연결합니다.

    Atlas Stream Processing 인스턴스와 연결된 연결 string 을 사용하여 mongosh을(를) 사용하여 연결합니다.

    1. Atlas Stream Processing 인스턴스의 창에서 Connect 을(를) 클릭합니다.

    2. Connect to your instance 대화 상자에서 Shell 탭 선택합니다.

    3. 대화 상자에 표시된 연결 문자열 복사합니다. 형식은 다음과 같으며, 여기서 <atlas-stream-processing-url> 은 스트림 처리 인스턴스 의 URL 이고 <username>atlasAdmin 역할 가진 데이터베이스 사용자의 사용자 이름 입니다.

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 연결 문자열 터미널에 붙여넣고 <password> 자리 표시자를 사용자의 자격 증명 으로 바꿉니다.

      Enter 키를 눌러 실행 하고 Stream Processing 인스턴스 에 연결합니다.

  2. $source 단계를 구성합니다.

    sample_stream_solar 소스에서 데이터를 수집하는 $source 단계에 대한 변수를 정의합니다.

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  3. $group 단계를 구성합니다.

    group_id에 따라 각 태양광 발전 장치의 최대 온도와 평균, 최대 및 최소 와트수를 파생하는 $group 단계에 대한 변수를 정의합니다.

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $max: "$obs.temp"
    },
    avg_watts: {
    $avg: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  4. $tumblingWindow 단계를 구성합니다.

    스트리밍 데이터에 대해 $group 와 같은 축적을 수행하기 위해 Atlas Stream Processing은 Windows 를 사용하여 데이터 세트를 바인딩합니다. 스트림 연속적인 10초 간격으로 분리하는 $tumblingWindow 단계에 대한 변수를 정의합니다.

    예시 $group 단계에서 max_watts 값을 계산할 때 이전 10 초 동안 특정 group_id 가 수집된 모든 문서의 obs.watts 값에서 최대값을 추출합니다.

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  5. $merge 단계를 구성합니다.

    연결된 Atlas cluster 의 solarDb 데이터베이스 에 있는 solarColl 컬렉션 에 처리된 스트리밍 데이터를 쓰는 $merge 단계에 대한 변수를 정의합니다.

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  6. 스트림 프로세서를 생성합니다.

    sp.createStreamProcessor() 메서드를 사용하여 새 스트림 프로세서에 이름을 할당하고 해당 집계 파이프라인 선언합니다. $group 단계는 $tumblingWindow의 중첩된 파이프라인 에 속하며, 프로세서 파이프라인 정의에 포함해서는 안 됩니다.

    sp.createStreamProcessor("solarDemo", [s, t, m])

    이는 solarDemo라는 스트림 프로세서를 생성하여 이전에 정의한 쿼리를 적용하고 처리된 데이터를 연결된 클러스터의 solarDb 데이터베이스의 solarColl 컬렉션에 기록합니다. 이 프로세서는 태양광 장치로부터의 관측 데이터를 10초 간격으로 나누어 도출한 다양한 측정값을 반환합니다.

    Atlas Stream Processing 미사용 데이터 데이터베이스에 쓰는 방법에 대해 자세히 학습하려면 $merge (스트림 처리)를 참조하세요.

5

스트림 처리 인스턴스 의 스트림 프로세서 목록에서 스트림 프로세서의 Start 아이콘을 클릭합니다.

sp.processor.start() 에서 메서드를 mongosh 사용합니다.

sp.solarDemo.start()
6

스트림 프로세서가 Atlas 클러스터에 데이터를 쓰고 있는지 확인하려면 다음을 수행하세요.

  1. Atlas에서 프로젝트의 Clusters 페이지로 이동합니다.

    경고: 탐색 기능 개선 작업 진행 중

    현재 새롭고 향상된 탐색 환경을 출시하고 있습니다. 다음 단계가 Atlas UI 의 보기와 일치하지 않는 경우 미리 보기 설명서를 참조하세요.

    1. 아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 원하는 프로젝트가 포함된 조직을 선택합니다.

    2. 아직 표시되지 않은 경우 탐색 표시줄의 Projects 메뉴에서 원하는 프로젝트를 선택합니다.

    3. 아직 표시되지 않은 경우 사이드바에서 Clusters를 클릭합니다.

      Clusters(클러스터) 페이지가 표시됩니다.

  2. cluster의 Browse Collections 버튼을 클릭합니다.

    데이터 탐색기 가 표시됩니다.

  3. MySolar 컬렉션을 확인하세요.

프로세서가 활성 상태인지 mongosh 확인하려면: sp.processor.stats()에서 메서드를 사용합니다.

sp.solarDemo.stats()

이 메서드는 solarDemo 스트림 프로세서의 작동 통계를 보고합니다.

의 메서드를 사용하여 터미널에서 처리된 sp.processor.sample() 문서의 샘플링을 반환할 수도 있습니다.mongosh

sp.solarDemo.sample()
{
_id: 10,
max_temp: 16,
avg_watts: 232,
max_watts: 414,
min_watts: 73
}

참고

앞의 출력은 대표적인 예시 입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 고유한 문서를 볼 수 있습니다.

7

스트림 처리 인스턴스의 스트림 프로세서 목록에서 스트림 프로세서의 Delete () 아이콘을 클릭합니다.

표시되는 확인 대화 상자에서 스트림 프로세서(solarDemo)의 이름을 입력하여 삭제 것인지 확인한 다음 Delete을(를) 클릭합니다.

에서 메서드를 sp.processor.drop() 사용하여 mongosh 을(를) 삭제합니다.solarDemo

sp.solarDemo.drop()

solarDemo 삭제를 확인하려면 sp.listStreamProcessors() 메서드를 사용하여 사용 가능한 모든 스트림 프로세서를 나열합니다.

sp.listStreamProcessors()

방법 알아보기:

돌아가기

개요

이 페이지의 내용