MongoDB.local SF, Jan 15: See the speaker lineup & ship your AI vision faster. Use WEB50 to save 50%
Find out more >
Menu Docs
Página inicial do Docs
/ /

b.createStreamProcessor() (método mongosh)

sp.createStreamProcessor()

Novidades na versão 7.0: Cria um Processador de Stream no Espaço de Trabalho de Processamento de Streamatual.

Esse método é suportado em Atlas Stream Processing Workspaces.

O método sp.createStreamProcessor() tem a seguinte sintaxe:

sp.createStreamProcessor(
<name>,
[
<pipeline>
],
{
<options>
}
)

sp.createStreamProcessor() usa estes campos:

Campo
Tipo
necessidade
Descrição

name

string

Obrigatório

Nome lógico para o processador de stream. Isso deve ser exclusivo dentro do espaço de trabalho do stream processing.

pipeline

array

Obrigatório

Transmita o pipeline de agregação que você deseja aplicar aos seus dados de streaming.

options

objeto

Opcional

objeto que define várias configurações opcionais para o processador de fluxo.

Observação

Todos os campos abaixo são opções que você pode configurar.

dlq

objeto

Condicional

Objeto que atribui uma fila de mensagens não entregues (DLQ) para seu espaço de trabalho de stream processing. Este campo é necessário se você definir o campo options.

dlq.connectionName

string

Condicional

Etiqueta que identifica uma conexão em seu registro de conexão. Esta conexão deve fazer referência a um cluster Atlas. Este campo é necessário se você definir o campo dlq .

dlq.db

string

Condicional

Nome de um reconhecimento de data center Atlas no cluster especificado em dlq.connectionName. Este campo é necessário se você definir o campo dlq .

dlq.coll

string

Condicional

Nome de uma collection no reconhecimento de data center especificado no dlq.db. Este campo é necessário se você definir o campo dlq .

options.tier

string

Opcional

O nível para o qual o Atlas Stream Processing atribui o processador. Se você não declarar esta opção, o Atlas Stream Processing atribuirá o processador ao nível do espaço de trabalho de stream processing. Deve ser um dos seguintes:

  • SP2

  • SP5

  • SP10

  • SP30

  • SP50

Para saber mais, consulte Níveis.

sp.createStreamProcessor() cria um processador stream processing persistente e nomeado no espaço de trabalho stream processing atual. Você pode inicializar este processador de fluxo com sp.processor.start(). Se você tentar criar um processador de stream com o mesmo nome de um processador de stream existente, mongosh retornará um erro.

O usuário que executa o sp.createStreamProcessor() deve ter a função atlasAdmin.

O exemplo a seguir cria um processador de fluxo denominado solarDemo que ingere dados da conexão sample_stream_solar . O processador exclui todos os documentos onde o valor do campo device_id é device_8, passando o restante para uma janela em cascata com uma duração 10segundos. Cada janela agrupa os documentos que recebe e, em seguida, retorna várias estatísticas úteis de cada grupo. Em seguida, o processador de fluxo mescla esses registros com solar_db.solar_coll pela conexão mongodb1 .

sp.createStreamProcessor(
'solarDemo',
[
{
$source: {
connectionName: 'sample_stream_solar',
timeField: {
$dateFromString: {
dateString: '$timestamp'
}
}
}
},
{
$match: {
$expr: {
$ne: [
"$device_id",
"device_8"
]
}
}
},
{
$tumblingWindow: {
interval: {
size: Int32(10),
unit: "second"
},
"pipeline": [
{
$group: {
"_id": { "device_id": "$device_id" },
"max_temp": { $max: "$obs.temp" },
"max_watts": { $max: "$obs.watts" },
"min_watts": { $min: "$obs.watts" },
"avg_watts": { $avg: "$obs.watts" },
"median_watts": {
$median: {
input: "$obs.watts",
method: "approximate"
}
}
}
}
]
}
},
{
$merge: {
into: {
connectionName: "mongodb1",
db: "solar_db",
coll: "solar_coll"
},
on: ["_id"]
}
}
]
)

Voltar

Atlas Stream Processing

Nesta página