このチュートリアルでは、Atlas Stream Processing を設定し、最初のストリーム プロセッサを実行する手順について説明します。
前提条件
Atlas の サンプル データ セット からの映画データを含むコレクションを使用します。
空のクラスターを持つ Atlasプロジェクト。このクラスターは、ストリーム プロセッサのデータ シンクとして機能します。
ストリーム プロセッサを作成および実行する、
atlasAdminロールを持つデータベースユーザーStream Processing ワークスペースと接続レジストリを管理するための
Project OwnerまたはProject Stream Processing Ownerロールを持つ Atlas ユーザー注意
Project Ownerロールでは、データベース配置の作成、プロジェクト アクセスとプロジェクト設定の管理、IP アクセス リスト エントリの管理などを行うことができます。Project Stream Processing Ownerのロールは、Stream Processing ワークスペースの閲覧、作成、削除、編集、接続レジストリ内の接続の表示、追加、変更、削除などの Atlas Stream Processing アクションを可能にします。2 つのロールの違いの詳細については、「プロジェクト ロール」を参照してください。
手順
このチュートリアルでは、 Stream Processing ワークスペースの作成、既存の Atlas クラスターへの接続、太陽光ストリーミング デバイスからサンプル データを取り込み、接続されているクラスターにデータを書き込むストリーム プロセッサの設定について説明します。
Stream Processing ワークスペースを作成します。
Atlasで、プロジェクトのGo Stream Processing{0 ページに します。
まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。
まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。
サイドバーで、 Streaming Data見出しの下のStream Processingをクリックします。
Stream Processing ページが表示されます。
[Create a workspace] をクリックします。
Create a stream processing workspaceページで、ワークスペースを次のように設定します。
Tier:
SP30Provider:
AWSRegion:
us-east-1Workspace Name:
tutorialWorkspace
[Create] をクリックします。
接続レジストリに Sink 接続を追加します。
既存の空の Atlas クラスターへの接続を接続レジストリに追加します。ストリーム プロセッサはこの接続をストリーミングデータ シンクとして使用します。
Stream Processing ワークスペースのペインで、Configure をクリックします。
[ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。
Connection Type ドロップダウンリストから、Atlas Database をクリックします。
Connection Nameフィールドに
mongodb1と入力します。Atlas Cluster ドロップダウン リストから、データが保存されていない Atlas クラスターを選択します。
Execute as ドロップダウンリストから、Read and write to any database を選択します。
[Add connection] をクリックします。
ストリーミング データソースが メッセージを発行することを確認します。
Stream Processing ワークスペースには、sample_stream_solar というサンプルデータソースへの接続が事前に構成されています。このソースは、さまざまな太陽光発電デバイスからのレポートのストリーミング配信を生成します。各レポートには、特定の時点での単一の太陽光発電デバイスの観測ワット数と温度、そのデバイスの最大ワット数が記載されています。
次のドキュメントは、このデータソースからのレポートを表します。
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 } }
このソースが メッセージを発行することを確認するには、 を使用してストリーム プロセッサを対話的に作成します。mongosh
Stream Processing ワークスペースに接続します。
Stream Processing ワークスペースに関連付けられた接続文字列を使用して、
mongoshを使用して接続します。Stream Processing ワークスペースのペインで、Connect をクリックします。
ワークスペース接続ダイアログで、Choose a connection methodをクリックし、次にShellタブを選択します。
ダイアログに表示される接続文字列をコピーします。形式は以下の通りで、
<atlas-stream-processing-url>は Stream Processing ワークスペースのURL、<username>はatlasAdminロールを持つデータベースユーザーのユーザー名です。mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 接続文字列をターミナルに貼り付け、
<password>プレースホルダーをユーザーの認証情報に置き換えます。Enter キーを押して実行し、 Stream Processing ワークスペースに接続します。
mongoshプロンプトで、sp.process()メソッドを使用してストリーム プロセッサを対話的に作成します。sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) sample_stream_solar接続からのデータがコンソールに表示されていることを確認し、プロセスを終了します。sp.process()を使用して作成したストリーム プロセッサは、終了後に永続することはありません。
永続的なストリーム プロセッサを作成します。
永続的なストリーム プロセッサは、プロセッサを削除するまで、指定されたデータ シンクにストリーミングデータを継続的に取り込み、処理し、書き込みます。次のストリーム プロセッサは、 秒間隔で各ソート10 デバイスの最大温度と平均、最大、最小出力を出力し、その結果を接続された空のクラスターに書込む集計パイプラインです。
次のいずれかのタブを選択して、Atlas UIまたは mongosh: を使用してストリーム プロセッサを作成します。
Atlas UI でストリームプロセッサを作成するには、Atlas プロジェクトの Stream Processing ページに移動し、Stream Processing ワークスペースのペインで Configure をクリックします。次に、ビジュアルビルダまたは JSON エディターを使用して、solarDemo という名前のストリーム プロセッサを構成するかを選択します。
[Create with visual builder] をクリックします。
ビジュアル ビルダには、ストリーム プロセッサを設定できるフォームが表示されます。
Stream processor nameフィールドに
solarDemoと入力します。Sourceフィールドで、Connection ドロップダウンリストから
sample_stream_solarを選択します。これにより、次の
$sourceステージが集計パイプラインに追加されます。{ "$source": { "connectionName": "sample_stream_solar" } } $tumblingWindowステージを構成します。Start building your pipeline ペインで + Custom stage をクリックし、次の JSONをコピーして、表示されるテキストボックスに貼り付けます。これにより、ネストされた
$tumblingWindow$groupステージを持つ ステージが定義され、 秒間隔で各ソート デバイスの最大温度と最大書込み保証(write concern)の最大書込み保証(write10 concern)を実現します。つまり、例、
$groupステージがmax_wattsの 値を計算する場合、過去 10 秒に特定のgroup_idが取り込まれたすべてのドキュメントのobs.watts値から最大値が抽出されます。{ "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } }] } } Sinkフィールドで、Connection ドロップダウンリストから
mongodb1を選択します。表示されるテキストボックスに、次のJSONをコピーして貼り付けます。これにより、処理されたストリーミングデータを接続済みの Atlas クラスターの
solarDbデータベース内のsolarCollという名前のコレクションに書き込む$mergeステージが構成されます。{ "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } [Create stream processor] をクリックします。
ストリーム プロセッサが作成され、Stream Processing ページの Stream Processorsタブに表示されます。
[Use JSON editor] をクリックします。
JSONエディターが開き、ストリーム プロセッサをJSON形式で構成できるテキストボックスが表示されます。
ストリーム プロセッサを定義します。
次のJSON定義をコピーしてJSONエディターのテキストボックスに貼り付け、
solarDemoという名前のストリーム プロセッサを定義します。このストリーム プロセッサは、ネストされた$tumblingWindow$groupステージを持つ10 ステージを使用して、 秒間隔で各ソート デバイスの最大値、最小値、平均出力値を求め、その結果をという名前のコレクションに書き込みます。接続されたsolarCollCluster0Atlas クラスターの データベース内のつまり、例、
$groupステージがmax_wattsの 値を計算する場合、過去 10 秒に特定のgroup_idが取り込まれたすべてのドキュメントのobs.watts値から最大値が抽出されます。{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "Cluster0", "coll": "solarColl" }, "parallelism":16, } } ] }
mongosh で次のコマンドを実行して、solarDemo という名前の永続ストリーム プロセッサを作成します。
Stream Processing ワークスペースに接続します。
Stream Processing ワークスペースに関連付けられた接続文字列を使用して、
mongoshを使用して接続します。Stream Processing ワークスペースのペインで、Connect をクリックします。
Connect to your workspace ダイアログで、Shellタブを選択します。
ダイアログに表示される接続文字列をコピーします。形式は以下の通りで、
<atlas-stream-processing-url>は Stream Processing ワークスペースのURL、<username>はatlasAdminロールを持つデータベースユーザーのユーザー名です。mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 接続文字列をターミナルに貼り付け、
<password>プレースホルダーをユーザーの認証情報に置き換えます。Enter キーを押して実行し、 Stream Processing ワークスペースに接続します。
$sourceステージを構成します。sample_stream_solarソースからデータを取り込む$sourceステージの変数を定義します。let s = { source: { connectionName: "sample_stream_solar" } } $groupステージを構成します。$groupステージの変数を定義し、group_idに従って最大温度と各ソート デバイスの平均、最大、最小出力を出力します。let g = { group: { _id: "$group_id", max_temp: { $max: "$obs.temp" }, avg_watts: { $avg: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } $tumblingWindowステージを構成します。ストリーミングデータで
$groupなどのアキュムレーションを実行するために、Atlas Stream Processing はWindowsを使用してデータセットをバインドします。ストリームを連続する 10 秒間隔に分割する$tumblingWindowステージの変数を定義します。つまり、例、
$groupステージがmax_wattsの 値を計算する場合、過去 10 秒に特定のgroup_idが取り込まれたすべてのドキュメントのobs.watts値から最大値が抽出されます。let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } $mergeステージを構成します。
接続された Atlas クラスターの
solarDbデータベース内のsolarCollという名前のコレクションに処理されたストリーミングデータを書込む$mergeステージの変数を定義します。let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } ストリーム プロセッサを作成します。
sp.createStreamProcessor()メソッドを使用して、新しいストリーム プロセッサに名前を割り当て、その集計パイプラインを宣言します。$groupステージは$tumblingWindowのネストされたパイプラインに属しており、プロセッサパイプライン定義にこれを含めることはできません。sp.createStreamProcessor("solarDemo", [s, t, m]) これにより、以前に定義されたクエリを適用し、接続したクラスター上の
solarDbデータベースのsolarCollコレクションに処理されたデータを書込むsolarDemoという名前のストリーム プロセッサが作成されます。 ソート デバイスからの10秒間隔の観察から派生したさまざまな測定値を返します。Atlas Stream Processing が保管中のデータベースに書き込む方法の詳細については、
$merge(ストリーム プロセシング) を参照してください。
ストリーム プロセッサを起動します。
Stream Processing ワークスペースのストリーム プロセッサのリストで、ストリーム プロセッサの Start アイコンをクリックします。
sp.processor.start()mongoshで メソッドを使用する:
sp.solarDemo.start()
ストリーム プロセッサの出力を確認します。
ストリーム プロセッサが Atlas クラスターにデータを書き込んだことを確認するには、次の手順を実行します。
Atlas で、プロジェクトの [Clusters] ページに移動します。
まだ表示されていない場合は、希望するプロジェクトを含む組織を選択しますナビゲーション バーのOrganizationsメニュー
まだ表示されていない場合は、ナビゲーション バーのProjectsメニューから目的のプロジェクトを選択します。
サイドバーで、 Database見出しの下のClustersをクリックします。
[ Clusters (クラスター) ] ページが表示されます。
Atlas で、プロジェクトの [Data Explorer] ページに移動します。
まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。
まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。
サイドバーで、 Database見出しの下のData Explorerをクリックします。
Data Explorerが表示されます。
注意
また、Clusters ページに go し、Shortcuts 見出しの下の Data Explorer をクリックします。
MySolarコレクションを表示します。
プロセッサがアクティブであることを確認するには、: のsp.processor.stats() mongoshメソッドを使用します。
sp.solarDemo.stats()
このメソッドは、solarDemoストリーム プロセッサの運用統計を報告します。
また、sp.processor.sample() のmongosh メソッドを使用して、ターミナルに処理されたドキュメントのサンプルを返すこともできます。
sp.solarDemo.sample()
{ _id: 10, max_temp: 16, avg_watts: 232, max_watts: 414, min_watts: 73 }
注意
前述の出力は、一般的な の例です。ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。
ストリーム プロセッサを削除します。
Stream Processing ワークスペースのストリーム プロセッサのリストで、ストリーム プロセッサの Delete()アイコンをクリックします。
表示される確認ダイアログで、ストリーム プロセッサの名前(solarDemo)を入力して削除することを確認し、[Delete] をクリックします。
sp.processor.drop()mongoshsolarDemoを削除するには、 の メソッドを使用します。
sp.solarDemo.drop()
solarDemo を削除したことを確認するには、sp.listStreamProcessors() メソッドを使用して使用可能なすべてのストリーム プロセッサを一覧表示します。
sp.listStreamProcessors()
次のステップ
次の方法を学習します: