Docs Menu
Docs Home
/ /

SP.createStreamProcessor()(mongoshメソッド)

sp.createStreamProcessor()

バージョン の新機能:7.0 現在のストリーム プロセシング ワークスペースに Stream プロセッサを作成します。

このメソッドは Atlas Stream Processing ワークスペースでサポートされています。

sp.createStreamProcessor()メソッドの構文は次のとおりです。

sp.createStreamProcessor(
<name>,
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() 次のフィールドを取ります。

フィールド
タイプ
必要性
説明

name

string

必須

ストリーム プロセッサの論理名。これは、ストリーム処理ワークスペース内で一意である必要があります。

pipeline

配列

必須

データのストリーミング配信に適用するストリーム集約パイプライン

options

オブジェクト

任意

ストリーム プロセッサのさまざまなオプション設定を定義するオブジェクト。

注意

以下のフィールドはすべて構成できるオプションです。

dlq

オブジェクト

条件付き

ストリーム処理ワークスペースに のデッドレターキュー (DLQ)を割り当てるオブジェクト。このフィールドは、options フィールドを定義する場合に必要です。

dlq.connectionName

string

条件付き

接続レジストリ内の接続を識別するラベル。 この接続は Atlas クラスターを参照する必要があります。 このフィールドは、 dlqフィールドを定義する場合に必要です。

dlq.db

string

条件付き

dlq.connectionNameで指定されたクラスター上の Atlas データベースの名前。 このフィールドは、 dlqフィールドを定義する場合に必要です。

dlq.coll

string

条件付き

dlq.dbで指定されるデータベース内のコレクションの名前。 このフィールドは、 dlqフィールドを定義する場合に必要です。

options.tier

string

任意

Atlas Stream Processing がプロセッサを割り当てる階層。このオプションを宣言しない場合、Atlas Stream Processing はプロセッサをストリーム処理ワークスペースの 階層に割り当てます。次のいずれかである必要があります。

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

詳細については、「 階層 」を参照してください。

sp.createStreamProcessor() は、現在のストリーム処理ワークスペースに永続的な名前付きストリーム プロセッサを作成します。このストリーム プロセッサは で初期化できます。既存のストリーム プロセッサと同じ名前でストリームsp.processor.start() プロセッサを作成しようとすると、mongosh はエラーを返します。

sp.createStreamProcessor()を実行しているユーザーにはatlasAdminロールが必要です。

次の例では、 sample_stream_solar接続からデータを取り込むsolarDemoという名前のストリーム プロセッサを作成します。 プロセッサは、 device_idフィールドの値がdevice_8であるすべてのドキュメントを除外し、残りを10秒の期間のローリングウィンドウに渡します。 各ウィンドウは受け取ったドキュメントをグループ化し、各グループのさまざまな有用な統計を返します。 次に、ストリーム プロセッサはこれらのレコードをmongodb1接続を介してsolar_db.solar_collにマージします。

sp.createStreamProcessor(
'solarDemo',
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: Int32(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

戻る

Atlas Stream Processing

項目一覧