定义
兼容性
语法
sp.createStreamProcessor() 方法使用的语法如下:
sp.createStreamProcessor(   <name>,   [     <pipeline>   ],   {     <options>   } ) 
命令字段
sp.createStreamProcessor() 采用这些字段:
字段  | 类型  | 必要性  | 说明  | 
|---|---|---|---|
  | 字符串  | 必需  | 流处理器的逻辑名称。这在流处理工作区中必须是唯一的。  | 
  | 阵列  | 必需  | 要应用于流数据的流聚合管道。  | 
  | 对象  | Optional  | 为流处理器定义各种可选设置的对象。  | 
  | 对象  | 可选的  | |
  | 字符串  | 可选的  | 标识连接注册表中连接的标签。 此连接必须引用 Atlas 集群。 如果您定义  | 
  | 字符串  | 可选的  | 
  | 
  | 字符串  | 可选的  | 
  | 
  | 字符串  | Optional  | Atlas Stream Processing将处理器分配到的层级。如果您未声明此选项, Atlas Stream Processing会将处理器分配给流处理工作区的层级。必须是以下之一: 
  | 
行为
sp.createStreamProcessor() 在当前流处理工作区上创建一个持久的、已命名的流处理器。您可以使用sp.processor.start() 初始化此流处理器。如果您尝试创建与现有流处理器同名的流处理器,mongosh 将返回错误。
访问控制
运行sp.createStreamProcessor()的用户必须具有atlasAdmin角色。
例子
以下示例创建了一个名为solarDemo的流处理器,该处理器从sample_stream_solar连接摄取数据。 处理器会排除device_id字段值为device_8的所有文档,将其余文档传递到持续时间为10秒的滚动窗口。 每个窗口对其接收的文档进行分组,然后返回每组的各种有用的统计数据。 然后,流处理器通过mongodb1连接将这些记录合并到solar_db.solar_coll 。
sp.createStreamProcessor(   'solarDemo',   [     {       $source: {         connectionName: 'sample_stream_solar',         timeField: {           $dateFromString: {             dateString: '$timestamp'           }         }       }     },     {       $match: {         $expr: {           $ne: [             "$device_id",             "device_8"           ]         }       }     },     {       $tumblingWindow: {         interval: {           size: Int32(10),           unit: "second"         },         "pipeline": [           {             $group: {               "_id": {  "device_id": "$device_id" },               "max_temp": { $max: "$obs.temp" },               "max_watts": { $max: "$obs.watts" },               "min_watts": { $min: "$obs.watts" },               "avg_watts": { $avg: "$obs.watts" },               "median_watts": {                                 $median: {                                   input: "$obs.watts",                                   method: "approximate"                                 }                               }             }           }         ]       }     },     {       $merge: {         into: {           connectionName: "mongodb1",           db: "solar_db",           coll: "solar_coll"         },         on: ["_id"]       }     }   ] )