このチュートリアルでは、Atlas Stream Processing を設定し、最初のストリーム プロセッサを実行する手順について説明します。
前提条件
Atlas の サンプル データ セット からの映画データを含むコレクションを使用します。
空のクラスターを持つ Atlasプロジェクト。このクラスターは、ストリーム プロセッサのデータ シンクとして機能します。
ストリーム プロセッサを作成および実行する、
atlasAdminロールを持つデータベースユーザーストリーム処理ワークスペースと接続レジストリを管理するための
Project Stream Processing OwnerまたはProject Ownerロールを持つ Atlas ユーザー注意
Project Ownerロールでは、データベース配置の作成、プロジェクト アクセスとプロジェクト設定の管理、IP アクセス リスト エントリの管理などを行うことができます。Project Stream Processing Ownerロールにより、ストリーム処理ワークスペースの表示、作成、削除、編集や、接続レジストリ内の接続の表示、追加、変更、削除など、Atlas Stream Processing のアクションが可能になります。2 つのロールの違いの詳細については、「プロジェクト ロール」を参照してください。
手順
このチュートリアルでは、ストリーム処理ワークスペースの作成、既存の Atlas クラスターへの接続、およびソートストリーミングデバイスからサンプルデータを取り込んで 、接続されたクラスターにデータを書込むようにストリーム プロセッサを設定する方法について説明します。
Atlas Stream Processing ワークスペースの作成。
Atlasで、プロジェクトのGo Stream Processing{0 ページに します。
まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。
まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。
サイドバーで、 Streaming Data見出しの下のStream Processingをクリックします。
Stream Processing ページが表示されます。
[Create a workspace] をクリックします。
Create a stream processing workspace ページで、ワークスペースを次のように構成します。
Tier:
SP30Provider:
AWSRegion:
us-east-1Workspace Name:
tutorialWorkspace
[Create] をクリックします。
接続レジストリに Sink 接続を追加します。
既存の空の Atlas クラスターへの接続を接続レジストリに追加します。ストリーム プロセッサはこの接続をストリーミングデータ シンクとして使用します。
Atlas ストリーム処理ワークスペースの ペインで、Configure をクリックします。
[ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。
Connection Type ドロップダウンリストから、Atlas Database をクリックします。
Connection Nameフィールドに
mongodb1と入力します。Atlas Cluster ドロップダウン リストから、データが保存されていない Atlas クラスターを選択します。
Execute as ドロップダウンリストから、Read and write to any database を選択します。
[Add connection] をクリックします。
ストリーミング データソースが メッセージを発行することを確認します。
ストリーム処理ワークスペースには、sample_stream_solar というサンプルデータソースへの接続が事前に構成されています。このソースは、さまざまな ソート デバイスからのレポートのストリームを生成します。各レポートには、特定の点における単一のソート デバイスの測定値と温度、およびそのデバイスの最大出力サイズが記載されています。
次のドキュメントは、このデータソースからのレポートを表します。
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 } }
このソースが メッセージを発行することを確認するには、 を使用してストリーム プロセッサを対話的に作成します。mongosh
ストリーム処理ワークスペースに接続します。
Atlas ストリーム処理ワークスペースに関連付けられている接続文字列を使用して接続し、
mongoshを使用して接続します。Atlas ストリーム処理ワークスペースの ペインで、Connect をクリックします。
ワークスペース接続ダイアログで、Choose a connection method をクリックし、Shellタブ を選択します。
ダイアログに表示される接続文字列をコピーします。形式は次の
<atlas-stream-processing-url>で、<username>はストリーム処理ワークスペースのURL 、atlasAdminは ロールを持つデータベースユーザーのユーザー名です。mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 接続文字列をターミナルに貼り付け、
<password>プレースホルダーをユーザーの認証情報に置き換えます。Enter キーを押して実行し、ストリーム処理ワークスペースに接続します。
mongoshプロンプトで、sp.process()メソッドを使用してストリーム プロセッサを対話的に作成します。sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) sample_stream_solar接続からのデータがコンソールに表示されていることを確認し、プロセスを終了します。sp.process()を使用して作成したストリーム プロセッサは、終了後に永続することはありません。
永続的なストリーム プロセッサを作成します。
永続的なストリーム プロセッサは、プロセッサを削除するまで、指定されたデータ シンクにストリーミングデータを継続的に取り込み、処理し、書き込みます。次のストリーム プロセッサは、 秒間隔で各ソート10 デバイスの最大温度と平均、最大、最小出力を出力し、その結果を接続された空のクラスターに書込む集計パイプラインです。
次のいずれかのタブを選択して、Atlas UIまたは mongosh: を使用してストリーム プロセッサを作成します。
Atlas UIでストリーム プロセッサを作成するには、Atlasプロジェクトの Stream Processing ページに移動し、ストリーム処理ワークスペースの ペインで [Configure] をクリックします。次に、ビジュアル ビルダまたはJSONエディターのどちらかを使用して、solarDemo という名前のストリーム プロセッサを構成します。
[Create with visual builder] をクリックします。
ビジュアル ビルダには、ストリーム プロセッサを設定できるフォームが表示されます。
Stream processor nameフィールドに
solarDemoと入力します。Sourceフィールドで、Connection ドロップダウンリストから
sample_stream_solarを選択します。これにより、次の
$sourceステージが集計パイプラインに追加されます。{ "$source": { "connectionName": "sample_stream_solar" } } $tumblingWindowステージを構成します。Start building your pipeline ペインで + Custom stage をクリックし、次の JSONをコピーして、表示されるテキストボックスに貼り付けます。これにより、ネストされた
$tumblingWindow$groupステージを持つ ステージが定義され、 秒間隔で各ソート デバイスの最大温度と最大書込み保証(write concern)の最大書込み保証(write10 concern)を実現します。つまり、例、
$groupステージがmax_wattsの 値を計算する場合、過去 10 秒に特定のgroup_idが取り込まれたすべてのドキュメントのobs.watts値から最大値が抽出されます。{ "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } }] } } Sinkフィールドで、Connection ドロップダウンリストから
mongodb1を選択します。表示されるテキストボックスに、次のJSONをコピーして貼り付けます。これにより、処理されたストリーミングデータを接続済みの Atlas クラスターの
solarDbデータベース内のsolarCollという名前のコレクションに書き込む$mergeステージが構成されます。{ "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } [Create stream processor] をクリックします。
ストリーム プロセッサが作成され、Stream Processing ページの Stream Processorsタブに表示されます。
[Use JSON editor] をクリックします。
JSONエディターが開き、ストリーム プロセッサをJSON形式で構成できるテキストボックスが表示されます。
ストリーム プロセッサを定義します。
次のJSON定義をコピーしてJSONエディターのテキストボックスに貼り付け、
solarDemoという名前のストリーム プロセッサを定義します。このストリーム プロセッサは、ネストされた$tumblingWindow$groupステージを持つ10 ステージを使用して、 秒間隔で各ソート デバイスの最大値、最小値、平均出力値を求め、その結果をという名前のコレクションに書き込みます。接続されたsolarCollCluster0Atlas クラスターの データベース内のつまり、例、
$groupステージがmax_wattsの 値を計算する場合、過去 10 秒に特定のgroup_idが取り込まれたすべてのドキュメントのobs.watts値から最大値が抽出されます。{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "Cluster0", "coll": "solarColl" }, "parallelism":16, } } ] }
mongosh で次のコマンドを実行して、solarDemo という名前の永続ストリーム プロセッサを作成します。
ストリーム処理ワークスペースに接続します。
Atlas ストリーム処理ワークスペースに関連付けられている接続文字列を使用して接続し、
mongoshを使用して接続します。Atlas ストリーム処理ワークスペースの ペインで、Connect をクリックします。
Connect to your workspace ダイアログで、Shellタブを選択します。
ダイアログに表示される接続文字列をコピーします。形式は次の
<atlas-stream-processing-url>で、<username>はストリーム処理ワークスペースのURL 、atlasAdminは ロールを持つデータベースユーザーのユーザー名です。mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 接続文字列をターミナルに貼り付け、
<password>プレースホルダーをユーザーの認証情報に置き換えます。Enter キーを押して実行し、ストリーム処理ワークスペースに接続します。
$sourceステージを構成します。sample_stream_solarソースからデータを取り込む$sourceステージの変数を定義します。let s = { source: { connectionName: "sample_stream_solar" } } $groupステージを構成します。$groupステージの変数を定義し、group_idに従って最大温度と各ソート デバイスの平均、最大、最小出力を出力します。let g = { group: { _id: "$group_id", max_temp: { $max: "$obs.temp" }, avg_watts: { $avg: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } $tumblingWindowステージを構成します。ストリーミングデータで
$groupなどのアキュムレーションを実行するために、Atlas Stream Processing はWindowsを使用してデータセットをバインドします。ストリームを連続する 10 秒間隔に分割する$tumblingWindowステージの変数を定義します。つまり、例、
$groupステージがmax_wattsの 値を計算する場合、過去 10 秒に特定のgroup_idが取り込まれたすべてのドキュメントのobs.watts値から最大値が抽出されます。let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } $mergeステージを構成します。
接続された Atlas クラスターの
solarDbデータベース内のsolarCollという名前のコレクションに処理されたストリーミングデータを書込む$mergeステージの変数を定義します。let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } ストリーム プロセッサを作成します。
sp.createStreamProcessor()メソッドを使用して、新しいストリーム プロセッサに名前を割り当て、その集計パイプラインを宣言します。$groupステージは$tumblingWindowのネストされたパイプラインに属しており、プロセッサパイプライン定義にこれを含めることはできません。sp.createStreamProcessor("solarDemo", [s, t, m]) これにより、以前に定義されたクエリを適用し、接続したクラスター上の
solarDbデータベースのsolarCollコレクションに処理されたデータを書込むsolarDemoという名前のストリーム プロセッサが作成されます。 ソート デバイスからの10秒間隔の観察から派生したさまざまな測定値を返します。Atlas Stream Processing が保管中のデータベースに書き込む方法の詳細については、
$merge(ストリーム プロセシング) を参照してください。
ストリーム プロセッサを起動します。
ストリーム処理ワークスペースのストリーム プロセッサのリストで、ストリーム プロセッサの Start アイコンをクリックします。
sp.processor.start()mongoshで メソッドを使用する:
sp.solarDemo.start()
ストリーム プロセッサの出力を確認します。
ストリーム プロセッサが Atlas クラスターにデータを書き込んだことを確認するには、次の手順を実行します。
Atlas で、プロジェクトの [Clusters] ページに移動します。
まだ表示されていない場合は、希望するプロジェクトを含む組織を選択しますナビゲーション バーのOrganizationsメニュー
まだ表示されていない場合は、ナビゲーション バーのProjectsメニューから目的のプロジェクトを選択します。
サイドバーで、 Database見出しの下のClustersをクリックします。
[ Clusters (クラスター) ] ページが表示されます。
Atlas で、プロジェクトの [Data Explorer] ページに移動します。
まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。
まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。
サイドバーで、 Database見出しの下のData Explorerをクリックします。
Data Explorerが表示されます。
注意
また、Clusters ページに移動し、Shortcuts 見出しの下の Data Explorer をクリックします。
MySolarコレクションを表示します。
プロセッサがアクティブであることを確認するには、: のsp.processor.stats() mongoshメソッドを使用します。
sp.solarDemo.stats()
このメソッドは、solarDemoストリーム プロセッサの運用統計を報告します。
また、sp.processor.sample() のmongosh メソッドを使用して、ターミナルに処理されたドキュメントのサンプルを返すこともできます。
sp.solarDemo.sample()
{ _id: 10, max_temp: 16, avg_watts: 232, max_watts: 414, min_watts: 73 }
注意
前述の出力は、一般的な の例です。ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。
ストリーム プロセッサを削除します。
ストリーム処理ワークスペースのストリーム プロセッサのリストで、ストリームDelete プロセッサの ( )アイコンをクリックします。
表示される確認ダイアログで、ストリーム プロセッサの名前(solarDemo)を入力して削除することを確認し、[Delete] をクリックします。
sp.processor.drop()mongoshsolarDemoを削除するには、 の メソッドを使用します。
sp.solarDemo.drop()
solarDemo を削除したことを確認するには、sp.listStreamProcessors() メソッドを使用して使用可能なすべてのストリーム プロセッサを一覧表示します。
sp.listStreamProcessors()
次のステップ
次の方法を学習します: