Docs 菜单
Docs 主页
/
MongoDB Manual
/ / /

sp.process()

在此页面上

  • 定义
  • 兼容性
  • 语法
  • 命令字段
  • 行为
  • 访问控制
  • 例子
  • 了解详情
sp.process()

7.0版本新增: 在当前 流处理实例 上创建临时 流处理器 。

Atlas Stream Processing实例支持此方法。

sp.process() 方法使用的语法如下:

sp.process(
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() 采用这些字段:

字段
类型
必要性
说明

name

字符串

必需

流处理器的逻辑名称。 它在Atlas Stream Processing实例中必须是唯一的。

pipeline

阵列

必需

要应用于流数据的流聚合管道

options

对象

Optional

为流处理器定义各种可选设置的对象。

options.dlq

对象

可选的

为您的 实例分配 死信队列Atlas Stream Processing 的对象。如果您定义options字段,则此字段是必需的。

options.dlq.connectionName

字符串

可选的

标识连接注册表中连接的标签。 此连接必须引用 Atlas 集群。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.db

字符串

可选的

options.dlq.connectionName中指定的集群上的 Atlas 数据库的名称。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.coll

字符串

可选的

options.dlq.db中指定的数据库中的集合名称。 如果您定义options.dlq字段,则此字段是必需的。

sp.process() 在当前Atlas Stream Processing实例上创建一个临时的未命名流处理器并立即将其初始化。 此流处理器仅在运行时持续存在。 如果终止临时流处理器,则必须重新创建才能使用。

运行sp.process()的用户必须具有atlasAdmin角色。

以下示例创建了一个临时流处理器,该处理器从sample_stream_solar连接摄取数据。 处理器会排除device_id字段值为device_8的所有文档,将其余文档传递到持续时间为10秒的滚动窗口。 每个窗口对其接收的文档进行分组,然后返回每组的各种有用的统计数据。 然后,流处理器通过mongodb1连接将这些记录合并到solar_db.solar_coll

sp.process(
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: NumberInt(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)
  • 流聚合

  • 托管流处理器

后退

sp.listStreamProcessors