定义
兼容性
Atlas Stream Processing 工作区支持此方法。
语法
sp.processor.modify() 方法使用的语法如下:
sp.processor.modify({ pipeline: [ <pipeline> ], name: <name>, dlq: { connectionName: <connectionName>, db: <db>, coll: <coll> }, resumeFromCheckpoint: <resumeFromCheckpoint>, tier: <tier> })
命令字段
sp.processor.modify() 采用以下字段:
字段 | 类型 | 必要性 | 说明 |
|---|---|---|---|
| 阵列 | Optional | |
| 字符串 | Optional | 流处理器的新名称。 |
| 对象 | Optional | |
| 字符串 | 可选的 | 标识连接注册表中连接的标签。此连接必须引用Atlas 集群。设立死信队列(DLQ)时必需。 |
| 字符串 | 可选的 | 在 |
| 字符串 | 可选的 |
|
| 布尔 | Optional | 指定修改后的流处理器是否从其上一个检查点恢复的标志。默认下,此字段为 |
| 字符串 | Optional |
行为
在调用此方法之前,流处理器必须处于 STOPPED 状态。 pipeline 参数会替换处理器的整个现有管道,包括您未更改的阶段。
默认下,修改后的处理器会从其上一个检查点恢复。如果将 resumeFromCheckpoint设立为 false,则修改后的处理器仅保留摘要统计信息。当您修改具有打开窗口的处理器时, Atlas Stream Processing会在更新的管道上重新计算这些窗口。
访问控制
要运行sp.processor.modify() ,您必须具有atlasAdmin 角色。
例子
该示例更改了名为 solarDemo 的已停止流处理器,以添加 $match 阶段、重命名、更新其层级并配置死信队列(DLQ):
sp.solarDemo.modify({ pipeline: [ { $source: { connectionName: "sample_stream_solar" }}, { $match: { device_id: "device_0" }}, { $merge: { into: { connectionName: "cluster0", db: "testout", coll: "testout2" }}} ], name: "solarDemoRenamed", dlq: { connectionName: "cluster0", db: "testout", coll: "dlq" }, resumeFromCheckpoint: true, tier: "SP10" })
{ ok: 1 }
启动重命名的处理器,然后运行sp.listStreamProcessors() 以验证名称、层级和死信队列(DLQ)更改:
sp.solarDemoRenamed.start() sp.listStreamProcessors()
[ { id: '6a39b08e6d9040e1cef8e31f', name: 'solarDemoRenamed', lastModified: ISODate('2026-06-22T22:00:46.858Z'), state: 'STARTED', tier: 'SP10', errorMsg: '', workers: [ 'worker-5f4c5bbc9d-7hg2q' ], pipeline: [ { '$source': { connectionName: 'sample_stream_solar' } }, { '$match': { device_id: 'device_0' } }, { '$merge': { into: { connectionName: 'cluster0', db: 'testout', coll: 'testout2' } } } ], lastStateChange: ISODate('2026-06-22T22:01:16.835Z'), dlq: { connectionName: 'cluster0', db: 'testout', coll: 'dlq' } } ]
运行sp.processor.sample() 以验证管道更改:
sp.solarDemoRenamed.sample()
{ device_id: 'device_0', group_id: 9, timestamp: '2026-06-22T22:01:25.828+00:00', max_watts: 450, event_type: 0, obs: { watts: 122, temp: 18 } } { device_id: 'device_0', group_id: 3, timestamp: '2026-06-22T22:01:26.828+00:00', max_watts: 450, event_type: 0, obs: { watts: 377, temp: 7 } }