このチュートリアルでは、Atlas Stream Processing を設定し、最初のストリーム プロセッサを実行する手順について説明します。
前提条件
Atlas の サンプル データ セット からの映画データを含むコレクションを使用します。
空のクラスターを持つ Atlasプロジェクト。このクラスターは、ストリーム プロセッサのデータ シンクとして機能します。
ストリーム プロセッサを作成および実行する、
atlasAdmin
ロールを持つデータベースユーザーmongosh
バージョン2.0以上Atlas userで、ストリーム プロセシング インスタンスと接続レジストリを管理するための
Project Owner
またはProject Stream Processing Owner
ロールを持つ Atlas ユーザー注意
Project Owner
ロールでは、データベース配置の作成、プロジェクト アクセスとプロジェクト設定の管理、IP アクセス リスト エントリの管理などを行うことができます。Project Stream Processing Owner
ロールにより、ストリーム プロセシング インスタンスの表示、作成、削除、編集などの Atlas Stream Processing アクションや、接続レジストリ内の接続の表示、追加、変更、削除などの Atlas Stream Processing アクションが可能になります。2 つのロールの違いの詳細については、「プロジェクト ロール」を参照してください。
手順
このチュートリアルでは、ストリーム処理インスタンスの作成、既存の Atlas クラスターへの接続、およびソートストリーミングデバイスからサンプルデータを取り込んで 、接続されたクラスターにデータを書込むようにストリーム プロセッサを設定する方法について説明します。
Atlas Stream Processing インスタンスの作成。
Atlasで、プロジェクトのGo Stream Processing{0 ページに します。
警告: ナビゲーションの改善が進行中
現在、新しく改善されたナビゲーション エクスペリエンスを展開しています。次の手順が Atlas UIのビューと一致しない場合は、プレビュー ドキュメントを参照してください。
まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。
まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。
サイドバーで、 Services見出しの下のStream Processingをクリックします。
Atlas Stream Processingページが表示されます。
[Create a workspace] をクリックします。
Create a stream processing instanceページで、インスタンスを次のように構成します。
Tier:
SP30
Provider:
AWS
Region:
us-east-1
Instance Name:
tutorialInstance
[Create] をクリックします。
接続レジストリに Sink 接続を追加します。
既存の空の Atlas クラスターへの接続を接続レジストリに追加します。ストリーム プロセッサはこの接続をストリーミングデータ シンクとして使用します。
Atlas Stream Processing インスタンスの ペインで、 Configureをクリックします。
[ Connection Registryタブで、右上の [ + Add Connection ] をクリックします。
Connection Type ドロップダウンリストから、Atlas Database をクリックします。
Connection Nameフィールドに
mongodb1
と入力します。Atlas Cluster ドロップダウン リストから、データが保存されていない Atlas クラスターを選択します。
Execute as ドロップダウンリストから、Read and write to any database を選択します。
[Add connection] をクリックします。
ストリーミング データソースが メッセージを発行することを確認します。
ストリーム プロセシング インスタンスには、 sample_stream_solar
というサンプル データソースへの接続が事前に構成されています。 このソースは、さまざまな ソート デバイスからのレポートのストリームを生成します。 各レポートには、特定の時点における単一のソート デバイスの測定値と温度、およびそのデバイスの最大出力サイズが記載されています。
次のドキュメントは、このデータソースからのレポートを表します。
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 } }
このソースが メッセージを発行することを確認するには、 を使用してストリーム プロセッサを対話的に作成します。mongosh
Atlas Stream Processing インスタンスに接続します。
Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、
mongosh
を使用して接続します。Atlas Stream Processing インスタンスの ペインで、 Connectをクリックします。
Connect to your instance ダイアログで、Shellタブを選択します。
ダイアログに表示される接続文字列をコピーします。形式は次の
<atlas-stream-processing-url>
で、<username>
はストリーム処理インスタンスのURL 、atlasAdmin
は ロールを持つデータベースユーザーのユーザー名です。mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 接続文字列をターミナルに貼り付け、
<password>
プレースホルダーをユーザーの認証情報に置き換えます。Enter キーを押して実行し、ストリーム処理インスタンスに接続します。
mongosh
プロンプトで、 メソッドを使用してストリームsp.process()
プロセッサを対話的に作成します。sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) sample_stream_solar
接続からのデータがコンソールに表示されていることを確認し、プロセスを終了します。sp.process()
を使用して作成したストリーム プロセッサは、終了後に永続することはありません。
永続的なストリーム プロセッサを作成します。
永続的なストリーム プロセッサは、プロセッサを削除するまで、指定されたデータ シンクにストリーミングデータを継続的に取り込み、処理し、書き込みます。次のストリーム プロセッサは、 秒間隔で各ソート10 デバイスの最大温度と平均、最大、最小出力を出力し、その結果を接続された空のクラスターに書込む集計パイプラインです。
次のいずれかのタブを選択して、Atlas UIまたは を使用してストリームmongosh
プロセッサを作成します。
Atlas UIでストリーム プロセッサを作成するには、Atlasプロジェクトの Stream Processing ページに移動し、ストリーム処理インスタンスの ペインで [Configure] をクリックします。次に、ビジュアル ビルダまたはJSONエディターのどちらかを使用して、solarDemo
という名前のストリーム プロセッサを構成します。
[Create with visual builder] をクリックします。
ビジュアル ビルダには、ストリーム プロセッサを設定できるフォームが表示されます。
Stream processor nameフィールドに
solarDemo
と入力します。Sourceフィールドで、Connection ドロップダウンリストから
sample_stream_solar
を選択します。これにより、次の
$source
ステージが集計パイプラインに追加されます。{ "$source": { "connectionName": "sample_stream_solar" } } $tumblingWindow
ステージを構成します。Start building your pipelineペインで+ Custom stage をクリックし、次の JSONをコピーして、表示されるテキストボックスに貼り付けます。これにより、ネストされた ステージを持つ
$tumblingWindow
$group
ステージが定義され、 秒間隔で各ソート デバイスの最大温度と最大書込み保証(write concern)の最大書込み保証(write10 concern)を実現します。つまり、例、
$group
ステージがmax_watts
の 値を計算する場合、過去 10 秒に特定のgroup_id
が取り込まれたすべてのドキュメントのobs.watts
値から最大値が抽出されます。{ "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } }] } } Sinkフィールドで、Connection ドロップダウンリストから
mongodb1
を選択します。表示されるテキストボックスに、次のJSONをコピーして貼り付けます。これにより、処理されたストリーミングデータを接続済みの Atlas クラスターの データベース内の という名前のコレクションに書き込む
$merge
ステージが構成されます。solarColl
solarDb
{ "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } [Create stream processor] をクリックします。
ストリーム プロセッサが作成され、Stream Processing ページの Stream Processorsタブに表示されます。
[Use JSON editor] をクリックします。
JSONエディターが開き、ストリーム プロセッサをJSON形式で構成できるテキストボックスが表示されます。
ストリーム プロセッサを定義します。
次のJSON定義をコピーしてJSONエディターのテキストボックスに貼り付け、 という名前のストリーム
solarDemo
プロセッサを定義します。このストリーム プロセッサは、ネストされた ステージを持つ ステージを使用して、$tumblingWindow
$group
秒間隔で各ソート デバイスの最大値、最小値、平均出力値を求め、その結果をという名前のコレクションに書き込みます。接続された10solarColl
solarDb
Atlas クラスターの データベース内のつまり、例、
$group
ステージがmax_watts
の 値を計算する場合、過去 10 秒に特定のgroup_id
が取り込まれたすべてのドキュメントのobs.watts
値から最大値が抽出されます。{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] } [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "avg_watts": { "$avg": "$obs.watts" }, "max_temp": { "$avg": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "coll": "solarColl", "connectionName": "mongodb1", "db": "solarDb" } } } ]
mongosh
で次のコマンドを実行して、 という名前の永続ストリームsolarDemo
プロセッサを作成します。
Atlas Stream Processing インスタンスに接続します。
Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、
mongosh
を使用して接続します。Atlas Stream Processing インスタンスの ペインで、 Connectをクリックします。
Connect to your instance ダイアログで、Shellタブを選択します。
ダイアログに表示される接続文字列をコピーします。形式は次の
<atlas-stream-processing-url>
で、<username>
はストリーム処理インスタンスのURL 、atlasAdmin
は ロールを持つデータベースユーザーのユーザー名です。mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 接続文字列をターミナルに貼り付け、
<password>
プレースホルダーをユーザーの認証情報に置き換えます。Enter キーを押して実行し、ストリーム処理インスタンスに接続します。
$source
ステージを構成します。sample_stream_solar
ソースからデータを取り込む$source
ステージの変数を定義します。let s = { source: { connectionName: "sample_stream_solar" } } $group
ステージを構成します。$group
ステージの変数を定義し、group_id
に従って最大温度と各ソート デバイスの平均、最大、最小出力を出力します。let g = { group: { _id: "$group_id", max_temp: { $max: "$obs.temp" }, avg_watts: { $avg: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } $tumblingWindow
ステージを構成します。$group
ストリーミングデータで などのアキュムレーションを実行するために、Atlas Stream Processing は ウィンドウ$tumblingWindow
10を使用してデータセットをバインドします。ストリームを連続する 秒間隔に分割する ステージの変数を定義します。つまり、例、
$group
ステージがmax_watts
の 値を計算する場合、過去 10 秒に特定のgroup_id
が取り込まれたすべてのドキュメントのobs.watts
値から最大値が抽出されます。let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } $mergeステージを構成します。
接続された Atlas クラスターの
solarDb
データベース内のsolarColl
という名前のコレクションに処理されたストリーミングデータを書込む$merge
ステージの変数を定義します。let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } ストリーム プロセッサを作成します。
sp.createStreamProcessor()
メソッドを使用して、新しいストリーム プロセッサに名前を割り当て、その集計パイプラインを宣言します。$group
ステージは$tumblingWindow
のネストされたパイプラインに属しており、プロセッサパイプライン定義にこれを含めることはできません。sp.createStreamProcessor("solarDemo", [s, t, m]) これにより、以前に定義されたクエリを適用し、接続したクラスター上の
solarDb
データベースのsolarColl
コレクションに処理されたデータを書込むsolarDemo
という名前のストリーム プロセッサが作成されます。 ソート デバイスからの10秒間隔の観察から派生したさまざまな測定値を返します。Atlas Stream Processing が保管中のデータベースに書き込む方法の詳細については、
$merge
(ストリーム プロセシング) を参照してください。
ストリーム プロセッサを起動します。
ストリーム処理インスタンスのストリーム プロセッサのリストで、ストリーム プロセッサの Start アイコンをクリックします。
sp.processor.start()
mongosh
で メソッドを使用する:
sp.solarDemo.start()
ストリーム プロセッサの出力を確認します。
ストリーム プロセッサが Atlas クラスターにデータを書き込んだことを確認するには、次の手順を実行します。
Atlas で、プロジェクトの [Clusters] ページに移動します。
警告: ナビゲーションの改善が進行中
現在、新しく改善されたナビゲーション エクスペリエンスを展開しています。次の手順が Atlas UIのビューと一致しない場合は、プレビュー ドキュメントを参照してください。
まだ表示されていない場合は、希望するプロジェクトを含む組織を選択しますナビゲーション バーのOrganizationsメニュー
まだ表示されていない場合は、ナビゲーション バーのProjectsメニューから目的のプロジェクトを選択します。
まだ表示されていない場合は、サイドバーの [Clusters] をクリックします。
[ Clusters (クラスター) ] ページが表示されます。
クラスターの [Browse Collections] ボタンをクリックします。
Data Explorerが表示されます。
MySolar
コレクションを表示します。
プロセッサがアクティブであることを確認するには、sp.processor.stats()
mongosh
の メソッドを使用します。
sp.solarDemo.stats()
このメソッドは、solarDemo
ストリーム プロセッサの運用統計を報告します。
また、sp.processor.sample()
のmongosh
メソッドを使用して、ターミナルに処理されたドキュメントのサンプルを返すこともできます。
sp.solarDemo.sample()
{ _id: 10, max_temp: 16, avg_watts: 232, max_watts: 414, min_watts: 73 }
注意
前述の出力は、一般的な の例です。ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。
ストリーム プロセッサを削除します。
ストリーム処理インスタンスのストリーム プロセッサのリストで、ストリーム プロセッサのDelete ( )アイコンをクリックします。
表示される確認ダイアログで、ストリーム プロセッサの名前(solarDemo
)を入力して削除することを確認し、[Delete] をクリックします。
sp.processor.drop()
mongosh
solarDemo
を削除するには、 の メソッドを使用します。
sp.solarDemo.drop()
solarDemo
を削除したことを確認するには、 メソッドを使用して使用可能なすべてのストリームsp.listStreamProcessors()
プロセッサを一覧表示します。
sp.listStreamProcessors()
次のステップ
次の方法を学習します: