Docs Menu
Docs Home
/ /

sp.processor.modify() (método mongosh)

sp.processor.modify()

Modifica un elemento con nombre Stream Processor en el Stream Processing Workspaceactual.

Este método es compatible en espacios de trabajo de Atlas Stream Processing.

El método sp.processor.modify() tiene la siguiente sintaxis:

sp.processor.modify({
pipeline: [
<pipeline>
],
name: <name>,
dlq: {
connectionName: <connectionName>,
db: <db>,
coll: <coll>
},
resumeFromCheckpoint: <resumeFromCheckpoint>,
tier: <tier>
})

sp.processor.modify() requiere los siguientes campos:

Campo
Tipo
Necesidad
Descripción

pipeline

arreglo

Opcional

Conjunto de etapas de agregación que se aplican a los datos en tiempo real, donde la última etapa debe ser una etapa de destino. Para obtener más información, consulte Agregación de procesamiento de flujos.

name

string

Opcional

Nuevo nombre para el procesador de streams.

dlq

Objeto

Opcional

Objeto que establece un Cola de mensajes no entregados para su procesador de flujo. Para eliminar una cola de mensajes no entregados existente, pase un objeto vacío{} ().

dlq.connectionName

string

Condicional

Etiqueta que identifica una conexión en su registro de conexiones. Esta conexión debe hacer referencia a un clúster de Atlas. Es necesaria al configurar una cola de mensajes no entregados.

dlq.db

string

Condicional

Nombre de una base de datos Atlas en el clúster especificado en dlq.connectionName. Obligatorio al configurar una cola de mensajes no entregados.

dlq.coll

string

Condicional

Nombre de una colección en la base de datos especificada en dlq.db. Obligatorio al configurar una cola de mensajes no entregados.

resumeFromCheckpoint

booleano

Opcional

Indicador que especifica si el procesador de flujo modificado reanuda la ejecución desde su último punto de control. Por defecto, este campo es true. Cuando se establece en false, el procesador conserva únicamente las estadísticas resumidas.

tier

string

Opcional

El nivel que se asignará al procesador de flujo. Si no se declara esta opción, el procesador conserva su nivel actual. Debe ser uno de los siguientes valores:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

Para obtener más información, consulta Niveles.

El procesador de flujo debe estar en estado STOPPED antes de invocar este método. El argumento pipeline reemplaza toda la canalización existente del procesador, incluidas las etapas que no modifique.

Por defecto, el procesador modificado reanuda la ejecución desde su último punto de control. Si se establece resumeFromCheckpoint en false, el procesador modificado conserva únicamente las estadísticas de resumen. Al modificar un procesador con ventanas abiertas, Atlas Stream Processing vuelve a calcular dichas ventanas en la canalización actualizada.

Para conocer las limitaciones que se aplican al modificar los procesadores de flujo, consulte Modificar un procesador de flujo.

Para sp.processor.modify() ejecutar, debes tener el atlasAdmin rol.

El ejemplo modifica un procesador de flujo detenido llamado solarDemo para agregar una etapa $match, renombrarlo, actualizar su nivel y configurar una cola de mensajes no entregados:

sp.solarDemo.modify({
pipeline: [
{ $source: { connectionName: "sample_stream_solar" }},
{ $match: { device_id: "device_0" }},
{ $merge: { into: {
connectionName: "cluster0",
db: "testout",
coll: "testout2"
}}}
],
name: "solarDemoRenamed",
dlq: {
connectionName: "cluster0",
db: "testout",
coll: "dlq"
},
resumeFromCheckpoint: true,
tier: "SP10"
})
{ ok: 1 }

Inicie el procesador renombrado y, a continuación, ejecute sp.listStreamProcessors() para verificar los cambios de nombre, nivel y cola de mensajes fallidos:

sp.solarDemoRenamed.start()
sp.listStreamProcessors()
[
{
id: '6a39b08e6d9040e1cef8e31f',
name: 'solarDemoRenamed',
lastModified: ISODate('2026-06-22T22:00:46.858Z'),
state: 'STARTED',
tier: 'SP10',
errorMsg: '',
workers: [ 'worker-5f4c5bbc9d-7hg2q' ],
pipeline: [
{ '$source': { connectionName: 'sample_stream_solar' } },
{ '$match': { device_id: 'device_0' } },
{
'$merge': {
into: {
connectionName: 'cluster0',
db: 'testout',
coll: 'testout2'
}
}
}
],
lastStateChange: ISODate('2026-06-22T22:01:16.835Z'),
dlq: {
connectionName: 'cluster0',
db: 'testout',
coll: 'dlq'
}
}
]

Ejecute para verificar el cambio en la sp.processor.sample() canalización:

sp.solarDemoRenamed.sample()
{
device_id: 'device_0',
group_id: 9,
timestamp: '2026-06-22T22:01:25.828+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 122,
temp: 18
}
}
{
device_id: 'device_0',
group_id: 3,
timestamp: '2026-06-22T22:01:26.828+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 377,
temp: 7
}
}

Volver

sp.processor.descartar

En esta página