Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs Menu
Docs Home
/
Atlas
/

Atlas Stream Processing を使い始める

このチュートリアルでは、Atlas Stream Processing を設定し、最初のストリーム プロセッサを実行する手順について説明します。

Atlas の サンプル データ セット からの映画データを含むコレクションを使用します。

  • 空のクラスターを持つ Atlasプロジェクト。このクラスターは、ストリーム プロセッサのデータ シンクとして機能します。

  • ストリーム プロセッサを作成および実行する、atlasAdmin ロールを持つデータベースユーザー

  • mongosh バージョン2.0以上

  • Atlas userで、ストリーム プロセシング インスタンスと接続レジストリを管理するためのProject OwnerまたはProject Stream Processing Ownerロールを持つ Atlas ユーザー

    注意

    Project Ownerロールでは、データベース配置の作成、プロジェクト アクセスとプロジェクト設定の管理、IP アクセス リスト エントリの管理などを行うことができます。

    Project Stream Processing Ownerロールにより、ストリーム プロセシング インスタンスの表示、作成、削除、編集などの Atlas Stream Processing アクションや、接続レジストリ内の接続の表示、追加、変更、削除などの Atlas Stream Processing アクションが可能になります。

    2 つのロールの違いの詳細については、「プロジェクト ロール」を参照してください。

このチュートリアルでは、ストリーム処理インスタンスの作成、既存の Atlas クラスターへの接続、およびソートストリーミングデバイスからサンプルデータを取り込んで 、接続されたクラスターにデータを書込むようにストリーム プロセッサを設定する方法について説明します。

1
  1. Atlasで、プロジェクトのGo Stream Processing{0 ページに します。

    警告: ナビゲーションの改善が進行中

    現在、新しく改善されたナビゲーション エクスペリエンスを展開しています。次の手順が Atlas UIのビューと一致しない場合は、プレビュー ドキュメントを参照してください。

    1. まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。

    2. まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。

    3. サイドバーで、 Services見出しの下のStream Processingをクリックします。

      Atlas Stream Processingページが表示されます。

  2. [Create a workspace] をクリックします。

  3. Create a stream processing instanceページで、インスタンスを次のように構成します。

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. [Create] をクリックします。

2

既存の空の Atlas クラスターへの接続を接続レジストリに追加します。ストリーム プロセッサはこの接続をストリーミングデータ シンクとして使用します。

  1. Atlas Stream Processing インスタンスの ペインで、 Configureをクリックします。

  2. [ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。

  3. Connection Type ドロップダウンリストから、Atlas Database をクリックします。

  4. Connection Nameフィールドにmongodb1と入力します。

  5. Atlas Cluster ドロップダウン リストから、データが保存されていない Atlas クラスターを選択します。

  6. Execute as ドロップダウンリストから、Read and write to any database を選択します。

  7. [Add connection] をクリックします。

3

ストリーム プロセシング インスタンスには、 sample_stream_solarというサンプル データソースへの接続が事前に構成されています。 このソースは、さまざまな ソート デバイスからのレポートのストリームを生成します。 各レポートには、特定の時点における単一のソート デバイスの測定値と温度、およびそのデバイスの最大出力サイズが記載されています。

次のドキュメントは、このデータソースからのレポートを表します。

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
}
}

このソースが メッセージを発行することを確認するには、 を使用してストリーム プロセッサを対話的に作成します。mongosh

  1. Atlas Stream Processing インスタンスに接続します。

    Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、 mongosh を使用して接続します。

    1. Atlas Stream Processing インスタンスの ペインで、 Connectをクリックします。

    2. Connect to your instance ダイアログで、Shellタブを選択します。

    3. ダイアログに表示される接続文字列をコピーします。形式は次の<atlas-stream-processing-url> で、<username> はストリーム処理インスタンスのURL 、atlasAdmin は ロールを持つデータベースユーザーのユーザー名です。

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 接続文字列をターミナルに貼り付け、<password> プレースホルダーをユーザーの認証情報に置き換えます。

      Enter キーを押して実行し、ストリーム処理インスタンスに接続します。

  2. mongoshプロンプトで、 メソッドを使用してストリームsp.process() プロセッサを対話的に作成します。

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    sample_stream_solar接続からのデータがコンソールに表示されていることを確認し、プロセスを終了します。

    sp.process()を使用して作成したストリーム プロセッサは、終了後に永続することはありません。

4

永続的なストリーム プロセッサは、プロセッサを削除するまで、指定されたデータ シンクにストリーミングデータを継続的に取り込み、処理し、書き込みます。次のストリーム プロセッサは、 秒間隔で各ソート10 デバイスの最大温度と平均、最大、最小出力を出力し、その結果を接続された空のクラスターに書込む集計パイプラインです。

次のいずれかのタブを選択して、Atlas UIまたは を使用してストリームmongosh プロセッサを作成します。

Atlas UIでストリーム プロセッサを作成するには、Atlasプロジェクトの Stream Processing ページに移動し、ストリーム処理インスタンスの ペインで [Configure] をクリックします。次に、ビジュアル ビルダまたはJSONエディターのどちらかを使用して、solarDemo という名前のストリーム プロセッサを構成します。

  1. [Create with visual builder] をクリックします。

    ビジュアル ビルダには、ストリーム プロセッサを設定できるフォームが表示されます。

  2. Stream processor nameフィールドにsolarDemoと入力します。

  3. Sourceフィールドで、Connection ドロップダウンリストから sample_stream_solar を選択します。

    これにより、次の$source ステージが集計パイプラインに追加されます。

    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    }
  4. $tumblingWindowステージを構成します。

    Start building your pipelineペインで+ Custom stage をクリックし、次の JSONをコピーして、表示されるテキストボックスに貼り付けます。これにより、ネストされた ステージを持つ$tumblingWindow$group ステージが定義され、 秒間隔で各ソート デバイスの最大温度と最大書込み保証(write concern)の最大書込み保証(write10 concern)を実現します。

    つまり、例、$group ステージが max_watts の 値を計算する場合、過去 10 秒に特定の group_id が取り込まれたすべてのドキュメントの obs.watts 値から最大値が抽出されます。

    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [ {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }]
    }
    }
  5. Sinkフィールドで、Connection ドロップダウンリストから mongodb1 を選択します。

    表示されるテキストボックスに、次のJSONをコピーして貼り付けます。これにより、処理されたストリーミングデータを接続済みの Atlas クラスターの データベース内の という名前のコレクションに書き込む$merge ステージが構成されます。solarCollsolarDb

    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
  6. [Create stream processor] をクリックします。

    ストリーム プロセッサが作成され、Stream Processing ページの Stream Processorsタブに表示されます。

  1. [Use JSON editor] をクリックします。

    JSONエディターが開き、ストリーム プロセッサをJSON形式で構成できるテキストボックスが表示されます。

  2. ストリーム プロセッサを定義します。

    次のJSON定義をコピーしてJSONエディターのテキストボックスに貼り付け、 という名前のストリームsolarDemo プロセッサを定義します。このストリーム プロセッサは、ネストされた ステージを持つ ステージを使用して、$tumblingWindow $group秒間隔で各ソート デバイスの最大値、最小値、平均出力値を求め、その結果をという名前のコレクションに書き込みます。接続された10 solarCollsolarDbAtlas クラスターの データベース内の

    つまり、例、$group ステージが max_watts の 値を計算する場合、過去 10 秒に特定の group_id が取り込まれたすべてのドキュメントの obs.watts 値から最大値が抽出されます。

    {
    "name": "solarDemo",
    "pipeline": [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
    ]
    }
    [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "avg_watts": {
    "$avg": "$obs.watts"
    },
    "max_temp": {
    "$avg": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "coll": "solarColl",
    "connectionName": "mongodb1",
    "db": "solarDb"
    }
    }
    }
    ]

mongoshで次のコマンドを実行して、 という名前の永続ストリームsolarDemo プロセッサを作成します。

  1. Atlas Stream Processing インスタンスに接続します。

    Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、 mongosh を使用して接続します。

    1. Atlas Stream Processing インスタンスの ペインで、 Connectをクリックします。

    2. Connect to your instance ダイアログで、Shellタブを選択します。

    3. ダイアログに表示される接続文字列をコピーします。形式は次の<atlas-stream-processing-url> で、<username> はストリーム処理インスタンスのURL 、atlasAdmin は ロールを持つデータベースユーザーのユーザー名です。

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. 接続文字列をターミナルに貼り付け、<password> プレースホルダーをユーザーの認証情報に置き換えます。

      Enter キーを押して実行し、ストリーム処理インスタンスに接続します。

  2. $sourceステージを構成します。

    sample_stream_solar ソースからデータを取り込む $source ステージの変数を定義します。

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  3. $groupステージを構成します。

    $group ステージの変数を定義し、group_id に従って最大温度と各ソート デバイスの平均、最大、最小出力を出力します。

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $max: "$obs.temp"
    },
    avg_watts: {
    $avg: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  4. $tumblingWindowステージを構成します。

    $groupストリーミングデータで などのアキュムレーションを実行するために、Atlas Stream Processing は ウィンドウ$tumblingWindow 10を使用してデータセットをバインドします。ストリームを連続する 秒間隔に分割する ステージの変数を定義します。

    つまり、例、$group ステージが max_watts の 値を計算する場合、過去 10 秒に特定の group_id が取り込まれたすべてのドキュメントの obs.watts 値から最大値が抽出されます。

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  5. $mergeステージを構成します。

    接続された Atlas クラスターの solarDbデータベース内の solarColl という名前のコレクションに処理されたストリーミングデータを書込む $merge ステージの変数を定義します。

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  6. ストリーム プロセッサを作成します。

    sp.createStreamProcessor()メソッドを使用して、新しいストリーム プロセッサに名前を割り当て、その集計パイプラインを宣言します。$group ステージは$tumblingWindow のネストされたパイプラインに属しており、プロセッサパイプライン定義にこれを含めることはできません。

    sp.createStreamProcessor("solarDemo", [s, t, m])

    これにより、以前に定義されたクエリを適用し、接続したクラスター上のsolarDbデータベースのsolarCollコレクションに処理されたデータを書込むsolarDemoという名前のストリーム プロセッサが作成されます。 ソート デバイスからの10秒間隔の観察から派生したさまざまな測定値を返します。

    Atlas Stream Processing が保管中のデータベースに書き込む方法の詳細については、$merge(ストリーム プロセシング) を参照してください。

5

ストリーム処理インスタンスのストリーム プロセッサのリストで、ストリーム プロセッサの Start アイコンをクリックします。

sp.processor.start()mongoshで メソッドを使用する:

sp.solarDemo.start()
6

ストリーム プロセッサが Atlas クラスターにデータを書き込んだことを確認するには、次の手順を実行します。

  1. Atlas で、プロジェクトの [Clusters] ページに移動します。

    警告: ナビゲーションの改善が進行中

    現在、新しく改善されたナビゲーション エクスペリエンスを展開しています。次の手順が Atlas UIのビューと一致しない場合は、プレビュー ドキュメントを参照してください。

    1. まだ表示されていない場合は、希望するプロジェクトを含む組織を選択しますナビゲーション バーのOrganizationsメニュー

    2. まだ表示されていない場合は、ナビゲーション バーのProjectsメニューから目的のプロジェクトを選択します。

    3. まだ表示されていない場合は、サイドバーの [Clusters] をクリックします。

      [ Clusters (クラスター) ] ページが表示されます。

  2. クラスターの [Browse Collections] ボタンをクリックします。

    Data Explorerが表示されます。

  3. MySolarコレクションを表示します。

プロセッサがアクティブであることを確認するには、sp.processor.stats() mongoshの メソッドを使用します。

sp.solarDemo.stats()

このメソッドは、solarDemoストリーム プロセッサの運用統計を報告します。

また、sp.processor.sample()mongosh メソッドを使用して、ターミナルに処理されたドキュメントのサンプルを返すこともできます。

sp.solarDemo.sample()
{
_id: 10,
max_temp: 16,
avg_watts: 232,
max_watts: 414,
min_watts: 73
}

注意

前述の出力は、一般的な の例です。ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

7

ストリーム処理インスタンスのストリーム プロセッサのリストで、ストリーム プロセッサのDelete )アイコンをクリックします。

表示される確認ダイアログで、ストリーム プロセッサの名前(solarDemo)を入力して削除することを確認し、[Delete] をクリックします。

sp.processor.drop()mongoshsolarDemoを削除するには、 の メソッドを使用します。

sp.solarDemo.drop()

solarDemoを削除したことを確認するには、 メソッドを使用して使用可能なすべてのストリームsp.listStreamProcessors() プロセッサを一覧表示します。

sp.listStreamProcessors()

次の方法を学習します:

戻る

Overview