Docs Menu
Docs Home
/
Atlas
/

ストリーム プロセッサの開発

Atlas Stream Processing ストリーム プロセッサは、一意の名前を持つストリーム集計パイプラインのロジックをストリーミング データに適用します。 Atlas Stream Processing は、各ストリーム プロセッサの定義を永続ストレージに保存し、再利用できるようにします。 特定のストリーム プロセッサは、その定義が保存されているストリーム プロセシング インスタンス内でのみ使用できます。 Atlas Stream Processing は、ワーカーごとに最大4ストリーム プロセッサをサポートします。 この制限を超える追加のプロセッサには、Atlas Stream Processing は新しいリソースを割り当てます。

ストリーム プロセッサを作成および管理するには、次のものが必要です。

多くのストリーム プロセッサ コマンドでは、メソッド呼び出しで関連するストリーム プロセッサの名前を指定する必要があります。 次のセクションで説明される構文では、厳密に英数字の名前を使用することを前提としています。 ストリーム プロセッサの名前にハイフン(-)や完全停止(.)など、英数字以外の文字が含まれている場合は、名前を角括弧([])とdouble引用符("")で囲む必要があります。メソッド呼び出し(sp.["special-name-stream"].stats() と同様)。

sp.process() mongoshメソッドを使用して、ストリーム プロセッサを対話的に作成できます。対話的に作成したストリーム プロセッサは、次の動作を示します。

  • 出力とデッドレター キューのドキュメントを shell に書込む

  • 作成後すぐに実行を開始

  • 10分間、またはユーザーがそれらを停止するまで実行します

  • 停止した後に永続化しない

対話的に作成するストリーム プロセッサは、プロトタイプ作成を目的としています。 永続的なストリーム プロセッサを作成するには、「 ストリーム プロセッサの作成 」を参照してください。

sp.process() の構文は次のとおりです。

sp.process(<pipeline>)
フィールド
タイプ
必要性
説明

pipeline

配列

必須

データのストリーミング配信に適用するストリーム集約パイプライン

ストリーム プロセッサを対話的に作成するには、次の手順に従います。

1

Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、 mongosh を使用して接続します。

以下のコマンドは、x.059 認証を使用して、streamOwner という名前のユーザーとしてストリーム プロセシング インスタンスに接続します。

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

ユーザーのパスワードの入力を求められたら、入力します。

2

mongoshプロンプトで、 pipelineという名前の変数に適用する集計ステージを含む配列を割り当てます。

次の例では、接続レジストリ内のmyKafka接続のstuffトピックを$sourceとして使用し、 temperatureフィールドの値が46であるレコードを照合し、処理されたメッセージをoutputに出力します。接続レジストリにあるmySink接続のトピック。

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

次のコマンドは、 pipelineで定義されたロジックを適用するストリーム プロセッサを作成します。

sp.process(pipeline)

削除するまで保持されるストリーム プロセッサを作成するには、次の手順に従います。

Atlas管理APIは、ストリームプロセッサを作成するためのエンドポイントを提供します。

1 つのストリーム プロセッサを作成

Atlas UIでストリーム プロセッサを作成するには、Atlasプロジェクトの Stream Processing ページにGo、Stream Processingインスタンスの ペインで [Configure] をクリックします。

ストリーム プロセッサを構成するには、Visual Builder または JSONエディターのどちらかを使用するかを選択できます。

1

ストリーム プロセシングインスタンスに既存のストリーム プロセッサが存在する場合は、[+ Create stream processorボタンをクリックし、ドロップダウン オプションから Visual Builder を選択します。

ビジュアル ビルダには、ストリーム プロセッサを設定できるフォームが表示されます。

2
3

Sourceフィールドで、ストリーム プロセッサのソースとして使用する接続を Connection ドロップダウン リストから選択します。

これにより、ストリーム プロセッサの ステージを構成できるJSONテキスト sourceボックスが開きます。source ステージの構文の詳細については、$source を参照してください。

次の source ステージは、事前構成された sample_stream_solar 接続のリアルタイムデータに基づいて動作します。

{
"$source": {
"connectionName": "sample_stream_solar"
}
}
4

Start building your pipeline ペインで、パイプラインに追加する集計ステージの [] ボタンをクリックします。これにより、選択した集計ステージをJSON形式で構成できるテキストボックスが開きます。

集計ステージがリストにない場合は、+ Custom stage をクリックして、 JSON形式でサポートされている集計ステージを定義します。Atlas Stream Processing集計ステージとその構文の詳細については、集計パイプライン ステージを参照してください。

次の $match ステージは、事前構成された sample_stream_solar ストリーム内の obs.wattsフィールドが 300 より大きいすべてのドキュメントと一致します。

{
"$match": {
"obs.watts": { "$gt": 300 }
}
}
5

パイプラインに追加の集計ステージを追加するには、パイプラインの最後のステージの下にある + Add stage below ボタンをクリックして、追加する集計ステージを選択するか、サポートされている別の集計ステージを定義するために Custom stage をクリックします。そうすると、JSON形式で新しいステージを構成できるテキストボックスが開きます。

6

Sinkフィールドで、Connection ドロップダウンリストから宛先接続を選択します。

Sinkフィールドで、処理データを書き込む Connection ドロップダウン リストから接続を選択します。

これにより、ストリーム プロセッサの ステージを構成できるJSONテキスト mergeボックスが開きます。merge ステージの構文の詳細については、$merge を参照してください。

次の sink ステージでは、demoConnection 接続という名前の接続内の demoDb.demoCollコレクションに処理データが書き込まれます。

{
"$merge": {
"into": {
"connectionName": "demoConnection",
"db": "demoDb",
"coll": "demoColl"
}
}
}
7

ストリーム プロセッサが作成され、Stream Processing ページの Stream Processorsタブに表示されます。

1

ストリーム プロセシングインスタンスに既存のストリーム プロセッサが存在する場合は、[+ Create stream processorボタンをクリックし、ドロップダウン オプションから Visual Builder を選択します。

JSONエディターが開き、ストリーム プロセッサをJSON形式で構成できるテキストボックスが表示されます。

2

JSONエディターのテキスト ボックスにストリーム プロセッサのJSON定義を指定します。この定義には、ストリーム プロセッサの名前と、$source ステージで開始され $merge ステージで終了する集計パイプラインの名前を含める必要があります。$source ステージと $merge ステージの間に任意の数の追加の集計ステージを含めることができます。

Atlas Stream Processing集計ステージとその構文の詳細については、集計パイプライン ステージを参照してください。

次のJSON定義では、ネストされた solarDemo$tumblingWindow$groupsample_stream_solarステージを持つ10 ステージを使用して、事前構成された 接続からのリアルタイムデータを 秒間隔で集計する、 という名前のストリーム プロセッサが作成されます。は処理されたデータをmongodb1 という名前の接続内のコレクションに書き込みます。

{
"name": "solarDemo",
"pipeline": [
{
"$source": {
"connectionName": "sample_stream_solar"
}
},
{
"$tumblingWindow": {
"interval": {
"size": 10,
"unit": "second"
},
"pipeline": [
{
"$group": {
"_id": "$group_id",
"max_watts": { "$max": "$obs.watts" },
"min_watts": { "$min": "$obs.watts" }
}
}
]
}
},
{
"$merge": {
"into": {
"connectionName": "mongodb1",
"db": "solarDb",
"coll": "solarColl"
}
}
}
]
}

mongosh を使用して新しいストリーム プロセッサを作成するには、sp.createStreamProcessor() メソッドを使用します。構文は次のとおりです。

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
タイプ
必要性
説明

name

string

必須

ストリーム プロセッサの論理名。 これは、ストリーム プロセシング インスタンス内で一意である必要があります。 この名前には、英数字のみを含める必要があります。

pipeline

配列

必須

データのストリーミング配信に適用するストリーム集約パイプライン

options

オブジェクト

任意

ストリーム プロセッサのさまざまなオプション設定を定義するオブジェクト。

options.dlq

オブジェクト

条件付き

ストリーム プロセシング インスタンスにデッド レター キューを割り当てるオブジェクト。 このフィールドは、 optionsフィールドを定義する場合に必要です。

options.dlq.connectionName

string

条件付き

接続レジストリ内の接続を識別する、人間が判読可能なラベル。 この接続は Atlas クラスターを参照する必要があります。 このフィールドは、 options.dlqフィールドを定義する場合に必要です。

options.dlq.db

string

条件付き

options.dlq.connectionNameで指定されたクラスター上の Atlas データベースの名前。 このフィールドは、 options.dlqフィールドを定義する場合に必要です。

options.dlq.coll

string

条件付き

options.dlq.dbで指定されるデータベース内のコレクションの名前。 このフィールドは、 options.dlqフィールドを定義する場合に必要です。

1

Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、 mongosh を使用して接続します。

  1. Atlas Stream Processing インスタンスの ペインで、 Connectをクリックします。

  2. Connect to your instance ダイアログで、Shellタブを選択します。

  3. ダイアログに表示される接続文字列をコピーします。 形式は次ので、<atlas-stream-processing-url> は Atlas Stream ProcessingインスタンスのURL 、<username>atlasAdmin ロールを持つデータベースユーザーのユーザー名です。

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>
    --password <password>
  4. 接続文字列をターミナルに貼り付け、<password> プレースホルダーをユーザーの認証情報に置き換えます。 Enter キーを押して実行し、Stream Processingインスタンスに接続します。

以下のコマンドは、x.059 認証を使用して、streamOwner という名前のユーザーとしてストリーム プロセシング インスタンスに接続します。

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

ユーザーのパスワードの入力を求められたら、入力します。

2

mongoshプロンプトで、 pipelineという名前の変数に適用する集計ステージを含む配列を割り当てます。

次の例パイプラインでは、接続レジストリ内の myKafka 接続の stuffトピックを$sourceとして使用し、temperatureフィールドの値が 46 であるレコードと一致させ、処理されたメッセージを output に出力します。接続レジストリにある mySink 接続のトピック。

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongoshプロンプトで、DLQ の次のプロパティを含むオブジェクトを割り当てます。

  • connectionName

  • databaseName

  • コレクション名

次の例では、 metadata.dlqデータベース コレクション内のcluster01接続を介したDLQ を定義しています。

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

次のコマンドは、 pipelineで定義されたロジックを適用するproc01という名前のストリーム プロセッサを作成します。 処理でエラーをスローするドキュメントは、 deadLetterで定義されたDLQ に書き込まれます。

sp.createStreamProcessor("proc01", pipeline, deadLetter)

注意

Atlas Stream Processing は、45 日以上で stopped であったストリーム プロセッサの内部状態を破棄します。 このようなプロセッサを起動すると、最初の実行と同じように動作し、統計情報を報告します。

ストリーム プロセッサを起動するには:

Atlas Administration API は、ストリーム プロセッサを起動するための 1 つのストリーム プロセッサを起動 および オプションを使用して 1 つのストリーム プロセッサを起動 エンドポイントを提供します。

Atlas UIでストリーム プロセッサを起動するには、Atlasプロジェクトの Stream Processing ページにGo、Stream Processingインスタンスの ペインで Configure をクリックして、そのインスタンスに定義されているストリーム プロセッサのリストを表示します。

次に、ストリーム プロセッサの Start アイコンをクリックします。

mongosh を使用して新しいストリーム プロセッサを作成するには、sp.createStreamProcessor() メソッドを使用します。構文は次のとおりです。

sp.processor.start(<options>)

ここで、<options> startAfterは またはstartAtOperationTime のいずれかになります。これらのパラメーターの詳細については、 「 MongoDB Collection Change Stream 」を参照してください。

たとえば、 proc01という名前のストリーム プロセッサを起動するには、次のコマンドを実行します。

sp.proc01.start()
{ "ok" : 1 }

このメソッドは、ストリーム プロセッサが存在し、現在を実行中いない場合は { "ok": 1 } を返します。sp.processor.start()ではないストリーム プロセッサに対してSTOPPED を呼び出すと、mongosh はエラーを返します。

注意

Atlas Stream Processing は、45 日以上で stopped であったストリーム プロセッサの内部状態を破棄します。 このようなプロセッサを起動すると、最初の実行と同じように動作し、統計情報を報告します。

ストリーム プロセッサを停止するには:

Atlas Administration API は、ストリーム プロセッサを停止するためのエンドポイントを提供します。

1 つのストリーム プロセッサを停止

Atlas UIでストリーム プロセッサを一時停止するには、Atlasプロジェクトの Stream Processing ページにGo、Stream Processingインスタンスの ペインで Configure をクリックして、そのインスタンスに定義されているストリーム プロセッサのリストを表示します。

次に、ストリーム プロセッサの Pause アイコンをクリックします。

mongosh で既存のストリーム プロセッサを停止するには、sp.processor.stop() メソッドを使用します。

たとえば、 proc01という名前のストリーム プロセッサを停止するには、次のコマンドを実行します。

sp.proc01.stop()
{ "ok" : 1 }

このメソッドは、ストリーム プロセッサが存在し、現在を実行中場合は { "ok": 1 } を返します。sp.processor.stop()ではないストリーム プロセッサに対してrunning を呼び出すと、mongosh はエラーを返します。

既存のストリーム プロセッサの次の要素を変更できます。

ストリーム プロセッサを変更するには、次の手順に従います。

  1. ストリーム プロセッサを停止します。

  2. ストリーム プロセッサを変更します。

  3. ストリーム プロセッサを再起動します。

デフォルトでは、変更されたプロセッサは最後のチェックポイントから復元されます。あるいは、resumeFromCheckpoint=false を設定することもできます。この場合、プロセッサは要約統計のみを保持します。ウィンドウが開いているプロセッサーを変更した場合、ウィンドウは更新されたパイプラインで全て再計算されます。

注意

Operatoriscontains などのマッチャー式が含まれる)を使用して、ストリーム プロセッサ ステートが失敗した場合のアラートを構成したストリーム プロセッサの名前を変更すると、Atlas は、マッチャー式がその新しい名前と一致しない場合、名前が変更されたストリーム プロセッサに対してアラートをトリガーしません。名前が変更されたストリーム プロセッサを監視するには、アラートを再構成してください。

デフォルト設定 resumeFromCheckpoint=true が有効になっている場合、以下の制限が適用されます。

  • $source ステージは変更できません。

  • ウィンドウの間隔は変更できません。

  • ウィンドウを削除することはできません。

  • ウィンドウを使用してパイプラインを変更できるのは、そのウィンドウの内部パイプラインに $group または $sort ステージのいずれかが含まれている場合のみです。

  • 既存のウィンドウタイプを変更することはできません。たとえば、$tumblingWindow から $hoppingWindow への変更、またはその逆の変更はできません。

  • ウィンドウを持つプロセッサは、ウィンドウの再計算の結果として、一部のデータを再処理する可能性があります。

Atlas Administration API は、ストリームプロセッサを変更するためのエンドポイントを提供します。

1 つのストリーム プロセッサを変更

mongosh v2.3.4 以上が必要です。

既存のストリーム プロセッサを変更するには、sp.<streamprocessor>.modify() コマンドを使用します。<streamprocessor> は、現在のストリーム処理インスタンスに対して定義された停止中のストリームプロセッサの名前である必要があります。

たとえば、proc01 という名前のストリーム プロセッサを変更するには、次のコマンドを実行します。

sp.proc1.modify(<pipeline>, {
resumeFromCheckpoint: bool, // optional
name: string, // optional
dlq: string, // optional
}})
sp.createStreamProcessor("foo", [
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
])
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
]);
sp.foo.start();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test",
config: {
startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000)
}
}},
{$match: {
operationType: "insert"
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout2"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.stop();
sp.foo.modify({dlq: {}})
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
{$source: {
connectionName: "StreamsAtlasConnection",
db: "test",
coll: "test"
}},
{$replaceRoot: {newRoot: "$fullDocument"}},
{$match: {cost: {$gt: 500}}},
{$tumblingWindow: {
interval: {unit: "day", size: 1},
pipeline: [
{$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
]
}},
{$merge: {
into: {
connectionName: "StreamsAtlasConnection",
db: "testout",
coll: "testout"
}
}}
], {resumeFromCheckpoint: false});
sp.foo.start();

ストリーム プロセッサを削除するには:

Atlas管理APIは、ストリームプロセッサを削除するためのエンドポイントを提供します。

1 つのストリーム プロセッサを削除

Atlas UIでストリーム プロセッサを削除するには、Atlasプロジェクトの Stream Processing ページにGo、Stream Processingインスタンスの ペインで Configure をクリックして、そのストリーム プロセッサが定義されているストリーム プロセッサのリストを表示します。

次に、ストリーム プロセッサの Delete)アイコンをクリックします。表示される確認ダイアログで、ストリーム プロセッサの名前(solarDemo)を入力して削除することを確認し、Delete をクリックします。

mongosh を使用して既存のストリーム プロセッサを削除するには、sp.processor.drop() メソッドを使用します。

たとえば、 proc01という名前のストリーム プロセッサを削除するには、次のコマンドを実行します。

sp.proc01.drop()

このメソッドは以下を返します。

  • true ストリーム プロセッサが存在する場合。

  • false ストリーム プロセッサが存在しない場合。

ストリーム プロセッサを削除すると、Atlas Stream Processing がそのストリーム用にプロビジョニングしたすべてのリソースと、保存されたすべての状態が破棄されます。

使用可能なすべてのストリーム プロセッサを一覧表示するには:

Atlas Administration API は、利用可能なすべてのストリーム プロセッサを一覧表示するためのエンドポイントを提供します。

ストリーム プロセッサを一覧表示する

Atlas UIで、Stream Processingインスタンスに定義されているストリーム プロセッサの一覧を表示するには、Atlasプロジェクトの Stream Processing ページにGo、Stream Processingインスタンスの ペインで [Configure] をクリックします。

ストリーム プロセッサの一覧とそのステータスが表示されます。

mongosh を使用して現在のストリーム プロセシングインスタンスで使用可能なすべてのストリーム プロセッサを一覧表示するには、sp.listStreamProcessors() メソッドを使用します。これにより、各ストリーム プロセッサに関連付けられた名前、開始時間、現在の状態、パイプラインを含むドキュメントのリストが返されます。構文は次のとおりです。

sp.listStreamProcessors(<filter>)

<filter> は、リストをフィルタリングするフィールドを指定するドキュメントです。

次の例では、フィルタリングされていないリクエストの戻り値を示しています。

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

同じストリーム プロセシング インスタンスで コマンドを再度実行し、 "running""state"をフィルタリングすると、次の出力が表示されます。

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

既存のストリーム プロセッサからサンプル結果の配列を mongosh を使用して STDOUT に返すには、sp.processor.sample() メソッドを使用します。例、次のコマンドは、proc01 という名前のストリーム プロセッサからサンプリングしています。

sp.proc01.sample()

このコマンドは、 CTRL-Cを使用してキャンセルするまで、または返されたサンプルのサイズが累積40 MB になるまで継続的に実行されます。 ストリーム プロセッサは、サンプル内の無効なドキュメントを次の形式の_dlqMessageドキュメントで報告します。

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
instanceName: '<instanceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

これらのメッセージを使用して、デッド レター キューコレクションを定義せずにデータのクリーンアップの問題を診断できます。

注意

Atlas Stream Processing は、45 日以上で stopped であったストリーム プロセッサの内部状態を破棄します。 このようなプロセッサを起動すると、最初の実行と同じように動作し、統計情報を報告します。

ストリーム プロセッサの統計を表示するには:

Atlas Administration API は、ストリーム プロセッサの統計を表示するためのエンドポイントを提供します。

1 つのストリーム プロセッサを取得

ストリーム プロセッサのモニタリングを表示するには、Atlasプロジェクトの Stream Processing ページにGo、 Monitoringタブを開きます。 次に、ページ左上にある Stream processor ドロップダウン リストからストリーム プロセッサを選択します。

mongosh を使用して既存のストリーム プロセッサの現在のステータスを要約するドキュメントを返すには、sp.processor.stats() メソッドを使用します。構文は次のとおりです。

sp.<streamprocessor>.stats({options: {<options>}})

ここで、 optionsは次のフィールドを持つ任意のドキュメントです。

フィールド
タイプ
説明

scale

integer

出力内の項目のサイズに使用する単位。 デフォルトでは、Atlas Stream Processing はアイテムのサイズをバイト単位で表示します。 KB 単位で表示するには、 1024scaleを指定します。

verbose

ブール値

出力ドキュメントの冗長レベルを指定するフラグ。 trueに設定されている場合、出力ドキュメントには、パイプライン内の各演算子の統計を報告するサブドキュメントが含まれます。 デフォルトはfalseです。

出力ドキュメントには、次のフィールドがあります。

フィールド
タイプ
説明

ns

string

ストリーム プロセッサが定義されている名前空間。

stats

オブジェクト

ストリーム プロセッサの動作状態を説明するドキュメント。

stats.name

string

ストリーム プロセッサの名前。

stats.status

string

ストリーム プロセッサの状態。 このフィールドには、次の値を指定できます。

  • starting

  • running

  • error

  • stopping

stats.scaleFactor

integer

サイズ フィールドが表示される単位。 1に設定されている場合、サイズはバイト単位で表示されます。 1024に設定されている場合、サイズはキロバイト単位で表示されます。

stats.inputMessageCount

integer

ストリームに公開されたドキュメントの数。 ドキュメントは、パイプライン全体を通過する場合ではなく、 $sourceステージを通過するとストリームに「公開」されたと見なされます。

stats.inputMessageSize

integer

ストリームに公開されたバイト数またはキロバイト数。 バイトは、パイプライン全体を通過する場合ではなく、 $sourceステージを通過するとストリームに「公開」と見なされます。

stats.outputMessageCount

integer

ストリームによって処理されたドキュメントの数。 ドキュメントはストリームによってパイプライン全体を通過すると、ストリームによって「処理済み」と見なされます。

stats.outputMessageSize

integer

ストリームによって処理されたバイト数またはキロバイト数。 バイトはパイプライン全体を通過すると、ストリームによって「処理済み」と見なされます。

stats.dlqMessageCount

integer

stats.dlqMessageSize

integer

stats.changeStreamTimeDifferenceSecs

integer

最新の変更ストリーム再開トークンによって表されるイベント時間とoplog 内の最新のイベントとの差(秒単位)。

stats.changeStreamState

token

最新の変更ストリーム再開トークン。 変更ストリーム ソースを持つストリーム プロセッサにのみ適用されます。

stats.latency

ドキュメント

ストリーム プロセッサ全体のレイテンシ統計です。Atlas Stream Processing は、verboseオプションを指定した場合にのみ、このフィールドを返します。

stats.latency.p50

integer

過去 30 秒間に処理されたすべてのドキュメントの推定 50 パーセンタイル レイテンシ。パイプラインにウィンドウ ステージが含まれている場合、レイテンシ測定にはウィンドウの間隔が含まれます。

例えば、$tumblingWindowステージの間隔が5分の場合、レイテンシ測定にはその5分が含まれます。

stats.latency.p99

integer

過去 30 秒間に処理されたすべてのドキュメントの推定 99 パーセンタイル レイテンシ。パイプラインにウィンドウ ステージが含まれている場合、レイテンシ測定にはウィンドウの間隔が含まれます。

例えば、$tumblingWindowステージの間隔が5分の場合、レイテンシ測定にはその5分が含まれます。

stats.latency.start

datetime

直近の 30 秒測定ウィンドウが開始した時刻(ウォールタイム)。

stats.latency.end

datetime

直近の 30 秒間の測定ウィンドウが終了した時点のウォール時間。

stats.latency.unit

string

レイテンシが計測される時間の単位。この値は常に microseconds です。

stats.latency.count

integer

直近の 30 秒測定ウィンドウでストリームプロセッサが処理したドキュメント数。

stats.latency.sum

integer

直近の 30 秒測定ウィンドウで取得された、個々のレイテンシ測定値の合計(マイクロ秒単位)。

stats.stateSize

integer

Windowsがプロセッサの状態を保存するために使用するバイト数。

stats.watermark

integer

現在の浮動小数のタイムスタンプ。

stats.operatorStats

配列

プロセッサ パイプライン内の各演算子の統計情報。 Atlas Stream Processing では、 verboseオプションで渡された場合にのみこのフィールドが返されます。

stats.operatorStats は、多数のコアstatsフィールドの演算子ごとのバージョンを提供します。

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.latency

  • stats.operatorStats.stateSize

stats.operatorStats には、次の一意のフィールドが含まれます。

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTimeMillis

stats.operatorStats verbose オプションを渡し、プロセッサにウィンドウステージが含まれている場合、次のフィールドも含まれます。

  • stats.minOpenWindowStartTime

  • stats.maxOpenWindowStartTime

stats.operatorStats.maxMemoryUsage

integer

演算子の最大メモリ使用量(バイトまたはキロバイト単位)。

stats.operatorStats.executionTimeSecs

integer

演算子の合計実行時間(秒単位)。

stats.minOpenWindowStartTime

date

最小オープンウィンドウの開始時刻。この値は任意です。

stats.maxOpenWindowStartTime

date

最大オープンウィンドウの開始時刻。この値は任意です。

stats.kafkaPartitions

配列

Apache Kafka プロバイダーのパーティションのオフセット情報。kafkaPartitionsApache Kafkaソースを使用する接続にのみ適用されます。

stats.kafkaPartitions.partition

integer

Apache Kafkaトピックのパーティション番号。

stats.kafkaPartitions.currentOffset

integer

指定されたパーティションに対するストリーム プロセッサのオフセット。 この値は、ストリーム プロセッサが処理した以前のオフセットに1を加えた値に等しくなります。

stats.kafkaPartitions.checkpointOffset

integer

ストリーム プロセッサがApache Kafkaプロバイダーと指定されたパーティションのチェックポイントに最後にコミットしたオフセット。このオフセットを介するすべてのメッセージは、最後のチェックポイントに記録されます。

stats.kafkaPartitions.isIdle

ブール値

パーティションがアイドル状態であるかどうかを示すフラグ。この値はデフォルトで false です。

たとえば、以下は、 inst01という名前のストリーム プロセシング インスタンス上のproc01という名前のストリーム プロセッサのステータスを示しており、アイテムサイズは KB 単位で表示されています。

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

戻る

VPC接続の管理