sp.process()
定义
兼容性
语法
sp.process()
方法使用的语法如下:
sp.process( [ <pipeline> ], { <options> } )
命令字段
sp.createStreamProcessor()
采用这些字段:
字段 | 类型 | 必要性 | 说明 |
---|---|---|---|
| 字符串 | 必需 | 流处理器的逻辑名称。 它在Atlas Stream Processing实例中必须是唯一的。 |
| 阵列 | 必需 | 要应用于流数据的流聚合管道。 |
| 对象 | Optional | 为流处理器定义各种可选设置的对象。 |
| 对象 | 可选的 | |
| 字符串 | 可选的 | 标识连接注册表中连接的标签。 此连接必须引用 Atlas 集群。 如果您定义 |
| 字符串 | 可选的 |
|
| 字符串 | 可选的 |
|
行为
sp.process()
在当前Atlas Stream Processing实例上创建一个临时的未命名流处理器并立即将其初始化。 此流处理器仅在运行时持续存在。 如果终止临时流处理器,则必须重新创建才能使用。
访问控制
运行sp.process()
的用户必须具有atlasAdmin
角色。
例子
以下示例创建了一个临时流处理器,该处理器从sample_stream_solar
连接摄取数据。 处理器会排除device_id
字段值为device_8
的所有文档,将其余文档传递到持续时间为10秒的滚动窗口。 每个窗口对其接收的文档进行分组,然后返回每组的各种有用的统计数据。 然后,流处理器通过mongodb1
连接将这些记录合并到solar_db.solar_coll
。
sp.process( [ { $source: { connectionName: 'sample_stream_solar', timeField: { $dateFromString: { dateString: '$timestamp' } } } }, { $match: { $expr: { $ne: [ "$device_id", "device_8" ] } } }, { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, "pipeline": [ { $group: { "_id": { "device_id": "$device_id" }, "max_temp": { $max: "$obs.temp" }, "max_watts": { $max: "$obs.watts" }, "min_watts": { $min: "$obs.watts" }, "avg_watts": { $avg: "$obs.watts" }, "median_watts": { $median: { input: "$obs.watts", method: "approximate" } } } } ] } }, { $merge: { into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" }, on: ["_id"] } } ] )