An Atlas Stream Processing ストリーム プロセッサは、一意の名前を持つ ストリーム集計パイプラインのロジックをストリーミング データに適用します。Atlas Stream Processing は、各ストリーム プロセッサの定義を永続ストレージに保存し、再利用できるようにします。指定したストリーム プロセッサは、その定義が保存されている Stream Processing ワークスペースでのみ使用できます。
前提条件
ストリーム プロセッサを作成および管理するには、次のものが必要です。
ストリーム プロセッサを作成および実行する、
atlasAdminロールを持つデータベースユーザーAtlas クラスター
Considerations
多くのストリーム プロセッサ コマンドでは、メソッド呼び出しで関連するストリーム プロセッサの名前を指定する必要があります。 次のセクションで説明される構文では、厳密に英数字の名前を使用することを前提としています。 ストリーム プロセッサの名前にハイフン(-)や完全停止(.)など、英数字以外の文字が含まれている場合は、名前を角括弧([])とdouble引用符("")で囲む必要があります。メソッド呼び出し(sp.["special-name-stream"].stats() と同様)。
ストリーム プロセッサを対話的に作成する
のsp.process() mongoshメソッドを使用して、ストリーム プロセッサを対話的に作成できます。対話的に作成したストリーム プロセッサは、次の動作を示します。
出力とデッドレター キューのドキュメントを shell に書込む
作成後すぐに実行を開始
10分間、またはユーザーがそれらを停止するまで実行します
停止した後に永続化しない
対話的に作成するストリーム プロセッサは、プロトタイプ作成を目的としています。 永続的なストリーム プロセッサを作成するには、「 ストリーム プロセッサの作成 」を参照してください。
sp.process() の構文は次のとおりです。
sp.process(<pipeline>)
フィールド | タイプ | 必要性 | 説明 |
|---|---|---|---|
| 配列 | 必須 | データのストリーミング配信に適用するストリーム集約パイプライン。 |
ストリーム プロセッサを対話的に作成するには、次の手順に従います。
Stream Processing ワークスペースに接続します。
Stream Processing ワークスペースに関連付けられた接続文字列を使用して、mongoshを使用して接続します。
例
次のコマンドは x. 059 認証を使用して、 streamOwnerという名前のユーザーとしてStream Processing ワークスペースに接続します。
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
ユーザーのパスワードの入力を求められたら、入力します。
パイプラインを定義します。
mongoshプロンプトで、 pipelineという名前の変数に適用する集計ステージを含む配列を割り当てます。
次の例では、接続レジストリ内のmyKafka接続のstuffトピックを$sourceとして使用し、 temperatureフィールドの値が46であるレコードを照合し、処理されたメッセージをoutputに出力します。接続レジストリにあるmySink接続のトピック。
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
ストリーム プロセッサを作成する
削除するまで保持されるストリーム プロセッサを作成するには、次の手順に従います。
Atlas管理APIは、ストリームプロセッサを作成するためのエンドポイントを提供します。
Atlas UI でストリーム プロセッサを作成するには、Atlas プロジェクトのStream Processingページに移動し、Stream Processing ワークスペースのペインでManageをクリックします。
ストリーム プロセッサを構成するには、Visual Builder または JSONエディターのどちらかを使用するかを選択できます。
ソース接続を追加します。
Sourceフィールドで、ストリーム プロセッサのソースとして使用する接続を Connection ドロップダウン リストから選択します。
これにより、ストリーム プロセッサの ステージを構成できるJSONテキスト sourceボックスが開きます。source ステージの構文の詳細については、$source を参照してください。
例
次の source ステージは、事前構成された sample_stream_solar 接続のリアルタイムデータに基づいて動作します。
{ "$source": { "connectionName": "sample_stream_solar" } }
ストリーム プロセッサパイプラインに集計ステージを追加します。
Start building your pipeline ペインで、パイプラインに追加する集計ステージの [] ボタンをクリックします。これにより、選択した集計ステージをJSON形式で構成できるテキストボックスが開きます。
集計ステージがリストにない場合は、+ Custom stage をクリックして、 JSON形式でサポートされている集計ステージを定義します。Atlas Stream Processing集計ステージとその構文の詳細については、集計パイプライン ステージを参照してください。
例
次の $match ステージは、事前構成された sample_stream_solar ストリーム内の obs.wattsフィールドが 300 より大きいすべてのドキュメントと一致します。
{ "$match": { "obs.watts": { "$gt": 300 } } }
(任意)追加の集計ステージ を構成します。
パイプラインに追加の集計ステージを追加するには、パイプラインの最後のステージの下にある + Add stage below ボタンをクリックして、追加する集計ステージを選択するか、サポートされている別の集計ステージを定義するために Custom stage をクリックします。そうすると、JSON形式で新しいステージを構成できるテキストボックスが開きます。
Sink 接続を追加します。
Sinkフィールドで、Connection ドロップダウンリストから宛先接続を選択します。
Sinkフィールドで、処理データを書き込む Connection ドロップダウン リストから接続を選択します。
これにより、ストリーム プロセッサの ステージを構成できるJSONテキスト mergeボックスが開きます。merge ステージの構文の詳細については、$merge を参照してください。
例
次の sink ステージでは、demoConnection 接続という名前の接続内の demoDb.demoCollコレクションに処理データが書き込まれます。
{ "$merge": { "into": { "connectionName": "demoConnection", "db": "demoDb", "coll": "demoColl" } } }
ストリーム プロセッサを定義します。
JSONエディターのテキスト ボックスにストリーム プロセッサのJSON定義を指定します。この定義には、ストリーム プロセッサの名前と、$source ステージで開始され $merge ステージで終了する集計パイプラインの名前を含める必要があります。$source ステージと $merge ステージの間に任意の数の追加の集計ステージを含めることができます。
Atlas Stream Processing集計ステージとその構文の詳細については、集計パイプライン ステージを参照してください。
例
次のJSON定義では、ネストされた solarDemo$tumblingWindow$groupsample_stream_solarステージを持つ10 ステージを使用して、事前構成された 接続からのリアルタイムデータを 秒間隔で集計する、 という名前のストリーム プロセッサが作成されます。は処理されたデータをmongodb1 という名前の接続内のコレクションに書き込みます。
{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] }
mongosh を使用して新しいストリーム プロセッサを作成するには、sp.createStreamProcessor() メソッドを使用します。構文は次のとおりです。
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | タイプ | 必要性 | 説明 |
|---|---|---|---|
| string | 必須 | ストリーム プロセッサの論理名。これは Stream Processing ワークスペース内で一意である必要があります。 この名前には、英数字のみを含める必要があります。 |
| 配列 | 必須 | データのストリーミング配信に適用するストリーム集約パイプライン。 |
| オブジェクト | 任意 | ストリーム プロセッサのさまざまなオプション設定を定義するオブジェクト。 |
| オブジェクト | 条件付き | Stream Processing ワークスペースにデッドレターキュー(DLQ)を割り当てるオブジェクト。このフィールドは |
| string | 条件付き | 接続レジストリ内の接続を識別する、人間が判読可能なラベル。 この接続は Atlas クラスターを参照する必要があります。 このフィールドは、 |
| string | 条件付き |
|
| string | 条件付き |
|
| string | 任意 | Atlas Stream Processing がプロセッサを割り当てるポッドの階層。このオプションを宣言しない場合、 Atlas Stream Processing プロセッサを Stream Processing ワークスペースのデフォルト階層のポッドに割り当てます。詳しくは 「階層」をご覧ください。 |
Stream Processing ワークスペースに接続します。
Stream Processing ワークスペースに関連付けられた接続文字列を使用して、mongoshを使用して接続します。
Stream Processing ワークスペースのペインで、Connect をクリックします。
Connect to your workspace ダイアログで、Shellタブを選択します。
ダイアログに表示される接続文字列をコピーします。形式は次のとおりで、
<atlas-stream-processing-url>はストリーム処理ワークスペースのURLで、<username>はatlasAdminロールを持つデータベースユーザーのユーザー名です。mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> 接続文字列をターミナルに貼り付け、
<password>プレースホルダーをユーザーの認証情報に置き換えます。Enter キーを押して実行し、 Stream Processing ワークスペースに接続します。
例
以下のコマンドは、x.059 認証を使用して、streamOwner という名前のユーザーとして Stream Processing ワークスペースに接続します。
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
ユーザーのパスワードの入力を求められたら、入力します。
パイプラインを定義します。
mongoshプロンプトで、 pipelineという名前の変数に適用する集計ステージを含む配列を割り当てます。
次の例パイプラインでは、接続レジストリ内の myKafka 接続の stuffトピックを$sourceとして使用し、temperatureフィールドの値が 46 であるレコードと一致させ、処理されたメッセージを output に出力します。接続レジストリにある mySink 接続のトピック。
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
ストリーム プロセッサを起動する
注意
ストリーム プロセッサを起動するには:
Atlas 管理API は、ストリーム プロセッサを起動するための 1 つのストリーム プロセッサを起動 および オプションを使用して 1 つのストリーム プロセッサを起動 エンドポイントとなる接続されたデバイスを提供します。
オプションなしでストリーム プロセッサを起動するには、1 つのストリーム プロセッサを起動を使用してください。
$sourceステージのプロパティを変更するには、ストリーム プロセッサを起動し、オプション付きで1つのストリーム プロセッサを起動を使用します。このエンドポイントとなる接続されたデバイスは、startAfterまたはstartAtOperationTimeプロパティの変更をサポートしています。
Atlas UI でストリーム プロセッサを開始するには、 Atlas プロジェクトのStream Processingページに移動し、Stream Processing ワークスペースのペインでManageをクリックして、それに定義されているストリーム プロセッサのリストを表示します。
次に、ストリーム プロセッサの Start アイコンをクリックします。
ストリーム プロセッサを起動するには、mongosh で sp.processor.start() メソッドを使用します。構文は次のとおりです。
sp.processor.start(<options>)
<options> は汎用の任意ドキュメントで、フィールドは基礎となる開始コマンドに渡されます。これらのフィールドは、次のとおりになります。
フィールド | タイプ | 必要性 | 説明 |
|---|---|---|---|
| token | 条件付き | MongoDB Collection Change Streamを参照してください。 |
| タイムスタンプ | 条件付き | MongoDB Collection Change Streamを参照してください。 |
| string | 任意 | Atlas Stream Processing がプロセッサを割り当てる階層。このオプションを宣言しない場合、Atlas Stream Processing はプロセッサをストリーム処理ワークスペースの階層に割り当てます。次のいずれかである必要があります。
詳しくは 「階層」をご覧ください。 |
たとえば、 proc01という名前のストリーム プロセッサを起動するには、次のコマンドを実行します。
sp.proc01.start()
{ "ok" : 1 }
このメソッドは、ストリーム プロセッサが存在し、現在を実行中いない場合は { "ok": 1 } を返します。sp.processor.start()ではないストリーム プロセッサに対してSTOPPED を呼び出すと、mongosh はエラーを返します。
ストリーム プロセッサの停止
注意
ストリーム プロセッサを停止するには:
Atlas Administration API は、ストリーム プロセッサを停止するためのエンドポイントを提供します。
Atlas UI でストリーム プロセッサを一時停止するには、Atlas プロジェクトの Stream Processing ページに移動し、Stream Processing ワークスペースのペインで Manage をクリックして、それに定義されているストリーム プロセッサのリストを表示します。
次に、ストリーム プロセッサの Pause アイコンをクリックします。
を使用して既存のストリーム プロセッサを停止するには、mongosh sp.processor.stop()メソッドを使用します。構文は次のとおりです。
sp.processor.stop(<options>)
<options> は汎用の任意ドキュメントで、そのフィールドは基になる停止コマンドに渡されます。
たとえば、 proc01という名前のストリーム プロセッサを停止するには、次のコマンドを実行します。
sp.proc01.stop()
{ "ok" : 1 }
このメソッドは、ストリーム プロセッサが存在し、現在を実行中場合は { "ok": 1 } を返します。sp.processor.stop()ではないストリーム プロセッサに対してrunning を呼び出すと、mongosh はエラーを返します。
ストリームプロセッサを変更するには
既存のストリーム プロセッサの次の要素を変更できます。
ストリーム プロセッサを変更するには、次の手順に従います。
デフォルトでは、変更されたプロセッサは最後のチェックポイントから復元されます。あるいは、resumeFromCheckpoint=false を設定することもできます。この場合、プロセッサは要約統計のみを保持します。ウィンドウが開いているプロセッサーを変更した場合、ウィンドウは更新されたパイプラインで全て再計算されます。
注意
Operator(is、contains などのマッチャー式が含まれる)を使用して、ストリーム プロセッサ ステートが失敗した場合のアラートを構成したストリーム プロセッサの名前を変更すると、Atlas は、マッチャー式がその新しい名前と一致しない場合、名前が変更されたストリーム プロセッサに対してアラートをトリガーしません。名前が変更されたストリーム プロセッサを監視するには、アラートを再構成してください。
制限
デフォルト設定 resumeFromCheckpoint=true が有効になっている場合、以下の制限が適用されます。
$sourceステージは変更できません。ウィンドウの間隔は変更できません。
ウィンドウを削除することはできません。
ウィンドウを使用してパイプラインを変更できるのは、そのウィンドウの内部パイプラインに
$groupまたは$sortステージのいずれかが含まれている場合のみです。既存のウィンドウタイプを変更することはできません。たとえば、
$tumblingWindowから$hoppingWindowへの変更、またはその逆の変更はできません。ウィンドウを持つプロセッサは、ウィンドウの再計算の結果として、一部のデータを再処理する可能性があります。
ストリーム プロセッサを変更するには、次の手順に従います。
Atlas Administration API は、ストリームプロセッサを変更するためのエンドポイントを提供します。
mongosh v2.3.4 以上が必要です。
既存のストリーム プロセッサを変更するには、sp.<streamprocessor>.modify() コマンドを使用します。<streamprocessor> は、現在の Stream Processing ワークスペースに対して定義された停止中のストリームプロセッサの名前である必要があります。
たとえば、proc01 という名前のストリーム プロセッサを変更するには、次のコマンドを実行します。
sp.proc1.modify(<pipeline>, { resumeFromCheckpoint: bool, // optional name: string, // optional dlq: string, // optional })
既存のパイプラインにステージを追加する
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ]); sp.foo.start();
ストリーム プロセッサの入力ソースを変更する
sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test", config: { startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000) } }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ], {resumeFromCheckpoint: false});
ストリーム プロセッサからデッドレターキュー (DLQ) を削除する
sp.foo.stop(); sp.foo.modify({dlq: {}}) sp.foo.start();
ウィンドウを使用してストリーム プロセッサを修正する
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$replaceRoot: {newRoot: "$fullDocument"}}, {$match: {cost: {$gt: 500}}}, {$tumblingWindow: { interval: {unit: "day", size: 1}, pipeline: [ {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}} ] }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ], {resumeFromCheckpoint: false}); sp.foo.start();
ストリーム プロセッサの削除
ストリーム プロセッサを削除するには:
Atlas管理APIは、ストリームプロセッサを削除するためのエンドポイントを提供します。
Atlas UI でストリーム プロセッサを削除するには、Atlas プロジェクトの Stream Processing ページに移動し、Stream Processing ワークスペースのペインで Manage をクリックして、それに定義されたストリームプロセッサのリストを表示します。
次に、ストリーム プロセッサの Delete()アイコンをクリックします。表示される確認ダイアログで、ストリーム プロセッサの名前(solarDemo)を入力して削除することを確認し、Delete をクリックします。
を使用して既存のストリーム プロセッサを削除するには、mongosh sp.processor.drop()メソッドを使用します。構文は次のとおりです。
sp.processor.drop(<options>)
<options> は汎用の任意ドキュメントで、そのフィールドは基になる削除コマンドに渡されます。
たとえば、 proc01という名前のストリーム プロセッサを削除するには、次のコマンドを実行します。
sp.proc01.drop()
このメソッドは以下を返します。
trueストリーム プロセッサが存在する場合。falseストリーム プロセッサが存在しない場合。
ストリーム プロセッサを削除すると、Atlas Stream Processing がそのストリーム用にプロビジョニングしたすべてのリソースと、保存されたすべての状態が破棄されます。
使用可能なストリーム プロセッサを一覧表示する
使用可能なすべてのストリーム プロセッサを一覧表示するには:
Atlas Administration API は、利用可能なすべてのストリーム プロセッサを一覧表示するためのエンドポイントを提供します。
Atlas UI で stream processing ワークスペースに定義されているストリームプロセッサのリストを表示するには、Atlas プロジェクトの Stream Processing ページに移動し、Stream Processing ワークスペースのペインで Manage をクリックします。
ストリーム プロセッサの一覧とそのステータスが表示されます。
mongoshを使用して現在の Stream Processing ワークスペースで使用可能なストリーミング配信するプロセッサをすべて一覧表示するには、sp.listStreamProcessors()メソッドを使用します。これは、各ストリーム プロセッサに関連する名前、開始時間、現在の状態、パイプラインを含むドキュメントのリストを返します。構文は次のとおりです。
sp.listStreamProcessors(<filter>)
<filter> は、リストをフィルタリングするフィールドを指定するドキュメントです。
例
次の例では、フィルタリングされていないリクエストの戻り値を示しています。
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
同じ Stream Processing ワークスペースでコマンドを再度実行し、 "running"の"state"をフィルターすると、次の出力が表示されます。
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
ワークスペースのデフォルトを一覧表示する
Atlas ストリーム処理ワークスペース階層のサイズ設定に関する情報を表示するには:
Atlas UIでストリーム処理ワークスペースの defaultTierSize と maxTierSize を表示するには、 Atlasプロジェクトの Stream Processing ページに移動します。
defaultTierSize と maxTierSize は、ストリーム処理ワークスペース カードの下部に表示されます。
を使用してストリーム処理ワークスペースのmongosh defaultTierSizeとmaxTierSize を表示するには、sp.listWorkspaceDefaults() メソッドを使用します。
sp.listWorkspaceDefaults()
このメソッドは以下を返します。
defaultTierSizemaxTierSize
ストリーム プロセッサからのサンプル
既存のストリーム プロセッサからサンプル結果の配列を mongosh を使用して STDOUT に返すには、sp.processor.sample() メソッドを使用します。例、次のコマンドは、proc01 という名前のストリーム プロセッサからサンプリングしています。
sp.proc01.sample()
このコマンドは、 CTRL-Cを使用してキャンセルするまで、または返されたサンプルのサイズが累積40 MB になるまで継続的に実行されます。 ストリーム プロセッサは、サンプル内の無効なドキュメントを次の形式の_dlqMessageドキュメントで報告します。
{ _dlqMessage: { errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', workspaceName: '<workspaceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
これらのメッセージを使用して、デッド レター キューコレクションを定義せずにデータのクリーンアップの問題を診断できます。
ストリーム プロセッサの統計情報を表示する
注意
ストリーム プロセッサの統計を表示するには:
Atlas Administration API は、ストリーム プロセッサの統計を表示するためのエンドポイントを提供します。
ストリーム プロセッサのモニタリングを表示するには、Atlasプロジェクトの Stream Processing ページにGo、 Monitoringタブを開きます。 次に、ページ左上にある Stream processor ドロップダウン リストからストリーム プロセッサを選択します。
mongosh を使用して既存のストリーム プロセッサの現在のステータスを要約するドキュメントを返すには、sp.processor.stats() メソッドを使用します。構文は次のとおりです。
sp.<streamprocessor>.stats({options: {<options>}})
ここで、 optionsは次のフィールドを持つ任意のドキュメントです。
フィールド | タイプ | 説明 |
|---|---|---|
| integer | 出力内の項目のサイズに使用する単位。 デフォルトでは、Atlas Stream Processing はアイテムのサイズをバイト単位で表示します。 KB 単位で表示するには、 |
| ブール値 | 出力ドキュメントの冗長レベルを指定するフラグ。 |
出力ドキュメントには、次のフィールドがあります。
フィールド | タイプ | 説明 |
|---|---|---|
| string | ストリーム プロセッサが定義されている名前空間。 |
| オブジェクト | ストリーム プロセッサの動作状態を説明するドキュメント。 |
| string | ストリーム プロセッサの名前。 |
| string | ストリーム プロセッサの状態。 このフィールドには、次の値を指定できます。
|
| integer | サイズ フィールドが表示される単位。 |
| integer | ストリームに公開されたドキュメントの数。 ドキュメントは、パイプライン全体を通過する場合ではなく、 |
| integer | ストリームに公開されたバイト数またはキロバイト数。 バイトは、パイプライン全体を通過する場合ではなく、 |
| integer | ストリームによって処理されたドキュメントの数。 ドキュメントはストリームによってパイプライン全体を通過すると、ストリームによって「処理済み」と見なされます。 |
| integer | ストリームによって処理されたバイト数またはキロバイト数。 バイトはパイプライン全体を通過すると、ストリームによって「処理済み」と見なされます。 |
| integer | |
| integer | |
| integer | 最新の変更ストリーム再開トークンによって表されるイベント時間とoplog 内の最新のイベントとの差(秒単位)。 |
| token | 最新の変更ストリーム再開トークン。 変更ストリーム ソースを持つストリーム プロセッサにのみ適用されます。 |
| ドキュメント | ストリーム プロセッサ全体のレイテンシ統計です。Atlas Stream Processing は、 |
| integer | 過去 30 秒間に処理されたすべてのドキュメントの推定 50 パーセンタイル レイテンシ。パイプラインにウィンドウ ステージが含まれている場合、レイテンシ測定にはウィンドウの間隔が含まれます。 例えば、 |
| integer | 過去 30 秒間に処理されたすべてのドキュメントの推定 99 パーセンタイル レイテンシ。パイプラインにウィンドウ ステージが含まれている場合、レイテンシ測定にはウィンドウの間隔が含まれます。 例えば、 |
| datetime | 直近の 30 秒測定ウィンドウが開始した時刻(ウォールタイム)。 |
| datetime | 直近の 30 秒間の測定ウィンドウが終了した時点のウォール時間。 |
| string | レイテンシが計測される時間の単位。この値は常に |
| integer | 直近の 30 秒測定ウィンドウでストリームプロセッサが処理したドキュメント数。 |
| integer | 直近の 30 秒測定ウィンドウで取得された、個々のレイテンシ測定値の合計(マイクロ秒単位)。 |
| integer | Windowsがプロセッサの状態を保存するために使用するバイト数。 |
| integer | 現在の浮動小数のタイムスタンプ。 |
| 配列 | プロセッサ パイプライン内の各演算子の統計情報。 Atlas Stream Processing では、
|
| integer | 演算子の最大メモリ使用量(バイトまたはキロバイト単位)。 |
| integer | 演算子の合計実行時間(秒単位)。 |
| date | 最小オープンウィンドウの開始時刻。この値は任意です。 |
| date | 最大オープンウィンドウの開始時刻。この値は任意です。 |
| 配列 | 特定のソース演算子とシンク演算子のターゲットごとの統計情報。 この配列の各要素は、入力コレクションや出力コレクションやApache Kafka トピックなど、単一の入力ターゲットまたは出力ターゲットを表すドキュメントです。演算子に応じて、各ドキュメントには次のフィールドのサブセットが含まれます。 Apache Kafka MongoDBターゲットの場合は
MongoDB や MongoDBターゲットの場合は
Atlas Stream Processing では、 Apache Kafka 最大 100 の個別のターゲットのターゲットごとの統計情報を記録します。その後、ストリーム プロセッサは |
| 配列 | Apache Kafkaエージェントのパーティションのオフセット情報。 |
| integer | Apache Kafkaトピックのパーティション番号。 |
| integer | 指定されたパーティションに対するストリーム プロセッサのオフセット。 この値は、ストリーム プロセッサが処理した以前のオフセットに |
| integer | ストリーム プロセッサがApache Kafkaプロバイダーと指定されたパーティションのチェックポイントに最後にコミットしたオフセット。このオフセットを介するすべてのメッセージは、最後のチェックポイントに記録されます。 |
| ブール値 | パーティションがアイドル状態であるかどうかを示すフラグ。この値はデフォルトで |
たとえば、次の例は、inst01という名前のStream Processing ワークスペース上のproc01という名前のストリーム プロセッサのステータスを示しています(アイテム サイズは KB 単位)。
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }
この例には、ストリーム プロセッサの全体的な統計が示されています。
個々の演算子がどのように動作するか、または各ターゲットが処理するトラフィックの量を確認するには、verbose オプションを使用して sp.<streamprocessor>.stats() を呼び出し、stats.operatorStats と、一部の演算子では stats.operatorStats.targetStats を検査します。
例、ソース演算子の場合、stats.operatorStats.targetStats は、すべての一意の db/coll またはすべての一意のトピックの inputMessageCount フィールドと inputMessageSize フィールドを収集します。
{ "name" : "KafkaConsumerOperator", "inputMessageCount" : NumberLong(100), "inputMessageSize" : 100352, "targetStats" : [ { "topic" : "outputTopic1", "inputMessageCount" : NumberLong(100), "inputMessageSize" : 100352 } ], ... }
また、Sink 演算子の場合、stats.operatorStats.targetStats は、すべての一意の db/coll またはすべての一意のトピックの outputMessageCount フィールドと outputMessageSize フィールドを収集します。
{ "name" : "MergeOperator", "inputMessageCount" : NumberLong(10), "inputMessageSize" : 1744, "targetStats" : [ { "db" : "cust1", "coll" : "outColl1", "outputMessageCount" : NumberLong(3), "outputMessageSize" : 1748 }, { "db" : "cust2", "coll" : "outColl2", "outputMessageCount" : NumberLong(4), "outputMessageSize" : 2241 } ], ... }