Docs Menu
Docs Home
/ /

Atlas Stream Processing を使い始める

このチュートリアルでは、Atlas Stream Processing を設定し、最初のストリーム プロセッサを実行する手順について説明します。

Atlas の サンプル データ セット からの映画データを含むコレクションを使用します。

  • 空のクラスターを持つ Atlasプロジェクト。このクラスターは、ストリーム プロセッサのデータ シンクとして機能します。

  • ストリーム プロセッサを作成および実行する、atlasAdmin ロールを持つデータベースユーザー

  • mongosh バージョン2.0以上

  • Atlas userで、ストリーム プロセシング インスタンスと接続レジストリを管理するためのProject OwnerまたはProject Stream Processing Ownerロールを持つ Atlas ユーザー

    注意

    Project Ownerロールでは、データベース配置の作成、プロジェクト アクセスとプロジェクト設定の管理、IP アクセス リスト エントリの管理などを行うことができます。

    Project Stream Processing Ownerロールにより、ストリーム プロセシング インスタンスの表示、作成、削除、編集などの Atlas Stream Processing アクションや、接続レジストリ内の接続の表示、追加、変更、削除などの Atlas Stream Processing アクションが可能になります。

    2 つのロールの違いの詳細については、「プロジェクト ロール」を参照してください。

このチュートリアルでは、 Atlas Stream Processingインスタンスの作成、既存の Atlas クラスターへの接続、およびストリーム プロセッサを設定して、ソートストリーミングデバイスからサンプルデータを取り込み、接続されたクラスターにデータを書込む方法について説明します。

1
  1. Atlasで、プロジェクトのGo Stream Processing{0 ページに します。

    1. まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。

    2. まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。

    3. サイドバーで、 Streaming Data見出しの下のStream Processingをクリックします。

      Atlas Stream Processingページが表示されます。

  2. [Create a workspace] をクリックします。

  3. Create a stream processing instanceページで、インスタンスを次のように構成します。

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. [Create] をクリックします。

2

既存の空の Atlas クラスターへの接続を接続レジストリに追加します。ストリーム プロセッサはこの接続をストリーミングデータ シンクとして使用します。

  1. Atlas Stream Processing インスタンスの ペインで、 Configureをクリックします。

  2. [ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。

  3. Connection Type ドロップダウンリストから、Atlas Database をクリックします。

  4. Connection Nameフィールドにmongodb1と入力します。

  5. Atlas Cluster ドロップダウン リストから、データが保存されていない Atlas クラスターを選択します。

  6. Execute as ドロップダウンリストから、Read and write to any database を選択します。

  7. [Add connection] をクリックします。

3

ストリーム プロセシング インスタンスには、 sample_stream_solarというサンプル データソースへの接続が事前に構成されています。 このソースは、さまざまな ソート デバイスからのレポートのストリームを生成します。 各レポートには、特定の時点における単一のソート デバイスの測定値と温度、およびそのデバイスの最大出力サイズが記載されています。

次のドキュメントは、このデータソースからのレポートを表します。

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
}
}

このソースが メッセージを発行することを確認するには、 を使用してストリーム プロセッサを対話的に作成します。mongosh

  1. Atlas Stream Processing インスタンスに接続します。

    Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、 mongosh を使用して接続します。

    1. Atlas Stream Processing インスタンスの ペインで、 Connectをクリックします。

    2. Connect to your instance ダイアログで、Shellタブを選択します。

    3. ダイアログに表示される接続文字列をコピーします。 形式は次ので、<atlas-stream-processing-url> は Atlas Stream ProcessingインスタンスのURL 、<username>atlasAdmin ロールを持つデータベースユーザーのユーザー名です。

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 接続文字列をターミナルに貼り付け、<password> プレースホルダーをユーザーの認証情報に置き換えます。

      Enter キーを押して実行し、Stream Processingインスタンスに接続します。

  2. mongosh プロンプトで、sp.process() メソッドを使用してストリーム プロセッサを対話的に作成します。

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    sample_stream_solar接続からのデータがコンソールに表示されていることを確認し、プロセスを終了します。

    sp.process()を使用して作成したストリーム プロセッサは、終了後に永続することはありません。

4

永続的なストリーム プロセッサは、プロセッサを削除するまで、指定されたデータ シンクにストリーミングデータを継続的に取り込み、処理し、書き込みます。次のストリーム プロセッサは、 秒間隔で各ソート10 デバイスの最大温度と平均、最大、最小出力を出力し、その結果を接続された空のクラスターに書込む集計パイプラインです。

次のいずれかのタブを選択して、Atlas UIまたは mongosh: を使用してストリーム プロセッサを作成します。

Atlas UIでストリーム プロセッサを作成するには、Atlasプロジェクトの Stream Processing ページにGo、Stream Processingインスタンスの ペインで [Configure] をクリックします。 次に、ビジュアル ビルダまたはJSONエディターのどちらかを使用して、solarDemo という名前のストリーム プロセッサを構成します。

  1. [Create with visual builder] をクリックします。

    ビジュアル ビルダには、ストリーム プロセッサを設定できるフォームが表示されます。

  2. Stream processor nameフィールドにsolarDemoと入力します。

  3. Sourceフィールドで、Connection ドロップダウンリストから sample_stream_solar を選択します。

    これにより、次の $source ステージが集計パイプラインに追加されます。

    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    }
  4. $tumblingWindowステージを構成します。

    Start building your pipeline ペインで + Custom stage をクリックし、次の JSONをコピーして、表示されるテキストボックスに貼り付けます。これにより、ネストされた$tumblingWindow $groupステージを持つ ステージが定義され、 秒間隔で各ソート デバイスの最大温度と最大書込み保証(write concern)の最大書込み保証(write10 concern)を実現します。

    つまり、例、$group ステージが max_watts の 値を計算する場合、過去 10 秒に特定の group_id が取り込まれたすべてのドキュメントの obs.watts 値から最大値が抽出されます。

    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [ {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }]
    }
    }
  5. Sinkフィールドで、Connection ドロップダウンリストから mongodb1 を選択します。

    表示されるテキストボックスに、次のJSONをコピーして貼り付けます。これにより、処理されたストリーミングデータを接続済みの Atlas クラスターの solarDbデータベース内の solarColl という名前のコレクションに書き込む$mergeステージが構成されます。

    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
  6. [Create stream processor] をクリックします。

    ストリーム プロセッサが作成され、Stream Processing ページの Stream Processorsタブに表示されます。

  1. [Use JSON editor] をクリックします。

    JSONエディターが開き、ストリーム プロセッサをJSON形式で構成できるテキストボックスが表示されます。

  2. ストリーム プロセッサを定義します。

    次のJSON定義をコピーしてJSONエディターのテキストボックスに貼り付け、solarDemo という名前のストリーム プロセッサを定義します。このストリーム プロセッサは、ネストされた$tumblingWindow $groupステージを持つ10 ステージを使用して、 秒間隔で各ソート デバイスの最大値、最小値、平均出力値を求め、その結果をという名前のコレクションに書き込みます。接続されたsolarColl solarDbAtlas クラスターの データベース内の

    つまり、例、$group ステージが max_watts の 値を計算する場合、過去 10 秒に特定の group_id が取り込まれたすべてのドキュメントの obs.watts 値から最大値が抽出されます。

    {
    "name": "solarDemo",
    "pipeline": [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
    ]
    }
    [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "avg_watts": {
    "$avg": "$obs.watts"
    },
    "max_temp": {
    "$avg": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "coll": "solarColl",
    "connectionName": "mongodb1",
    "db": "solarDb"
    }
    }
    }
    ]

mongosh で次のコマンドを実行して、solarDemo という名前の永続ストリーム プロセッサを作成します。

  1. Atlas Stream Processing インスタンスに接続します。

    Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、 mongosh を使用して接続します。

    1. Atlas Stream Processing インスタンスの ペインで、 Connectをクリックします。

    2. Connect to your instance ダイアログで、Shellタブを選択します。

    3. ダイアログに表示される接続文字列をコピーします。 形式は次ので、<atlas-stream-processing-url> は Atlas Stream ProcessingインスタンスのURL 、<username>atlasAdmin ロールを持つデータベースユーザーのユーザー名です。

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 接続文字列をターミナルに貼り付け、<password> プレースホルダーをユーザーの認証情報に置き換えます。

      Enter キーを押して実行し、Stream Processingインスタンスに接続します。

  2. $sourceステージを構成します。

    sample_stream_solar ソースからデータを取り込む $source ステージの変数を定義します。

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  3. $groupステージを構成します。

    $group ステージの変数を定義し、group_id に従って最大温度と各ソート デバイスの平均、最大、最小出力を出力します。

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $max: "$obs.temp"
    },
    avg_watts: {
    $avg: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  4. $tumblingWindowステージを構成します。

    ストリーミングデータで $group などのアキュムレーションを実行するために、Atlas Stream Processing はWindowsを使用してデータセットをバインドします。ストリームを連続する 10 秒間隔に分割する $tumblingWindow ステージの変数を定義します。

    つまり、例、$group ステージが max_watts の 値を計算する場合、過去 10 秒に特定の group_id が取り込まれたすべてのドキュメントの obs.watts 値から最大値が抽出されます。

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  5. $mergeステージを構成します。

    接続された Atlas クラスターの solarDbデータベース内の solarColl という名前のコレクションに処理されたストリーミングデータを書込む $merge ステージの変数を定義します。

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  6. ストリーム プロセッサを作成します。

    sp.createStreamProcessor() メソッドを使用して、新しいストリーム プロセッサに名前を割り当て、その集計パイプラインを宣言します。$group ステージは $tumblingWindow のネストされたパイプラインに属しており、プロセッサパイプライン定義にこれを含めることはできません。

    sp.createStreamProcessor("solarDemo", [s, t, m])

    これにより、以前に定義されたクエリを適用し、接続したクラスター上のsolarDbデータベースのsolarCollコレクションに処理されたデータを書込むsolarDemoという名前のストリーム プロセッサが作成されます。 ソート デバイスからの10秒間隔の観察から派生したさまざまな測定値を返します。

    Atlas Stream Processing が保管中のデータベースに書き込む方法の詳細については、$merge(ストリーム プロセシング) を参照してください。

5

ストリーム プロセシングインスタンスのストリーム プロセッサのリストで、ストリーム プロセッサの Start アイコンをクリックします。

sp.processor.start()mongoshで メソッドを使用する:

sp.solarDemo.start()
6

ストリーム プロセッサが Atlas クラスターにデータを書き込んだことを確認するには、次の手順を実行します。

  1. Atlas で、プロジェクトの [Clusters] ページに移動します。

    1. まだ表示されていない場合は、希望するプロジェクトを含む組織を選択しますナビゲーション バーのOrganizationsメニュー

    2. まだ表示されていない場合は、ナビゲーション バーのProjectsメニューから目的のプロジェクトを選択します。

    3. サイドバーで、 Database見出しの下のClustersをクリックします。

      [ Clusters (クラスター) ] ページが表示されます。

  2. Atlas で、プロジェクトの [Data Explorer] ページに移動します。

    1. まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。

    2. まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。

    3. サイドバーで、 Database見出しの下のData Explorerをクリックします。

      Data Explorerが表示されます。

    注意

    また、Clusters ページに移動し、Shortcuts 見出しの下の Data Explorer をクリックします。

  3. MySolarコレクションを表示します。

プロセッサがアクティブであることを確認するには、: のsp.processor.stats() mongoshメソッドを使用します。

sp.solarDemo.stats()

このメソッドは、solarDemoストリーム プロセッサの運用統計を報告します。

また、sp.processor.sample()mongosh メソッドを使用して、ターミナルに処理されたドキュメントのサンプルを返すこともできます。

sp.solarDemo.sample()
{
_id: 10,
max_temp: 16,
avg_watts: 232,
max_watts: 414,
min_watts: 73
}

注意

前述の出力は、一般的な の例です。ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

7

ストリーム プロセシングインスタンスのストリーム プロセッサのリストで、ストリーム プロセッサの Delete)アイコンをクリックします。

表示される確認ダイアログで、ストリーム プロセッサの名前(solarDemo)を入力して削除することを確認し、[Delete] をクリックします。

sp.processor.drop()mongoshsolarDemoを削除するには、 の メソッドを使用します。

sp.solarDemo.drop()

solarDemo を削除したことを確認するには、sp.listStreamProcessors() メソッドを使用して使用可能なすべてのストリーム プロセッサを一覧表示します。

sp.listStreamProcessors()

次の方法を学習します:

戻る

Overview