Este tutorial orienta você pelas etapas de configuração do Atlas Stream Processing e execução do seu primeiro processador de stream.
Pré-requisitos
Para concluir este tutorial você precisa:
Um projeto do Atlas com um cluster vazio. Esse cluster serve como coletor de dados para seu processador de stream.
Um usuário de banco de dados com o papel
atlasAdminpara criar e executar processadores de fluxoUm usuário do Atlas com a
Project Ownerfunção ouProject Stream Processing Ownerpara gerenciar um espaço de trabalho de processamento de fluxo e um registro de conexãoObservação
A função
Project Ownerpermite que você crie sistemas de banco de dados, gerencie o acesso ao projeto e as configurações do projeto, gerencie entradas da lista de acesso IP e muito mais.A função habilita ações do Atlas Stream Processing, como visualizar, criar, excluir e editar espaços de trabalho de processamento de fluxo e visualizar, adicionar, modificar e excluir conexões no registro de
Project Stream Processing Ownerconexões.Consulte Funções de projeto para saber mais sobre as diferenças entre as duas funções.
Procedimento
Este tutorial ajuda você a criar um espaço de trabalho de processamento de fluxo, conectá-lo a um Atlas cluster existente e configurar um processador de fluxo para ingestão de dados de amostra de dispositivos de streaming solar e gravação dos dados em seu cluster conectado.
Crie um espaço de trabalho de processamento de fluxo.
No Atlas, Go para a página Stream Processing do seu projeto.
Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.
Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.
Na barra lateral, clique em Stream Processing sob o título Streaming Data.
A página Processamento de stream é exibida.
Clique em Create a workspace.
Na página Create a stream processing workspace, configure seu espaço de trabalho da seguinte maneira:
Tier:
SP30Provider:
AWSRegion:
us-east-1Workspace Name:
tutorialWorkspace
Clique em Create.
Adicione uma conexão de coletor ao registro de conexão.
Adicione uma conexão com um Atlas cluster vazio existente ao registro de conexão. Seu processador de stream usará esta conexão como coletor de dados de streaming.
No painel do seu espaço de trabalho de processamento de fluxo, clique em Configure.
Na aba Connection Registry , clique em + Add Connection no canto superior direito.
Na lista suspensa Connection Type, clique em Atlas Database.
No campo Connection Name, digite
mongodb1.Na lista suspensa Atlas Cluster, selecione um Atlas cluster sem nenhum dado armazenado nele.
Na lista suspensa Execute as, selecione Read and write to any database.
Clique em Add connection.
Verifique se sua fonte de dados de streaming emite mensagens.
Seu espaço de trabalho de processamento de fluxo vem pré-configurado com uma conexão com uma fonte de dados de exemplo chamada sample_stream_solar. Esta fonte gera um fluxo de relatórios de vários dispositivos de energia solar. Cada relatório descreve a potência e a temperatura observadas de um único dispositivo solar em um ponto específico no tempo, bem como a potência máxima desse dispositivo.
O seguinte documento representa um relatório desta fonte de dados:
{   device_id: 'device_8',   group_id: 7,   timestamp: '2024-08-12T21:41:01.788+00:00',   max_watts: 450,   event_type: 0,   obs: {     watts: 252,     temp: 17   } } 
Para verificar se esta fonte emite mensagens, crie um processador de stream interativamente usando mongosh:
Conecte-se ao seu espaço de trabalho de processamento de fluxo.
Use a string de conexão associada ao seu espaço de trabalho de processamento de fluxo para se conectar
mongoshusando.No painel do seu espaço de trabalho de processamento de fluxo, clique em Connect.
Na caixa de diálogo de conexão do espaço de trabalho, clique em Choose a connection method, então selecione a guia Shell.
Copie a string de conexão exibida na caixa de diálogo. Ele tem o seguinte formato, onde
<atlas-stream-processing-url>é a URL do seu espaço de trabalho de processamento de fluxo e<username>é o nome de usuário de um usuário de banco de dados com oatlasAdminpapel:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Cole a string de conexão no seu terminal e substitua o espaço reservado
<password>pelas credenciais do usuário.Pressione Enter para executá-lo e conecte-se ao espaço de trabalho de processamento de fluxo.
No prompt
mongosh, use o métodosp.process()para criar o processador de stream interativamente.sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) Verifique se os dados da conexão
sample_stream_solarsão exibidos no console e encerre o processo.Os processadores de stream que você cria com
sp.process()não persistem depois que você os encerra.
Crie um processador de fluxo persistente.
Um processador de fluxo persistente ingere, processa e escreve continuamente dados de streaming em um coletor de dados especificado até que você solte o processador. O processador de fluxo a seguir é um pipeline de agregação que deriva a temperatura máxima e as potências média, máxima e mínima de cada dispositivo solar em intervalos de 10segundos e, em seguida, grava os resultados em seu cluster vazio conectado.
Selecione uma das seguintes guias para criar um processador de fluxo utilizando a UI do Atlas ou mongosh:
Para criar um processador de fluxo na interface do usuário do Atlas , vá para a página Stream Processing do seu projeto Atlas e clique em Configure no painel do seu espaço de trabalho de processamento de fluxo. Em seguida, escolha entre utilizar o construtor visual ou o editor JSON para configurar um processador de fluxo denominado solarDemo:
Clique em Create with visual builder.
O Construtor Visual abre com um formulário onde você pode configurar seu processador de fluxo.
No campo Stream processor name, digite
solarDemo.No campo Source, selecione
sample_stream_solarna lista suspensa Connection.Isso adiciona o seguinte estágio do
$sourceao seu pipeline de agregação :{ "$source": { "connectionName": "sample_stream_solar" } } Configure um estágio
$tumblingWindow.Start building your pipeline No painel, clique + Custom stage em e copie e cole o seguinte JSON na caixa de texto que aparece. Isso define um estágio
$tumblingWindowcom um estágio$groupaninhado que deriva a temperatura máxima e as potências máximas, mínimas e médias de cada dispositivo solar em intervalos de 10segundos.Isso significa, por exemplo, que quando o estágio
$groupcalcula um valor paramax_watts, ele extrai o valor máximo dos valoresobs.wattspara todos os documentos com um determinadogroup_idingerido nos 10 segundos anteriores.{ "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } }] } } No campo Sink, selecione
mongodb1na lista suspensa Connection.Na caixa de texto exibida, copie e cole o seguinte JSON. Isso configura um estágio
$mergeque grava os dados de streaming processados em uma coleção chamadasolarCollno banco de dadossolarDbdo seu Atlas cluster conectado:{ "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } Clique em Create stream processor.
O processador de stream é criado e listado na aba Stream Processors da página Stream Processing.
Clique em Use JSON editor.
O editor JSON abre com uma caixa de texto onde você pode configurar seu processador de fluxo no formato JSON.
Defina o processador de fluxo.
Copie e cole a seguinte definição JSON na caixa de texto do editor JSON para definir um processador de fluxo denominado
solarDemo. Esse processador de fluxo usa um estágio$tumblingWindowcom um estágio$groupaninhado para derivar a temperatura máxima e as potências máximas, mínimas e médias de cada dispositivo solar em intervalos de 10segundos e, em seguida, grava os resultados em uma coleção chamadasolarCollno banco de dadossolarDbdo seu Atlas cluster conectado.Isso significa, por exemplo, que quando o estágio
$groupcalcula um valor paramax_watts, ele extrai o valor máximo dos valoresobs.wattspara todos os documentos com um determinadogroup_idingerido nos 10 segundos anteriores.{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_temp": { "$max": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" }, "avg_watts": { "$avg": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] } [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "avg_watts": { "$avg": "$obs.watts" }, "max_temp": { "$avg": "$obs.temp" }, "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "coll": "solarColl", "connectionName": "mongodb1", "db": "solarDb" } } } ] 
Execute os seguintes comandos em mongosh para criar um processador de fluxo persistente denominado solarDemo:
Conecte-se ao seu espaço de trabalho de processamento de fluxo.
Use a string de conexão associada ao seu espaço de trabalho de processamento de fluxo para se conectar
mongoshusando.No painel do seu espaço de trabalho de processamento de fluxo, clique em Connect.
Na caixa de diálogo Connect to your workspace, selecione a aba Shell.
Copie a string de conexão exibida na caixa de diálogo. Ele tem o seguinte formato, onde
<atlas-stream-processing-url>é a URL do seu espaço de trabalho de processamento de fluxo e<username>é o nome de usuário de um usuário de banco de dados com oatlasAdminpapel:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Cole a string de conexão no seu terminal e substitua o espaço reservado
<password>pelas credenciais do usuário.Pressione Enter para executá-lo e conecte-se ao espaço de trabalho de processamento de fluxo.
Configure um estágio
$source.Defina uma variável para um estágio
$sourceque ingere dados da origemsample_stream_solar.let s = { source: { connectionName: "sample_stream_solar" } } Configure um estágio
$group.Defina uma variável para um estágio
$groupque deriva a temperatura máxima e as potências média, máxima e mínima de cada dispositivo solar de acordo com seugroup_id.let g = { group: { _id: "$group_id", max_temp: { $max: "$obs.temp" }, avg_watts: { $avg: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } Configure um estágio
$tumblingWindow.Para realizar acumulações como
$groupem dados de streaming, o Atlas Stream Processing usa o Windows para vincular o conjunto de dados. Defina uma variável para um estágio$tumblingWindowque separa o fluxo em intervalos consecutivos de 10segundos.Isso significa, por exemplo, que quando o estágio
$groupcalcula um valor paramax_watts, ele extrai o valor máximo dos valoresobs.wattspara todos os documentos com um determinadogroup_idingerido nos 10 segundos anteriores.let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } Configurar um estágio $merge .
Defina uma variável para um estágio do
$mergeque escreve os dados de streaming processados em uma collection chamadasolarCollno banco de dados dosolarDbdo seu Atlas cluster conectado.let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } Crie o processador de stream.
Use o método
sp.createStreamProcessor()para atribuir um nome ao seu novo processador de fluxo e declarar seu pipeline de agregação . O estágio$grouppertence ao pipeline aninhado do$tumblingWindow, e você não deve incluí-lo na definição de pipeline do processador.sp.createStreamProcessor("solarDemo", [s, t, m]) Isso cria um processador de fluxo chamado
solarDemoque aplica a query definida anteriormente e grava os dados processados na coleçãosolarColldo banco de dadossolarDbno cluster ao qual você se conectou. Ele retorna várias medições derivadas de intervalos de 10 segundos de observações de seus dispositivos solares.Para saber mais sobre como o Atlas Stream Processing grava em bancos de dados at-rest, consulte
$merge(Processamento de Stream).
Inicie o processador de fluxo.
Na lista de processadores de stream para a sua área de trabalho de processamento de stream, clique no ícone Start para o seu processador de stream.
Use o método sp.processor.start() em mongosh:
sp.solarDemo.start() 
Verifique a saída do processador de fluxo.
Para verificar se o processador de fluxo está gravando dados no cluster do Atlas:
No Atlas, acesse a página Clusters do seu projeto.
Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.
Se ainda não estiver exibido, selecione o projeto desejado no menu Projects na barra de navegação.
Na barra lateral, clique em Clusters sob o título Database.
A página Clusters é exibida.
No Atlas, acesse a página Data Explorer do seu projeto.
Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.
Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.
Na barra lateral, clique em Data Explorer sob o título Database.
O Data Explorer é exibido.
Observação
Você também pode ir para a página Clusters e clicar em Data Explorer sob o título Shortcuts.
Veja a coleção
MySolar.
Para verificar se o processador está ativo, use o método sp.processor.stats() em mongosh:
sp.solarDemo.stats() 
Este método relata estatísticas operacionais do processador de fluxo solarDemo.
Você também pode usar o método sp.processor.sample() em mongosh para retornar uma amostragem de documentos processados no terminal.
sp.solarDemo.sample() 
{   _id: 10,   max_temp: 16,   avg_watts: 232,   max_watts: 414,   min_watts: 73 } 
Observação
A saída anterior é um exemplo representativo . Os dados de streaming não são estáticos e cada usuário vê documentos distintos.
Solte o processador de fluxo.
Na lista de processadores de stream para a sua área de trabalho de processamento de stream, clique no Delete ícone ( ) do seu processador de stream.
Na caixa de diálogo de confirmação exibida, digite o nome do processador de stream (solarDemo) para confirmar que você deseja excluí-lo e clique em Delete.
Use o método sp.processor.drop() em mongosh para soltar solarDemo:
sp.solarDemo.drop() 
Para confirmar que você descartou solarDemo, use o método sp.listStreamProcessors() para listar todos os seus processadores de stream disponíveis:
sp.listStreamProcessors() 
Próximos passos
Saiba como: