Definición
Novedad en la 7.0 versión: Crea un efímero Procesador de flujo en el espacio de trabajo de procesamiento de flujo actual.
Compatibilidad
Este método es compatible con los espacios de trabajo de procesamiento de secuencias de Atlas.
Sintaxis
El método sp.process() tiene la siguiente sintaxis:
sp.process( [ <pipeline> ] )
Campos de comandos
sp.createStreamProcessor() toma estos campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| arreglo | Requerido | Canal de agregación de transmisiones que desea aplicar a sus datos de transmisión. |
Comportamiento
sp.process() Crea un procesador de flujo efímero sin nombre en el espacio de trabajo de procesamiento de flujo actual y lo inicializa inmediatamente. Este procesador de flujo solo persiste mientras se ejecuta. Si finaliza un procesador de flujo efímero, debe volver a crearlo para poder usarlo.
Control de acceso
El usuario que ejecuta sp.process() debe tener la
atlasAdmin role.
Ejemplo
El siguiente ejemplo crea un procesador de flujo efímero que ingiere datos de la sample_stream_solar conexión. El procesador excluye todos los documentos cuyo valor del device_id campo sea device_8 y pasa el resto a una ventana de bucle con una 10duración de segundos. Cada ventana agrupa los documentos que recibe y devuelve diversas estadísticas útiles de cada grupo. El procesador de flujo fusiona estos registros en a solar_db.solar_coll través de la mongodb1 conexión.
sp.process( [ { $source: { connectionName: 'sample_stream_solar', timeField: { $dateFromString: { dateString: '$timestamp' } } } }, { $match: { $expr: { $ne: [ "$device_id", "device_8" ] } } }, { $tumblingWindow: { interval: { size: Int32(10), unit: "second" }, "pipeline": [ { $group: { "_id": { "device_id": "$device_id" }, "max_temp": { $max: "$obs.temp" }, "max_watts": { $max: "$obs.watts" }, "min_watts": { $min: "$obs.watts" }, "avg_watts": { $avg: "$obs.watts" }, "median_watts": { $median: { input: "$obs.watts", method: "approximate" } } } } ] } }, { $merge: { into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" }, on: ["_id"] } } ] )