Este tutorial orienta você pelas etapas de configuração do Atlas Stream Processing e execução do seu primeiro processador de stream.
Pré-requisitos
Para concluir este tutorial você precisa:
Um projeto do Atlas
mongoshversão 2.0 ou superiorUm Atlas user com a função
Project OwnerouProject Stream Processing Ownerpara gerenciar uma Instância de processamento de fluxo e um Registro de conexãoObservação
A função
Project Ownerpermite que você crie sistemas de banco de dados, gerencie o acesso ao projeto e as configurações do projeto, gerencie entradas da lista de acesso IP e muito mais.A função
Project Stream Processing Ownerpermite ações do Atlas Stream Processing, como visualizar, criar, excluir e editar instâncias de processamento de fluxo e visualizar, adicionar, modificar e excluir conexões no registro de conexões.Consulte Funções de projeto para saber mais sobre as diferenças entre as duas funções.
Um utilizador de banco de dados com a função
atlasAdminpara criar e executar processadores de streamUm cluster do Atlas
Procedimento
No Atlas, VáGo para a Stream Processing página do seu projeto.
AVISO: Melhorias de navegação em andamento
No momento, estamos lançando uma experiência de navegação nova e aprimorada. Se as etapas a seguir não corresponderem à sua visualização na UI do Atlas, consulte a documentação de visualização.
Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.
Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.
Na barra lateral, clique em Stream Processing sob o título Services.
A página Processamento de fluxo é exibida.
Crie uma Instância de Atlas Stream Processing .
Clique em Get Started no canto inferior direito. O Atlas fornece uma breve explicação dos principais componentes do Atlas Stream Processing.
Clique no botão Create instance.
Na página Create a stream processing instance , configure sua instância da seguinte maneira:
Tier:
SP30Provider:
AWSRegion:
us-east-1Instance Name:
tutorialInstance
Clique em Create.
Obtenha a Atlas Stream Processing connection da instância do string.
Localize o painel de visão geral da sua instância de Atlas Stream Processing e clique em Connect.
Selecione I have the MongoDB shell installed.
No menu suspenso Select your mongo shell version , selecione a versão mais recente do
mongosh.Copie a connection string fornecida em Run your connection string in your command line. Você precisará disso em uma etapa posterior.
Clique em Close.
Adicione uma conexão MongoDB Atlas ao registro de conexão.
Essa conexão serve como nosso coletor de dados de streaming.
No painel da instância do Atlas Stream Processing , clique em Configure.
Na aba Connection Registry , clique em + Add Connection no canto superior direito.
Clique Atlas Database. No campo Connection Name , insira
mongodb1. Na lista suspensa Atlas Cluster , selecione um Atlas cluster sem quaisquer dados armazenados nele.Clique em Add connection.
Verifique se sua fonte de dados de streaming emite mensagens.
Sua instância de processamento de fluxo vem pré-configurada com uma conexão com uma fonte de dados de amostra chamada sample_stream_solar. Essa fonte gera um fluxo de relatórios de vários dispositivos de energia solar. Cada relatório descreve a potência e a temperatura observadas de um único dispositivo solar em um ponto temporal específico, bem como a potência máxima desse dispositivo.
O documento a seguir é um exemplo representativo.
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 } }
Para verificar se essa fonte emite mensagens, crie um processador de fluxo de forma interativa.
Abra um aplicativo de terminal de sua escolha.
Conecte-se à sua instância do Atlas Stream Processing com
mongosh.Cole a
mongoshconnection string que você copiou em uma etapa anterior em seu terminal, onde<atlas-stream-processing-url>é a URL da sua Atlas Stream Processing instância de e<username>é um usuário com o papelatlasAdmin.mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> Digite sua senha quando solicitado.
Crie o processador de stream.
Copie o seguinte código no prompt
mongosh:sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) Verifique se os dados da conexão
sample_stream_solarsão exibidos no console e encerre o processo.Os processadores de stream que você cria com
sp.process()não persistem depois que você os encerra.
Crie um processador de fluxo persistente.
Usando um pipeline de agregação, você pode transformar cada documento à medida que ele é ingerido. O seguinte pipeline de agregação deriva a temperatura máxima e as potências média, mediana, máxima e mínima de cada dispositivo solar em intervalos de um segundo.
Configure um estágio
$source.A próxima etapa
$sourceingere dados da fontesample_stream_solar.let s = { source: { connectionName: "sample_stream_solar" } } Configure um estágio
$group.O estágio
$groupa seguir organiza todos os dados recebidos de acordo com seusgroup_id, acumula os valores dos camposobs.tempeobs.wattsde todos os documentos para cadagroup_ide, em seguida, obtém os dados desejados.let g = { group: { _id: "$group_id", max_temp: { $avg: "$obs.temp" }, avg_watts: { $min: "$obs.watts" }, median_watts: { $min: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } Configure um estágio
$tumblingWindow.Para realizar acumulações como
$groupem dados de streaming, o Atlas Stream Processing usa janelas para vincular o conjunto de dados. O estágio$tumblingWindowa seguir separa o fluxo em intervalos consecutivos de 10 segundos.Isso significa, por exemplo, que quando o estágio
$groupcalcula um valor paramedian_watts, ele utiliza os valoresobs.wattspara todos os documentos com um determinadogroup_idingerido nos 10 segundos anteriores.let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } Configurar um estágio $merge .
$mergepermite que você grave seus dados de streaming processados em um banco de dados Atlas.let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } Crie o processador de stream.
Atribua um nome ao seu novo processador de fluxo e declare seu pipeline de agregação listando cada estágio em ordem. O estágio
$grouppertence ao pipeline aninhado do$tumblingWindowe você não deve incluí-lo na definição do pipeline de processador.sp.createStreamProcessor("solarDemo", [s, t, m])
Isso cria um processador de fluxo chamado solarDemo que aplica a query definida anteriormente e grava os dados processados na coleção solarColl do banco de dados solarDb no cluster ao qual você se conectou. Ele retorna várias medições derivadas de intervalos de 10 segundos de observações de seus dispositivos solares.
Para saber mais sobre como o Atlas Stream Processing grava em bancos de dados at-rest, consulte $merge (Processamento de Stream).
Inicie o processador de fluxo.
Execute o seguinte comando em mongosh:
sp.solarDemo.start()
Verifique a saída do processador de fluxo.
Para verificar se o processador está ativo, execute o seguinte comando em mongosh:
sp.solarDemo.stats()
Este comando relata estatísticas operacionais do processador de fluxo do solarDemo .
Para verificar se o processador de fluxo está gravando dados no cluster do Atlas:
No Atlas, acesse a página Clusters do seu projeto.
AVISO: Melhorias de navegação em andamento
No momento, estamos implementando uma experiência de navegação nova e aprimorada. Se as etapas a seguir não corresponderem à sua visualização na IU do Atlas, consulte a documentação de pré-visualização.
Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.
Se ainda não estiver exibido, selecione o projeto desejado no menu Projects na barra de navegação.
Se ainda não estiver exibido, clique em Clusters na barra lateral.
A página Clusters é exibida.
Clique no botão Browse Collections para o seu cluster.
O Data Explorer é exibido.
Veja a coleção
MySolar.
Como alternativa, você pode exibir uma amostra de documentos processados no terminal usando mongosh:
sp.solarDemo.sample()
{ _id: 10, max_watts: 136, min_watts: 130, avg_watts: 133, median_watts: 130, max_temp: 7 }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.
Solte o processador de fluxo.
Execute o seguinte comando em mongosh:
sp.solarDemo.drop()
Para confirmar que você descartou avgWatts, liste todos os seus processadores de stream disponíveis:
sp.listStreamProcessors()
Próximos passos
Saiba como: