Menu Docs
Página inicial do Docs
/ /

b.createStreamProcessor() (método mongosh)

sp.createStreamProcessor()

Novo na 7.0 versão: Cria um Processador de Stream no atual Stream Processing Workspace.

Esse método é suportado em Atlas Stream Processing Workspaces.

O método sp.createStreamProcessor() tem a seguinte sintaxe:

sp.createStreamProcessor(
<name>,
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() usa estes campos:

Campo
Tipo
necessidade
Descrição

name

string

Obrigatório

Nome lógico para o processador de stream. Isso deve ser exclusivo dentro do espaço de trabalho do processamento de fluxo.

pipeline

array

Obrigatório

Transmita o pipeline de agregação que você deseja aplicar aos seus dados de streaming.

options

objeto

Opcional

objeto que define várias configurações opcionais para o processador de fluxo.

Observação

Todos os campos abaixo são opções que você pode configurar.

dlq

objeto

Condicional

Objeto que atribui uma fila de mensagens não entregues (DLQ) para seu options espaço de trabalho de processamento de fluxo. Este campo é necessário se você definir o campo.

dlq.connectionName

string

Condicional

Etiqueta que identifica uma conexão em seu registro de conexão. Esta conexão deve fazer referência a um cluster Atlas. Este campo é necessário se você definir o campo dlq .

dlq.db

string

Condicional

Nome de um reconhecimento de data center Atlas no cluster especificado em dlq.connectionName. Este campo é necessário se você definir o campo dlq .

dlq.coll

string

Condicional

Nome de uma collection no reconhecimento de data center especificado no dlq.db. Este campo é necessário se você definir o campo dlq .

options.tier

string

Opcional

O nível para o qual o Atlas Stream Processing atribui o processador. Se você não declarar esta opção, o Atlas Stream Processing atribuirá o processador à camada do espaço de trabalho de processamento de fluxo. Deve ser um dos seguintes:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

Para saber mais,consulte Níveis.

sp.createStreamProcessor() cria um processador de fluxo persistente e nomeado no espaço de trabalho de processamento de fluxo atual. Você pode inicializar este processador de fluxo com. Se você tentar criar um processador de stream com o mesmo nome de um processador de stream sp.processor.start() existente, mongosh retornará um erro.

O usuário que executa sp.createStreamProcessor() deve ter a função atlasAdmin .

O exemplo a seguir cria um processador de fluxo denominado solarDemo que ingere dados da conexão sample_stream_solar . O processador exclui todos os documentos onde o valor do campo device_id é device_8, passando o restante para uma janela em cascata com uma duração 10segundos. Cada janela agrupa os documentos que recebe e, em seguida, retorna várias estatísticas úteis de cada grupo. Em seguida, o processador de fluxo mescla esses registros com solar_db.solar_coll pela conexão mongodb1 .

sp.createStreamProcessor(
'solarDemo',
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: Int32(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

Voltar

Atlas Stream Processing

Nesta página