Docs Menu
Docs Home
/ /

Atlas Stream Processing を使い始める

このチュートリアルでは、Atlas Stream Processing を設定し、最初のストリーム プロセッサを実行する手順について説明します。

Atlas の サンプル データ セット からの映画データを含むコレクションを使用します。

  • 空のクラスターを持つ Atlasプロジェクト。このクラスターは、ストリーム プロセッサのデータ シンクとして機能します。

  • ストリーム プロセッサを作成および実行する、atlasAdmin ロールを持つデータベースユーザー

  • An Atlas user with the Project Owner or the Project Stream Processing Owner role to manage a Stream Processing Workspace and Connection Registry

    注意

    Project Ownerロールでは、データベース配置の作成、プロジェクト アクセスとプロジェクト設定の管理、IP アクセス リスト エントリの管理などを行うことができます。

    The Project Stream Processing Owner role enables Atlas Stream Processing actions such as viewing, creating, deleting, and editing stream processing workspaces, and viewing, adding, modifying, and deleting connections in the connection registry.

    2 つのロールの違いの詳細については、「プロジェクト ロール」を参照してください。

This tutorial guides you through creating an stream processing workspace, connecting it to an existing Atlas cluster, and setting up a stream processor to ingest sample data from solar streaming devices and write the data to your connected cluster.

1
  1. Atlasで、プロジェクトのGo Stream Processing{0 ページに します。

    1. まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。

    2. まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。

    3. サイドバーで、 Streaming Data見出しの下のStream Processingをクリックします。

      Stream Processing ページが表示されます。

  2. [Create a workspace] をクリックします。

  3. On the Create a stream processing workspace page, configure your workspace as follows:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Workspace Name: tutorialWorkspace

  4. [Create] をクリックします。

2

既存の空の Atlas クラスターへの接続を接続レジストリに追加します。ストリーム プロセッサはこの接続をストリーミングデータ シンクとして使用します。

  1. Atlas ストリーム処理ワークスペースの ペインで、Configure をクリックします。

  2. [ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。

  3. Connection Type ドロップダウンリストから、Atlas Database をクリックします。

  4. Connection Nameフィールドにmongodb1と入力します。

  5. Atlas Cluster ドロップダウン リストから、データが保存されていない Atlas クラスターを選択します。

  6. Execute as ドロップダウンリストから、Read and write to any database を選択します。

  7. [Add connection] をクリックします。

3

Your stream processing workspace comes preconfigured with a connection to a sample data source called sample_stream_solar. This source generates a stream of reports from various solar power devices. Each report describes the observed wattage and temperature of a single solar device at a specific point in time, as well as that device's maximum wattage.

次のドキュメントは、このデータソースからのレポートを表します。

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
}
}

このソースが メッセージを発行することを確認するには、 を使用してストリーム プロセッサを対話的に作成します。mongosh

  1. ストリーム処理ワークスペースに接続します。

    Atlas ストリーム処理ワークスペースに関連付けられている接続文字列を使用して接続し、mongosh を使用して接続します。

    1. Atlas ストリーム処理ワークスペースの ペインで、Connect をクリックします。

    2. In the workspace connection dialog, click Choose a connection method, then select the Shell tab.

    3. ダイアログに表示される接続文字列をコピーします。形式は次の<atlas-stream-processing-url> で、<username> はストリーム処理ワークスペースのURL 、atlasAdmin は ロールを持つデータベースユーザーのユーザー名です。

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 接続文字列をターミナルに貼り付け、<password> プレースホルダーをユーザーの認証情報に置き換えます。

      Press Enter to run it and connect to your stream processing workspace.

  2. mongosh プロンプトで、sp.process() メソッドを使用してストリーム プロセッサを対話的に作成します。

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    sample_stream_solar接続からのデータがコンソールに表示されていることを確認し、プロセスを終了します。

    sp.process()を使用して作成したストリーム プロセッサは、終了後に永続することはありません。

4

永続的なストリーム プロセッサは、プロセッサを削除するまで、指定されたデータ シンクにストリーミングデータを継続的に取り込み、処理し、書き込みます。次のストリーム プロセッサは、 秒間隔で各ソート10 デバイスの最大温度と平均、最大、最小出力を出力し、その結果を接続された空のクラスターに書込む集計パイプラインです。

次のいずれかのタブを選択して、Atlas UIまたは mongosh: を使用してストリーム プロセッサを作成します。

To create a stream processor in the Atlas UI, go to the Stream Processing page for your Atlas project and click Configure in the pane for your stream processing workspace. Then choose between using the visual builder or the JSON editor to configure a stream processor named solarDemo:

  1. [Create with visual builder] をクリックします。

    ビジュアル ビルダには、ストリーム プロセッサを設定できるフォームが表示されます。

  2. Stream processor nameフィールドにsolarDemoと入力します。

  3. Sourceフィールドで、Connection ドロップダウンリストから sample_stream_solar を選択します。

    これにより、次の $source ステージが集計パイプラインに追加されます。

    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    }
  4. $tumblingWindowステージを構成します。

    Start building your pipeline ペインで + Custom stage をクリックし、次の JSONをコピーして、表示されるテキストボックスに貼り付けます。これにより、ネストされた$tumblingWindow $groupステージを持つ ステージが定義され、 秒間隔で各ソート デバイスの最大温度と最大書込み保証(write concern)の最大書込み保証(write10 concern)を実現します。

    つまり、例、$group ステージが max_watts の 値を計算する場合、過去 10 秒に特定の group_id が取り込まれたすべてのドキュメントの obs.watts 値から最大値が抽出されます。

    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [ {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }]
    }
    }
  5. Sinkフィールドで、Connection ドロップダウンリストから mongodb1 を選択します。

    表示されるテキストボックスに、次のJSONをコピーして貼り付けます。これにより、処理されたストリーミングデータを接続済みの Atlas クラスターの solarDbデータベース内の solarColl という名前のコレクションに書き込む$mergeステージが構成されます。

    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
  6. [Create stream processor] をクリックします。

    ストリーム プロセッサが作成され、Stream Processing ページの Stream Processorsタブに表示されます。

  1. [Use JSON editor] をクリックします。

    JSONエディターが開き、ストリーム プロセッサをJSON形式で構成できるテキストボックスが表示されます。

  2. ストリーム プロセッサを定義します。

    次のJSON定義をコピーしてJSONエディターのテキストボックスに貼り付け、solarDemo という名前のストリーム プロセッサを定義します。このストリーム プロセッサは、ネストされた$tumblingWindow $groupステージを持つ10 ステージを使用して、 秒間隔で各ソート デバイスの最大値、最小値、平均出力値を求め、その結果をという名前のコレクションに書き込みます。接続されたsolarColl solarDbAtlas クラスターの データベース内の

    つまり、例、$group ステージが max_watts の 値を計算する場合、過去 10 秒に特定の group_id が取り込まれたすべてのドキュメントの obs.watts 値から最大値が抽出されます。

    {
    "name": "solarDemo",
    "pipeline": [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
    ]
    }
    [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "avg_watts": {
    "$avg": "$obs.watts"
    },
    "max_temp": {
    "$avg": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "coll": "solarColl",
    "connectionName": "mongodb1",
    "db": "solarDb"
    }
    }
    }
    ]

mongosh で次のコマンドを実行して、solarDemo という名前の永続ストリーム プロセッサを作成します。

  1. ストリーム処理ワークスペースに接続します。

    Atlas ストリーム処理ワークスペースに関連付けられている接続文字列を使用して接続し、mongosh を使用して接続します。

    1. Atlas ストリーム処理ワークスペースの ペインで、Connect をクリックします。

    2. Connect to your workspace ダイアログで、Shellタブを選択します。

    3. ダイアログに表示される接続文字列をコピーします。形式は次の<atlas-stream-processing-url> で、<username> はストリーム処理ワークスペースのURL 、atlasAdmin は ロールを持つデータベースユーザーのユーザー名です。

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 接続文字列をターミナルに貼り付け、<password> プレースホルダーをユーザーの認証情報に置き換えます。

      Press Enter to run it and connect to your stream processing workspace.

  2. $sourceステージを構成します。

    sample_stream_solar ソースからデータを取り込む $source ステージの変数を定義します。

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  3. $groupステージを構成します。

    $group ステージの変数を定義し、group_id に従って最大温度と各ソート デバイスの平均、最大、最小出力を出力します。

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $max: "$obs.temp"
    },
    avg_watts: {
    $avg: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  4. $tumblingWindowステージを構成します。

    ストリーミングデータで $group などのアキュムレーションを実行するために、Atlas Stream Processing はWindowsを使用してデータセットをバインドします。ストリームを連続する 10 秒間隔に分割する $tumblingWindow ステージの変数を定義します。

    つまり、例、$group ステージが max_watts の 値を計算する場合、過去 10 秒に特定の group_id が取り込まれたすべてのドキュメントの obs.watts 値から最大値が抽出されます。

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  5. $mergeステージを構成します。

    接続された Atlas クラスターの solarDbデータベース内の solarColl という名前のコレクションに処理されたストリーミングデータを書込む $merge ステージの変数を定義します。

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  6. ストリーム プロセッサを作成します。

    sp.createStreamProcessor() メソッドを使用して、新しいストリーム プロセッサに名前を割り当て、その集計パイプラインを宣言します。$group ステージは $tumblingWindow のネストされたパイプラインに属しており、プロセッサパイプライン定義にこれを含めることはできません。

    sp.createStreamProcessor("solarDemo", [s, t, m])

    これにより、以前に定義されたクエリを適用し、接続したクラスター上のsolarDbデータベースのsolarCollコレクションに処理されたデータを書込むsolarDemoという名前のストリーム プロセッサが作成されます。 ソート デバイスからの10秒間隔の観察から派生したさまざまな測定値を返します。

    Atlas Stream Processing が保管中のデータベースに書き込む方法の詳細については、$merge(ストリーム プロセシング) を参照してください。

5

In the list of stream processors for your stream processing workspace, click the Start icon for your stream processor.

sp.processor.start()mongoshで メソッドを使用する:

sp.solarDemo.start()
6

ストリーム プロセッサが Atlas クラスターにデータを書き込んだことを確認するには、次の手順を実行します。

  1. Atlas で、プロジェクトの [Clusters] ページに移動します。

    1. まだ表示されていない場合は、希望するプロジェクトを含む組織を選択しますナビゲーション バーのOrganizationsメニュー

    2. まだ表示されていない場合は、ナビゲーション バーのProjectsメニューから目的のプロジェクトを選択します。

    3. サイドバーで、 Database見出しの下のClustersをクリックします。

      [ Clusters (クラスター) ] ページが表示されます。

  2. Atlas で、プロジェクトの [Data Explorer] ページに移動します。

    1. まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。

    2. まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。

    3. サイドバーで、 Database見出しの下のData Explorerをクリックします。

      Data Explorerが表示されます。

    注意

    また、Clusters ページに移動し、Shortcuts 見出しの下の Data Explorer をクリックします。

  3. MySolarコレクションを表示します。

プロセッサがアクティブであることを確認するには、: のsp.processor.stats() mongoshメソッドを使用します。

sp.solarDemo.stats()

このメソッドは、solarDemoストリーム プロセッサの運用統計を報告します。

また、sp.processor.sample()mongosh メソッドを使用して、ターミナルに処理されたドキュメントのサンプルを返すこともできます。

sp.solarDemo.sample()
{
_id: 10,
max_temp: 16,
avg_watts: 232,
max_watts: 414,
min_watts: 73
}

注意

前述の出力は、一般的な の例です。ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

7

In the list of stream processors for your stream processing workspace, click the Delete () icon for your stream processor.

表示される確認ダイアログで、ストリーム プロセッサの名前(solarDemo)を入力して削除することを確認し、[Delete] をクリックします。

sp.processor.drop()mongoshsolarDemoを削除するには、 の メソッドを使用します。

sp.solarDemo.drop()

solarDemo を削除したことを確認するには、sp.listStreamProcessors() メソッドを使用して使用可能なすべてのストリーム プロセッサを一覧表示します。

sp.listStreamProcessors()

次の方法を学習します:

戻る

Overview