Docs Menu
Docs Home
/ /

SP.process()(mongoshメソッド)

sp.process()

バージョン の新機能:7.0 現在のストリーム プロセシング ワークスペースにエフェメラル Stream プロセッサを作成します。

このメソッドは Atlas Stream Processing ワークスペースでサポートされています。

sp.process()メソッドの構文は次のとおりです。

sp.process(
[
<pipeline>
]
)

sp.createStreamProcessor() 次のフィールドを取ります。

フィールド
タイプ
必要性
説明

pipeline

配列

必須

データのストリーミング配信に適用するストリーム集約パイプライン

sp.process() 現在のストリーム処理ワークスペースにエフェメラルの名前のないストリーム プロセッサを作成し、すぐに初期化します。このストリーム プロセッサは の実行中のみ保持されます。エフェメラル ストリーム プロセッサを終了した場合、使用するには再度作成する必要があります。

sp.process()を実行しているユーザーにはatlasAdminロールが必要です。

次の例では、 sample_stream_solar接続からデータを取り込む一時的なストリーム プロセッサを作成しています。 プロセッサは、 device_idフィールドの値がdevice_8であるすべてのドキュメントを除外し、残りを10秒の期間のローリングウィンドウに渡します。 各ウィンドウは受け取ったドキュメントをグループ化し、各グループのさまざまな有用な統計を返します。 次に、ストリーム プロセッサはこれらのレコードをmongodb1接続を介してsolar_db.solar_collにマージします。

sp.process(
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: Int32(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

戻る

SP.listStreamProcessers

項目一覧