Menu Docs
Página inicial do Docs
/ /

start.sp.processor.modify() (método mongosh)

sp.processor.modify()

Modifica um processador de stream nomeado no espaço de trabalho de processamento de stream atual.

Esse método é suportado em Atlas Stream Processing Workspaces.

O método sp.processor.modify() tem a seguinte sintaxe:

sp.processor.modify({
pipeline: [
<pipeline>
],
name: <name>,
dlq: {
connectionName: <connectionName>,
db: <db>,
coll: <coll>
},
resumeFromCheckpoint: <resumeFromCheckpoint>,
tier: <tier>
})

sp.processor.modify() usa os seguintes campos:

Campo
Tipo
necessidade
Descrição

pipeline

array

Opcional

Array de estágios de agregação para aplicar aos seus dados de streaming onde o último estágio deve ser um estágio de coletor. Para saber mais, consulte Agregação de processamento de fluxo.

name

string

Opcional

Novo nome para o processador de stream.

dlq

objeto

Opcional

Objeto que define uma fila de mensagens nãoentregues {} (DLQ) () para seu processador de stream. Para remover uma fila de mensagens não entregues (DLQ)(DLQ) existente, passe um objeto vazio ().

dlq.connectionName

string

Condicional

Etiqueta que identifica uma conexão em seu registro de conexão. Esta conexão deve fazer referência a um cluster MongoDB Atlas . Obrigatório ao definir uma fila de mensagens não entregues (DLQ).

dlq.db

string

Condicional

Nome de um banco de dados MongoDB Atlas no cluster especificado no dlq.connectionName. Obrigatório ao definir uma fila de mensagens não entregues (DLQ).

dlq.coll

string

Condicional

Nome de uma collection no banco de dados especificado no dlq.db. Obrigatório ao definir uma fila de mensagens não entregues (DLQ).

resumeFromCheckpoint

booleano

Opcional

Sinalizador que especifica se o processador de fluxo modificado retoma de seu último checkpoint. Por padrão, este campo é true. Quando definido para false, o processador retém apenas estatísticas resumidas.

tier

string

Opcional

A camada a ser atribuída ao processador de fluxo. Se você não declarar esta opção, o processador manterá seu nível atual. Deve ser um dos seguintes valores:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

Para saber mais, consulte Níveis.

O processador de fluxo deve estar em um estado STOPPED antes de invocar este método. O argumento pipeline substitui todo o pipeline existente do processador, incluindo os estágios que você não altera.

Por padrão, o processador modificado retoma a partir do último checkpoint. Se você definir resumeFromCheckpoint como false, o processador modificado reterá apenas estatísticas resumidas. Quando você modifica um processador com janelas abertas, o Atlas Stream Processing recalcula essas janelas no pipeline atualizado.

Para limitações que se aplicam quando você modifica processadores de fluxo, consulte Modificar um processador de fluxo.

Para sp.processor.modify() executar, você deve ter a atlasAdmin função.

O exemplo altera um processador de fluxo interrompido chamado solarDemo para adicionar um estágio $match, renomeá-lo, atualizar seu nível e configurar uma fila de mensagens não entregues (DLQ):

sp.solarDemo.modify({
pipeline: [
{ $source: { connectionName: "sample_stream_solar" }},
{ $match: { device_id: "device_0" }},
{ $merge: { into: {
connectionName: "cluster0",
db: "testout",
coll: "testout2"
}}}
],
name: "solarDemoRenamed",
dlq: {
connectionName: "cluster0",
db: "testout",
coll: "dlq"
},
resumeFromCheckpoint: true,
tier: "SP10"
})
{ ok: 1 }

Inicie o processador renomeado e execute sp.listStreamProcessors() para verificar as alterações de nome, nível e fila de mensagens não entregues (DLQ) (DLQ):

sp.solarDemoRenamed.start()
sp.listStreamProcessors()
[
{
id: '6a39b08e6d9040e1cef8e31f',
name: 'solarDemoRenamed',
lastModified: ISODate('2026-06-22T22:00:46.858Z'),
state: 'STARTED',
tier: 'SP10',
errorMsg: '',
workers: [ 'worker-5f4c5bbc9d-7hg2q' ],
pipeline: [
{ '$source': { connectionName: 'sample_stream_solar' } },
{ '$match': { device_id: 'device_0' } },
{
'$merge': {
into: {
connectionName: 'cluster0',
db: 'testout',
coll: 'testout2'
}
}
}
],
lastStateChange: ISODate('2026-06-22T22:01:16.835Z'),
dlq: {
connectionName: 'cluster0',
db: 'testout',
coll: 'dlq'
}
}
]

Execute o para verificar a alteração do sp.processor.sample() pipeline:

sp.solarDemoRenamed.sample()
{
device_id: 'device_0',
group_id: 9,
timestamp: '2026-06-22T22:01:25.828+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 122,
temp: 18
}
}
{
device_id: 'device_0',
group_id: 3,
timestamp: '2026-06-22T22:01:26.828+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 377,
temp: 7
}
}

Voltar

start.processor.drop

Nesta página