Docs Menu
Docs Home
/ /

sp.createStreamProcessor() (método mongosh)

sp.createStreamProcessor()

Novedad en la 7.0 versión: Crea un Procesador de flujo en el espacio de trabajo de procesamiento de flujo actual.

Este método es compatible con los espacios de trabajo de procesamiento de secuencias de Atlas.

El método sp.createStreamProcessor() tiene la siguiente sintaxis:

sp.createStreamProcessor(
<name>,
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() toma estos campos:

Campo
Tipo
Necesidad
Descripción

name

string

Requerido

Nombre lógico del procesador de flujo. Debe ser único dentro del espacio de trabajo de procesamiento de flujo.

pipeline

arreglo

Requerido

Canal de agregación de transmisiones que desea aplicar a sus datos de transmisión.

options

Objeto

Opcional

Objeto que define varias configuraciones opcionales para su procesador de flujo.

options.dlq

Objeto

Condicional

Objeto que asigna un Cola demensajes no entregados para su options espacio de trabajo de procesamiento de flujos. Este campo es necesario si define el campo.

options.dlq.connectionName

string

Condicional

Etiqueta que identifica una conexión en su registro de conexiones. Esta conexión debe hacer referencia a un clúster Atlas. Este campo es necesario si define el campo options.dlq.

options.dlq.db

string

Condicional

Nombre de una base de datos Atlas en el clúster especificado en options.dlq.connectionName. Este campo es necesario si se define el campo options.dlq.

options.dlq.coll

string

Condicional

Nombre de una colección en la base de datos especificada en options.dlq.db. Este campo es necesario si se define el campo options.dlq.

options.tier

string

Opcional

El nivel al que Atlas Stream Processing asigna el procesador. Si no declara esta opción, Atlas Stream Processing asigna el procesador al nivel del espacio de trabajo de Stream Processing. Debe ser uno de los siguientes:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

Para obtener más información, consulte Niveles.

sp.createStreamProcessor() Crea un procesador de flujo persistente con nombre en el espacio de trabajo de procesamiento de flujo actual. Puede inicializar este procesador de flujo sp.processor.start() con. Si intenta crear un procesador de flujo con el mismo nombre que uno existente, mongosh devolverá un error.

El usuario que ejecuta sp.createStreamProcessor() debe tener el atlasAdmin rol.

El siguiente ejemplo crea un procesador de flujo llamado solarDemo que ingiere datos de la sample_stream_solar conexión. El procesador excluye todos los documentos cuyo valor del device_id campo sea device_8 y pasa el resto a una ventana de bucle con una 10duración de segundos. Cada ventana agrupa los documentos que recibe y devuelve diversas estadísticas útiles de cada grupo. El procesador de flujo fusiona estos registros en a solar_db.solar_coll través de la mongodb1 conexión.

sp.createStreamProcessor(
'solarDemo',
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: Int32(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

Volver

db.colección.actualizarÍndiceDeBúsqueda

En esta página