Docs 菜单
Docs 主页
/ /

sp.processor.modify()(mongosh方法)

sp.processor.modify()

修改当前流处理工作区上已命名的 流处理器。

Atlas Stream Processing 工作区支持此方法。

sp.processor.modify() 方法使用的语法如下:

sp.processor.modify({
pipeline: [
<pipeline>
],
name: <name>,
dlq: {
connectionName: <connectionName>,
db: <db>,
coll: <coll>
},
resumeFromCheckpoint: <resumeFromCheckpoint>,
tier: <tier>
})

sp.processor.modify() 采用以下字段:

字段
类型
必要性
说明

pipeline

阵列

Optional

应用流媒体数据的聚合阶段数组,其中最后一个阶段必须是接收器阶段。要学习;了解更多信息,请参阅流处理聚合。

name

字符串

Optional

流处理器的新名称。

dlq

对象

Optional

为流处理器设置{} 死信队列(DLQ)的对象。要删除现有的死信队列(DLQ),请传递一个空对象()。

dlq.connectionName

字符串

可选的

标识连接注册表中连接的标签。此连接必须引用Atlas 集群。设立死信队列(DLQ)时必需。

dlq.db

字符串

可选的

dlq.connectionName 中指定的集群上的Atlas数据库的名称。设立死信队列(DLQ)时必需。

dlq.coll

字符串

可选的

dlq.db 中指定的数据库中的集合名称。设立死信队列(DLQ)时必需。

resumeFromCheckpoint

布尔

Optional

指定修改后的流处理器是否从其上一个检查点恢复的标志。默认下,此字段为 true。设立为 false 时,处理器仅保留摘要统计信息。

tier

字符串

Optional

分配给流处理器的层级。如果不声明此选项,处理器将保留其当前层级。必须是以下值之一:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

要学习;了解更多信息,请参阅层级。

在调用此方法之前,流处理器必须处于 STOPPED 状态。 pipeline 参数会替换处理器的整个现有管道,包括您未更改的阶段。

默认下,修改后的处理器会从其上一个检查点恢复。如果将 resumeFromCheckpoint设立为 false,则修改后的处理器仅保留摘要统计信息。当您修改具有打开窗口的处理器时, Atlas Stream Processing会在更新的管道上重新计算这些窗口。

有关修改流处理器时应用的限制,请参阅修改流处理器。

要运行sp.processor.modify() ,您必须具有atlasAdmin 角色。

该示例更改了名为 solarDemo 的已停止流处理器,以添加 $match 阶段、重命名、更新其层级并配置死信队列(DLQ):

sp.solarDemo.modify({
pipeline: [
{ $source: { connectionName: "sample_stream_solar" }},
{ $match: { device_id: "device_0" }},
{ $merge: { into: {
connectionName: "cluster0",
db: "testout",
coll: "testout2"
}}}
],
name: "solarDemoRenamed",
dlq: {
connectionName: "cluster0",
db: "testout",
coll: "dlq"
},
resumeFromCheckpoint: true,
tier: "SP10"
})
{ ok: 1 }

启动重命名的处理器,然后运行sp.listStreamProcessors() 以验证名称、层级和死信队列(DLQ)更改:

sp.solarDemoRenamed.start()
sp.listStreamProcessors()
[
{
id: '6a39b08e6d9040e1cef8e31f',
name: 'solarDemoRenamed',
lastModified: ISODate('2026-06-22T22:00:46.858Z'),
state: 'STARTED',
tier: 'SP10',
errorMsg: '',
workers: [ 'worker-5f4c5bbc9d-7hg2q' ],
pipeline: [
{ '$source': { connectionName: 'sample_stream_solar' } },
{ '$match': { device_id: 'device_0' } },
{
'$merge': {
into: {
connectionName: 'cluster0',
db: 'testout',
coll: 'testout2'
}
}
}
],
lastStateChange: ISODate('2026-06-22T22:01:16.835Z'),
dlq: {
connectionName: 'cluster0',
db: 'testout',
coll: 'dlq'
}
}
]

运行sp.processor.sample() 以验证管道更改:

sp.solarDemoRenamed.sample()
{
device_id: 'device_0',
group_id: 9,
timestamp: '2026-06-22T22:01:25.828+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 122,
temp: 18
}
}
{
device_id: 'device_0',
group_id: 3,
timestamp: '2026-06-22T22:01:26.828+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 377,
temp: 7
}
}

后退

sp.processor.drop

在此页面上