Join us at MongoDB.local London on 7 May to unlock new possibilities for your data. Use WEB50 to save 50%.
Register now >
Docs Menu
Docs Home
/ /

sp.createStreamProcessor() (método mongosh)

sp.createStreamProcessor()

Novedad en la 7.0 versión: Crea un Stream Processor en el Stream Processing Workspaceactual.

Este método es compatible en espacios de trabajo de Atlas Stream Processing.

El método sp.createStreamProcessor() tiene la siguiente sintaxis:

sp.createStreamProcessor(
<name>,
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() toma estos campos:

Campo
Tipo
Necesidad
Descripción

name

string

Requerido

Nombre lógico para el procesador de flujos. Esto debe ser único dentro del espacio de trabajo de stream processing.

pipeline

arreglo

Requerido

Transmisión pipeline de agregación que desea aplicar a sus datos de transmisión.

options

Objeto

Opcional

Objeto que define varias configuraciones opcionales para tu procesador de flujo.

options.dlq

Objeto

Condicional

Objeto que asigna un Cola demensajes no entregados para su options espacio de trabajo de procesamiento de flujos. Este campo es necesario si define el campo.

options.dlq.connectionName

string

Condicional

Etiqueta que identifica una conexión en tu registro de conexiones. Esta conexión debe hacer referencia a un clúster de Atlas. Este campo es necesario si se define el campo options.dlq.

options.dlq.db

string

Condicional

Nombre de una base de datos Atlas en el clúster especificado en options.dlq.connectionName. Este campo es necesario si se define el campo options.dlq.

options.dlq.coll

string

Condicional

Nombre de una colección en la base de datos especificada en options.dlq.db. Este campo es necesario si defines el campo options.dlq.

options.tier

string

Opcional

El nivel al que Atlas Stream Processing asigna el procesador. Si no declara esta opción, Atlas Stream Processing asigna el procesador al nivel del espacio de trabajo de Stream Processing. Debe ser uno de los siguientes:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

Para obtener más información, consulta Niveles.

sp.createStreamProcessor() crea un procesador de flujo persistente y nombrado en el espacio de trabajo actual de stream processing. Puedes inicializar este procesador de flujo con sp.processor.start(). Si intenta crear un procesador de flujo con el mismo nombre que un procesador de flujo existente, mongosh devolverá un error.

El usuario que ejecuta sp.createStreamProcessor() debe tener el rol atlasAdmin.

El siguiente ejemplo crea un procesador de flujo llamado solarDemo que ingiere datos de la conexión sample_stream_solar. El procesador excluye todos los documentos donde el valor del campo device_id es device_8, trasladando el resto a una ventana deslizante con una duración de 10segundos. Cada ventana agrupa los documentos recibidos y devuelve diversas estadísticas útiles de cada grupo. A continuación, el procesador de flujo combina estos registros a solar_db.solar_coll a través de la conexión mongodb1.

sp.createStreamProcessor(
'solarDemo',
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: Int32(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

Volver

db.colección.actualizarÍndiceDeBúsqueda

En esta página