Menu Docs
Página inicial do Docs
/
Atlas
/

Introdução ao Atlas Stream Processing

Este tutorial orienta você pelas etapas de configuração do Atlas Stream Processing e execução do seu primeiro processador de stream.

Para concluir este tutorial você precisa:

  • Um projeto do Atlas com um cluster vazio. Esse cluster serve como coletor de dados para seu processador de stream.

  • Um usuário de banco de dados com o papel atlasAdmin para criar e executar processadores de fluxo

  • mongosh versão 2.0 ou superior

  • Um Atlas user com a função Project Owner ou Project Stream Processing Owner para gerenciar uma Instância do Stream Processing e um registro de conexão

    Observação

    A função Project Owner permite que você crie sistemas de banco de dados, gerencie o acesso ao projeto e as configurações do projeto, gerencie entradas da lista de acesso IP e muito mais.

    A função Project Stream Processing Owner permite ações do Atlas Stream Processing, como visualizar, criar, excluir e editar instâncias de processamento de fluxo e visualizar, adicionar, modificar e excluir conexões no registro de conexões.

    Consulte Funções de projeto para saber mais sobre as diferenças entre as duas funções.

Este tutorial mostra como criar uma instância de processamento de fluxo, conectá-la a um Atlas cluster existente e configurar um processador de fluxo para ingestão de dados de amostra de dispositivos de streaming solar e gravação dos dados em seu cluster conectado.

1
  1. No Atlas, Go para a página Stream Processing do seu projeto.

    AVISO: Melhorias de navegação em andamento

    No momento, estamos implementando uma experiência de navegação nova e aprimorada. Se as etapas a seguir não corresponderem à sua visualização na IU do Atlas, consulte a documentação de pré-visualização.

    1. Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.

    2. Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.

    3. Na barra lateral, clique em Stream Processing sob o título Services.

      A página Processamento de fluxo é exibida.

  2. Clique em Create a workspace.

  3. Na página Create a stream processing instance , configure sua instância da seguinte maneira:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. Clique em Create.

2

Adicione uma conexão com um Atlas cluster vazio existente ao registro de conexão. Seu processador de stream usará esta conexão como coletor de dados de streaming.

  1. No painel da instância do Atlas Stream Processing , clique em Configure.

  2. Na aba Connection Registry , clique em + Add Connection no canto superior direito.

  3. Na lista suspensa Connection Type, clique em Atlas Database.

  4. No campo Connection Name, digite mongodb1.

  5. Na lista suspensa Atlas Cluster, selecione um Atlas cluster sem nenhum dado armazenado nele.

  6. Na lista suspensa Execute as, selecione Read and write to any database.

  7. Clique em Add connection.

3

Sua instância de processamento de fluxo vem pré-configurada com uma conexão com uma fonte de dados de amostra chamada sample_stream_solar. Essa fonte gera um fluxo de relatórios de vários dispositivos de energia solar. Cada relatório descreve a potência e a temperatura observadas de um único dispositivo solar em um ponto temporal específico, bem como a potência máxima desse dispositivo.

O seguinte documento representa um relatório desta fonte de dados:

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
}
}

Para verificar se esta fonte emite mensagens, crie um processador de stream interativamente usando mongosh:

  1. Conecte-se à sua instância de processamento de fluxo.

    Use a connection string associada à sua instância de Atlas Stream Processing para se conectar usando mongosh.

    1. No painel da instância do Atlas Stream Processing , clique em Connect.

    2. Na caixa de diálogo Connect to your instance, selecione a aba Shell.

    3. Copie a string de conexão exibida na caixa de diálogo. Ele tem o seguinte formato, em que <atlas-stream-processing-url> é a URL da sua instância de processamento de fluxo e <username> é o nome de usuário de um usuário de banco de dados com a função atlasAdmin:

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. Cole a string de conexão no seu terminal e substitua o espaço reservado <password> pelas credenciais do usuário.

      Pressione Enter para executá-lo e conecte-se à instância de processamento de fluxo.

  2. No prompt mongosh, use o método sp.process() para criar o processador de stream interativamente.

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    Verifique se os dados da conexão sample_stream_solar são exibidos no console e encerre o processo.

    Os processadores de stream que você cria com sp.process() não persistem depois que você os encerra.

4

Um processador de fluxo persistente ingere, processa e escreve continuamente dados de streaming em um coletor de dados especificado até que você solte o processador. O processador de fluxo a seguir é um pipeline de agregação que deriva a temperatura máxima e as potências média, máxima e mínima de cada dispositivo solar em intervalos de 10segundos e, em seguida, grava os resultados em seu cluster vazio conectado.

Selecione uma das seguintes guias para criar um processador de fluxo utilizando a UI do Atlas ou mongosh:

Para criar um processador de fluxo na interface do usuário do Atlas , vá para a página Stream Processing do seu projeto Atlas e clique em Configure no painel da sua instância de processamento de fluxo. Em seguida, escolha entre utilizar o construtor visual ou o editor JSON para configurar um processador de fluxo denominado solarDemo:

  1. Clique em Create with visual builder.

    O Construtor Visual abre com um formulário onde você pode configurar seu processador de fluxo.

  2. No campo Stream processor name, digite solarDemo.

  3. No campo Source, selecione sample_stream_solar na lista suspensa Connection.

    Isso adiciona o seguinte estágio do $source ao seu pipeline de agregação :

    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    }
  4. Configure um estágio $tumblingWindow.

    Start building your pipeline No painel, clique + Custom stage em e copie e cole o seguinte JSON na caixa de texto que aparece. Isso define um estágio $tumblingWindow com um estágio $group aninhado que deriva a temperatura máxima e as potências máximas, mínimas e médias de cada dispositivo solar em intervalos de 10segundos.

    Isso significa, por exemplo, que quando o estágio $group calcula um valor para max_watts, ele extrai o valor máximo dos valores obs.watts para todos os documentos com um determinado group_id ingerido nos 10 segundos anteriores.

    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [ {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }]
    }
    }
  5. No campo Sink, selecione mongodb1 na lista suspensa Connection.

    Na caixa de texto exibida, copie e cole o seguinte JSON. Isso configura um estágio$merge que grava os dados de streaming processados em uma coleção chamada solarColl no banco de dados solarDb do seu Atlas cluster conectado:

    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
  6. Clique em Create stream processor.

    O processador de stream é criado e listado na aba Stream Processors da página Stream Processing.

  1. Clique em Use JSON editor.

    O editor JSON abre com uma caixa de texto onde você pode configurar seu processador de fluxo no formato JSON.

  2. Defina o processador de fluxo.

    Copie e cole a seguinte definição JSON na caixa de texto do editor JSON para definir um processador de fluxo denominado solarDemo. Esse processador de fluxo usa um estágio $tumblingWindow com um estágio $group aninhado para derivar a temperatura máxima e as potências máximas, mínimas e médias de cada dispositivo solar em intervalos de 10segundos e, em seguida, grava os resultados em uma coleção chamada solarColl no banco de dados solarDb do seu Atlas cluster conectado.

    Isso significa, por exemplo, que quando o estágio $group calcula um valor para max_watts, ele extrai o valor máximo dos valores obs.watts para todos os documentos com um determinado group_id ingerido nos 10 segundos anteriores.

    {
    "name": "solarDemo",
    "pipeline": [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
    ]
    }
    [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "avg_watts": {
    "$avg": "$obs.watts"
    },
    "max_temp": {
    "$avg": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "coll": "solarColl",
    "connectionName": "mongodb1",
    "db": "solarDb"
    }
    }
    }
    ]

Execute os seguintes comandos em mongosh para criar um processador de fluxo persistente denominado solarDemo:

  1. Conecte-se à sua instância de processamento de fluxo.

    Use a connection string associada à sua instância de Atlas Stream Processing para se conectar usando mongosh.

    1. No painel da instância do Atlas Stream Processing , clique em Connect.

    2. Na caixa de diálogo Connect to your instance, selecione a aba Shell.

    3. Copie a string de conexão exibida na caixa de diálogo. Ele tem o seguinte formato, em que <atlas-stream-processing-url> é a URL da sua instância de processamento de fluxo e <username> é o nome de usuário de um usuário de banco de dados com a função atlasAdmin:

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. Cole a string de conexão no seu terminal e substitua o espaço reservado <password> pelas credenciais do usuário.

      Pressione Enter para executá-lo e conecte-se à instância de processamento de fluxo.

  2. Configure um estágio $source.

    Defina uma variável para um estágio $source que ingere dados da origem sample_stream_solar.

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  3. Configure um estágio $group.

    Defina uma variável para um estágio $group que deriva a temperatura máxima e as potências média, máxima e mínima de cada dispositivo solar de acordo com seu group_id.

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $max: "$obs.temp"
    },
    avg_watts: {
    $avg: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  4. Configure um estágio $tumblingWindow.

    Para realizar acumulações como $group em dados de streaming, o Atlas Stream Processing usa o Windows para vincular o conjunto de dados. Defina uma variável para um estágio $tumblingWindow que separa o fluxo em intervalos consecutivos de 10segundos.

    Isso significa, por exemplo, que quando o estágio $group calcula um valor para max_watts, ele extrai o valor máximo dos valores obs.watts para todos os documentos com um determinado group_id ingerido nos 10 segundos anteriores.

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  5. Configurar um estágio $merge .

    Defina uma variável para um estágio do $merge que escreve os dados de streaming processados em uma collection chamada solarColl no banco de dados do solarDb do seu Atlas cluster conectado.

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  6. Crie o processador de stream.

    Use o método sp.createStreamProcessor() para atribuir um nome ao seu novo processador de fluxo e declarar seu pipeline de agregação . O estágio $group pertence ao pipeline aninhado do $tumblingWindow, e você não deve incluí-lo na definição de pipeline do processador.

    sp.createStreamProcessor("solarDemo", [s, t, m])

    Isso cria um processador de fluxo chamado solarDemo que aplica a query definida anteriormente e grava os dados processados na coleção solarColl do banco de dados solarDb no cluster ao qual você se conectou. Ele retorna várias medições derivadas de intervalos de 10 segundos de observações de seus dispositivos solares.

    Para saber mais sobre como o Atlas Stream Processing grava em bancos de dados at-rest, consulte $merge (Processamento de Stream).

5

Na lista de processadores de fluxo para sua instância de processamento de fluxo, clique no ícone Start para seu processador de fluxo.

Use o método sp.processor.start() em mongosh:

sp.solarDemo.start()
6

Para verificar se o processador de fluxo está gravando dados no cluster do Atlas:

  1. No Atlas, acesse a página Clusters do seu projeto.

    AVISO: Melhorias de navegação em andamento

    No momento, estamos lançando uma experiência de navegação nova e aprimorada. Se as etapas a seguir não corresponderem à sua visualização na UI do Atlas, consulte a documentação de visualização.

    1. Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.

    2. Se ainda não estiver exibido, selecione o projeto desejado no menu Projects na barra de navegação.

    3. Se ainda não estiver exibido, clique em Clusters na barra lateral.

      A página Clusters é exibida.

  2. Clique no botão Browse Collections para o seu cluster.

    O Data Explorer é exibido.

  3. Veja a coleção MySolar.

Para verificar se o processador está ativo, use o método sp.processor.stats() em mongosh:

sp.solarDemo.stats()

Este método relata estatísticas operacionais do processador de fluxo solarDemo.

Você também pode usar o método sp.processor.sample() em mongosh para retornar uma amostragem de documentos processados no terminal.

sp.solarDemo.sample()
{
_id: 10,
max_temp: 16,
avg_watts: 232,
max_watts: 414,
min_watts: 73
}

Observação

A saída anterior é um exemplo representativo . Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

7

Na lista de processadores de fluxo para sua instância de processamento de fluxo, clique no ícone Delete () para seu processador de fluxo.

Na caixa de diálogo de confirmação exibida, digite o nome do processador de stream (solarDemo) para confirmar que você deseja excluí-lo e clique em Delete.

Use o método sp.processor.drop() em mongosh para soltar solarDemo:

sp.solarDemo.drop()

Para confirmar que você descartou solarDemo, use o método sp.listStreamProcessors() para listar todos os seus processadores de stream disponíveis:

sp.listStreamProcessors()

Saiba como:

Voltar

Visão geral

Nesta página