Menu Docs
Página inicial do Docs
/ /

b.createStreamProcessor() (método mongosh)

sp.createStreamProcessor()

Cria um processador de stream no espaço de trabalho de processamento de stream atual.

Você só pode invocar este comando enquanto estiver conectado a um espaço de trabalho de processamento de fluxo.

Este comando requer mongosh versão ≥ 2.0.

Esse método é suportado em Atlas Stream Processing Workspaces.

O método sp.createStreamProcessor() tem a seguinte sintaxe:

sp.createStreamProcessor(
<name>,
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() usa estes campos:

Campo
Tipo
necessidade
Descrição

name

string

Obrigatório

Nome lógico para o processador de stream. Isso deve ser exclusivo dentro do espaço de trabalho do processamento de fluxo.

pipeline

array

Obrigatório

Transmita o pipeline de agregação que você deseja aplicar aos seus dados de streaming.

options

objeto

Opcional

objeto que define várias configurações opcionais para o processador de fluxo.

options.dlq

objeto

Condicional

Objeto que atribui uma fila de mensagens não entregues (DLQ) para seu options espaço de trabalho de processamento de fluxo. Este campo é necessário se você definir o campo.

options.dlq.connectionName

string

Condicional

Etiqueta que identifica uma conexão em seu registro de conexão. Esta conexão deve fazer referência a um cluster Atlas. Este campo é necessário se você definir o campo options.dlq .

options.dlq.db

string

Condicional

Nome de um reconhecimento de data center Atlas no cluster especificado em options.dlq.connectionName. Este campo é necessário se você definir o campo options.dlq .

options.dlq.coll

string

Condicional

Nome de uma collection no reconhecimento de data center especificado no options.dlq.db. Este campo é necessário se você definir o campo options.dlq .

options.tier

string

Opcional

O nível para o qual o Atlas Stream Processing atribui o processador. Se você não declarar esta opção, o Atlas Stream Processing atribuirá o processador à camada do espaço de trabalho de Stream Processing. Deve ser um dos seguintes:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

Para saber mais,consulte Níveis.

sp.createStreamProcessor() cria um processador de fluxo persistente e nomeado no espaço de trabalho de processamento de fluxo atual. Você pode inicializar este processador de fluxo com. Se você tentar criar um processador de stream com o mesmo nome de um processador de stream sp.processor.start() existente, mongosh retornará um erro.

O usuário que executa sp.createStreamProcessor() deve ter a função atlasAdmin .

O exemplo a seguir cria um processador de fluxo denominado solarDemo que ingere dados da conexão sample_stream_solar . O processador exclui todos os documentos onde o valor do campo device_id é device_8, passando o restante para uma janela em cascata com uma duração 10segundos. Cada janela agrupa os documentos que recebe e, em seguida, retorna várias estatísticas úteis de cada grupo. Em seguida, o processador de fluxo mescla esses registros com solar_db.solar_coll pela conexão mongodb1 .

sp.createStreamProcessor(
'solarDemo',
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: Int32(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

Voltar

Atlas Stream Processing

Nesta página