Docs 菜单
Docs 主页
/ /

sp. 进程()(mongosh方法)

sp.process()

版本7.0 中的新增功能:在当前流处理工作区上创建临时 流处理器。

Atlas Stream Processing工作区支持此方法。

sp.process() 方法使用的语法如下:

sp.process(
[
<pipeline>
]
)

sp.createStreamProcessor() 采用这些字段:

字段
类型
必要性
说明

pipeline

阵列

必需

要应用于流数据的流聚合管道

sp.process() 在当前流处理工作区上创建一个临时的未命名流处理器并立即将其初始化。此流处理器仅在运行时持续存在。如果终止临时流处理器,则必须重新创建才能使用。

运行sp.process()的用户必须具有atlasAdmin角色。

以下示例创建了一个临时流处理器,该处理器从sample_stream_solar连接摄取数据。 处理器会排除device_id字段值为device_8的所有文档,将其余文档传递到持续时间为10秒的滚动窗口。 每个窗口对其接收的文档进行分组,然后返回每组的各种有用的统计数据。 然后,流处理器通过mongodb1连接将这些记录合并到solar_db.solar_coll

sp.process(
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: Int32(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)
  • 流聚合

  • 托管流处理器

后退

sp.listStreamProcessors

在此页面上