Docs 菜单
Docs 主页
/ /

sp.createStreamProcessor()(mongosh方法)

sp.createStreamProcessor()

在当前流处理工作区上创建 流处理器。

只有在连接到流处理工作区时才能调用此命令。

此命令需要mongosh版本 ≥ 2.0 。

Atlas Stream Processing工作区支持此方法。

sp.createStreamProcessor() 方法使用的语法如下:

sp.createStreamProcessor(
<name>,
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() 采用这些字段:

字段
类型
必要性
说明

name

字符串

必需

流处理器的逻辑名称。这在流处理工作区中必须是唯一的。

pipeline

阵列

必需

要应用于流数据的流聚合管道

options

对象

Optional

为流处理器定义各种可选设置的对象。

options.dlq

对象

可选的

为流处理工作区分配 死信队列(DLQ)的对象。如果定义options 字段,则此字段是必需的。

options.dlq.connectionName

字符串

可选的

标识连接注册表中连接的标签。 此连接必须引用 Atlas 集群。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.db

字符串

可选的

options.dlq.connectionName中指定的集群上的 Atlas 数据库的名称。 如果您定义options.dlq字段,则此字段是必需的。

options.dlq.coll

字符串

可选的

options.dlq.db中指定的数据库中的集合名称。 如果您定义options.dlq字段,则此字段是必需的。

options.tier

字符串

Optional

Atlas Stream Processing将处理器分配到的层级。如果您未声明此选项, Atlas Stream Processing会将处理器分配给流处理工作区的层级。必须是以下之一:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

要学习;了解更多信息,请参阅层级。

sp.createStreamProcessor() 在当前流处理工作区上创建一个持久的、已命名的流处理器。您可以使用sp.processor.start() 初始化此流处理器。如果您尝试创建与现有流处理器同名的流处理器,mongosh 将返回错误。

运行sp.createStreamProcessor()的用户必须具有atlasAdmin角色。

以下示例创建了一个名为solarDemo的流处理器,该处理器从sample_stream_solar连接摄取数据。 处理器会排除device_id字段值为device_8的所有文档,将其余文档传递到持续时间为10秒的滚动窗口。 每个窗口对其接收的文档进行分组,然后返回每组的各种有用的统计数据。 然后,流处理器通过mongodb1连接将这些记录合并到solar_db.solar_coll

sp.createStreamProcessor(
'solarDemo',
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: Int32(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

后退

Atlas Stream Processing

在此页面上