Definición
sp.process()Crea un efímero Stream Processor en el Stream Processing Workspaceactual.
Solo puedes invocar este comando mientras estés conectado a un entorno de trabajo de Stream Processing.
Este comando requiere
mongoshversión ≥ 2.0.
Compatibilidad
Este método es compatible en espacios de trabajo de Atlas Stream Processing.
Sintaxis
El método sp.process() tiene la siguiente sintaxis:
sp.process( [ <pipeline> ], { <options> } )
Campos de comandos
sp.createStreamProcessor() toma estos campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Requerido | Nombre lógico para el procesador de flujos. Esto debe ser único dentro del espacio de trabajo de stream processing. |
| arreglo | Requerido | Transmisión pipeline de agregación que desea aplicar a sus datos de transmisión. |
| Objeto | Opcional | Objeto que define varias configuraciones opcionales para tu procesador de flujo. |
| Objeto | Condicional | Objeto que asigna un
Cola demensajes no entregados para su |
| string | Condicional | Etiqueta que identifica una conexión en tu registro de conexiones. Esta conexión debe hacer referencia a un clúster de Atlas. Este campo es necesario si se define el campo |
| string | Condicional | Nombre de una base de datos Atlas en el clúster especificado en |
| string | Condicional | Nombre de una colección en la base de datos especificada en |
Comportamiento
sp.process() crea un procesador de flujo efímero y sin nombre en el espacio de trabajo de Stream Processing actual y lo inicializa inmediatamente. Este procesador de flujos sólo persiste mientras se ejecuta. Si finalizas un procesador de flujos efímero, debes crearlo nuevamente para poder utilizarlo.
Control de acceso
El usuario que ejecuta sp.process() debe tener el rol atlasAdmin.
Ejemplo
El siguiente ejemplo crea un procesador de flujo efímero que ingiere datos de la conexión sample_stream_solar. El procesador excluye todos los documentos donde el valor del campo device_id es device_8, pasando el resto a una ventana móvil con una duración de 10segundos. Cada ventana agrupa los documentos recibidos y devuelve diversas estadísticas útiles de cada grupo. El procesador de flujo luego fusiona estos registros en solar_db.solar_coll a través de la conexión mongodb1.
sp.process( [ { $source: { connectionName: 'sample_stream_solar', timeField: { $dateFromString: { dateString: '$timestamp' } } } }, { $match: { $expr: { $ne: [ "$device_id", "device_8" ] } } }, { $tumblingWindow: { interval: { size: Int32(10), unit: "second" }, "pipeline": [ { $group: { "_id": { "device_id": "$device_id" }, "max_temp": { $max: "$obs.temp" }, "max_watts": { $max: "$obs.watts" }, "min_watts": { $min: "$obs.watts" }, "avg_watts": { $avg: "$obs.watts" }, "median_watts": { $median: { input: "$obs.watts", method: "approximate" } } } } ] } }, { $merge: { into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" }, on: ["_id"] } } ] )