MongoDB.local SF, Jan 15: See the speaker lineup & ship your AI vision faster. Use WEB50 to save 50%
Find out more >
Menu Docs
Página inicial do Docs
/ /

Introdução ao Atlas Stream Processing

Este tutorial orienta você pelas etapas de configuração do Atlas Stream Processing e execução do seu primeiro processador de stream.

Para concluir este tutorial você precisa:

  • Um projeto do Atlas com um cluster vazio. Esse cluster serve como coletor de dados para seu processador de stream.

  • Um usuário de banco atlasAdmin de dados com a função para criar e executar processadores de stream

  • Um usuário do Atlas com a Project Owner função ou Project Stream Processing Owner para gerenciar um espaço de trabalho de processamento de fluxo e um registro de conexão

    Observação

    A função Project Owner permite que você crie sistemas de banco de dados, gerencie o acesso ao projeto e as configurações do projeto, gerencie entradas da lista de acesso IP e muito mais.

    A função Project Stream Processing Owner permite ações do Atlas Stream Processing, como ver, criar, excluir e editar espaços de trabalho do processamento de fluxos, e ver, adicionar, modificar e excluir conexões no registro de conexões.

    Consulte Funções de projeto para saber mais sobre as diferenças entre as duas funções.

Este tutorial guia você na criação de um espaço de trabalho do processamento de fluxos, conectando-o a um cluster Atlas existente e configurando um processador de fluxo para ingerir dados de amostra de aparelhos solares de streaming e gravar os dados no seu cluster conectado.

1
  1. No Atlas, Go para a página Stream Processing do seu projeto.

    1. Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.

    2. Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.

    3. Na barra lateral, clique em Stream Processing sob o título Streaming Data.

      A página Processamento de fluxo é exibida.

  2. Clique em Create a workspace.

  3. Na página Create a stream processing workspace, configure seu espaço de trabalho da seguinte maneira:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Workspace Name: tutorialWorkspace

  4. Clique em Create.

2

Adicione uma conexão com um Atlas cluster vazio existente ao registro de conexão. Seu processador de stream usará esta conexão como coletor de dados de streaming.

  1. No painel da sua área de trabalho do processamento de fluxos, clique em Configure.

  2. Na aba Connection Registry , clique em + Add Connection no canto superior direito.

  3. Na lista suspensa Connection Type, clique em Atlas Database.

  4. No campo Connection Name, digite mongodb1.

  5. Na lista suspensa Atlas Cluster, selecione um Atlas cluster sem nenhum dado armazenado nele.

  6. Na lista suspensa Execute as, selecione Read and write to any database.

  7. Clique em Add connection.

3

Seu espaço de trabalho do processamento de fluxos vem pré-configurado com uma conexão para uma fonte de dados de amostra chamada sample_stream_solar. Essa fonte gera um fluxo de relatórios de vários dispositivos de energia solar. Cada relatório descreve a potência e a temperatura observadas de um único aparelho solar em um ponto temporal específico, bem como a potência máxima desse aparelho.

O seguinte documento representa um relatório desta fonte de dados:

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
}
}

Para verificar se esta fonte emite mensagens, crie um processador de stream interativamente usando mongosh:

  1. Conecte-se ao seu espaço de trabalho do processamento de fluxos.

    Use a string de conexão associada ao seu espaço de trabalho do processamento de fluxos para conectar usando mongosh.

    1. No painel da sua área de trabalho do processamento de fluxos, clique em Connect.

    2. Na caixa de diálogo de conexão do espaço de trabalho, clique em Choose a connection method e selecione a aba Shell.

    3. Copie a string de conexão exibida na caixa de diálogo. Ele tem o seguinte formato, onde <atlas-stream-processing-url> é a URL do seu espaço de trabalho de processamento de fluxo e <username> é o nome de usuário de um usuário de banco de dados com o atlasAdmin papel:

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. Cole a string de conexão no seu terminal e substitua o espaço reservado <password> pelas credenciais do usuário.

      Pressione "Enter" para executar e conectar-se ao seu espaço de trabalho do processamento de fluxos.

  2. No prompt mongosh, use o método sp.process() para criar o processador de stream interativamente.

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    Verifique se os dados da conexão sample_stream_solar são exibidos no console e encerre o processo.

    Os processadores de stream que você cria com sp.process() não persistem depois que você os encerra.

4

Um processador de fluxo persistente ingere, processa e escreve continuamente dados de streaming em um coletor de dados especificado até que você solte o processador. O processador de fluxo a seguir é um pipeline de agregação que deriva a temperatura máxima e as potências média, máxima e mínima de cada dispositivo solar em intervalos de 10segundos e, em seguida, grava os resultados em seu cluster vazio conectado.

Selecione uma das seguintes guias para criar um processador de fluxo utilizando a UI do Atlas ou mongosh:

Para criar um processador de fluxo na IU do Atlas, acesse a página Stream Processing do seu projeto do Atlas e clique em Configure no painel do seu espaço de trabalho do processamento de fluxos. Em seguida, escolha entre usar o construtor visual ou o editor JSON para configurar um processador de fluxo chamado solarDemo:

  1. Clique em Create with visual builder.

    O Construtor Visual abre com um formulário onde você pode configurar seu processador de fluxo.

  2. No campo Stream processor name, digite solarDemo.

  3. No campo Source, selecione sample_stream_solar na lista suspensa Connection.

    Isso adiciona o seguinte estágio do $source ao seu pipeline de agregação :

    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    }
  4. Configure um estágio $tumblingWindow.

    Start building your pipeline No painel, clique + Custom stage em e copie e cole o seguinte JSON na caixa de texto que aparece. Isso define um estágio $tumblingWindow com um estágio $group aninhado que deriva a temperatura máxima e as potências máximas, mínimas e médias de cada dispositivo solar em intervalos de 10segundos.

    Isso significa, por exemplo, que quando o estágio $group calcula um valor para max_watts, ele extrai o valor máximo dos valores obs.watts para todos os documentos com um determinado group_id ingerido nos 10 segundos anteriores.

    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [ {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }]
    }
    }
  5. No campo Sink, selecione mongodb1 na lista suspensa Connection.

    Na caixa de texto exibida, copie e cole o seguinte JSON. Isso configura um estágio$merge que grava os dados de streaming processados em uma coleção chamada solarColl no banco de dados solarDb do seu Atlas cluster conectado:

    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "solarDb",
    "coll": "solarColl"
    }
    }
    }
  6. Clique em Create stream processor.

    O processador de stream é criado e listado na aba Stream Processors da página Stream Processing.

  1. Clique em Use JSON editor.

    O editor JSON abre com uma caixa de texto onde você pode configurar seu processador de fluxo no formato JSON.

  2. Defina o processador de fluxo.

    Copie e cole a seguinte definição JSON na caixa de texto do editor JSON para definir um processador de fluxo denominado solarDemo. Esse processador de fluxo usa um estágio $tumblingWindow com um estágio $group aninhado para derivar a temperatura máxima e as potências máximas, mínimas e médias de cada dispositivo solar em intervalos de 10segundos e, em seguida, grava os resultados em uma coleção chamada solarColl no banco de dados Cluster0 do seu Atlas cluster conectado.

    Isso significa, por exemplo, que quando o estágio $group calcula um valor para max_watts, ele extrai o valor máximo dos valores obs.watts para todos os documentos com um determinado group_id ingerido nos 10 segundos anteriores.

    {
    "name": "solarDemo",
    "pipeline": [
    {
    "$source": {
    "connectionName": "sample_stream_solar"
    }
    },
    {
    "$tumblingWindow": {
    "interval": {
    "size": 10,
    "unit": "second"
    },
    "pipeline": [
    {
    "$group": {
    "_id": "$group_id",
    "max_temp": {
    "$max": "$obs.temp"
    },
    "max_watts": {
    "$max": "$obs.watts"
    },
    "min_watts": {
    "$min": "$obs.watts"
    },
    "avg_watts": {
    "$avg": "$obs.watts"
    }
    }
    }
    ]
    }
    },
    {
    "$merge": {
    "into": {
    "connectionName": "mongodb1",
    "db": "Cluster0",
    "coll": "solarColl"
    },
    "parallelism":16,
    }
    }
    ]
    }

Execute os seguintes comandos em mongosh para criar um processador de fluxo persistente denominado solarDemo:

  1. Conecte-se ao seu espaço de trabalho do processamento de fluxos.

    Use a string de conexão associada ao seu espaço de trabalho do processamento de fluxos para conectar usando mongosh.

    1. No painel da sua área de trabalho do processamento de fluxos, clique em Connect.

    2. Na caixa de diálogo Connect to your workspace, selecione a aba Shell.

    3. Copie a string de conexão exibida na caixa de diálogo. Ele tem o seguinte formato, onde <atlas-stream-processing-url> é a URL do seu espaço de trabalho de processamento de fluxo e <username> é o nome de usuário de um usuário de banco de dados com o atlasAdmin papel:

      mongosh "mongodb://<atlas-stream-processing-url>/"
      --tls --authenticationDatabase admin --username <username>
      --password <password>
    4. Cole a string de conexão no seu terminal e substitua o espaço reservado <password> pelas credenciais do usuário.

      Pressione "Enter" para executar e conectar-se ao seu espaço de trabalho do processamento de fluxos.

  2. Configure um estágio $source.

    Defina uma variável para um estágio $source que ingere dados da origem sample_stream_solar.

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  3. Configure um estágio $group.

    Defina uma variável para um estágio $group que deriva a temperatura máxima e as potências média, máxima e mínima de cada dispositivo solar de acordo com seu group_id.

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $max: "$obs.temp"
    },
    avg_watts: {
    $avg: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  4. Configure um estágio $tumblingWindow.

    Para realizar acumulações como $group em dados de streaming, o Atlas Stream Processing usa o Windows para vincular o conjunto de dados. Defina uma variável para um estágio $tumblingWindow que separa o fluxo em intervalos consecutivos de 10segundos.

    Isso significa, por exemplo, que quando o estágio $group calcula um valor para max_watts, ele extrai o valor máximo dos valores obs.watts para todos os documentos com um determinado group_id ingerido nos 10 segundos anteriores.

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  5. Configurar um estágio $merge .

    Defina uma variável para um estágio do $merge que escreve os dados de streaming processados em uma collection chamada solarColl no banco de dados do solarDb do seu Atlas cluster conectado.

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  6. Crie o processador de stream.

    Use o método sp.createStreamProcessor() para atribuir um nome ao seu novo processador de fluxo e declarar seu pipeline de agregação . O estágio $group pertence ao pipeline aninhado do $tumblingWindow, e você não deve incluí-lo na definição de pipeline do processador.

    sp.createStreamProcessor("solarDemo", [s, t, m])

    Isso cria um processador de fluxo chamado solarDemo que aplica a query definida anteriormente e grava os dados processados na coleção solarColl do banco de dados solarDb no cluster ao qual você se conectou. Ele retorna várias medições derivadas de intervalos de 10 segundos de observações de seus dispositivos solares.

    Para saber mais sobre como o Atlas Stream Processing grava em bancos de dados at-rest, consulte $merge (Processamento de Stream).

5

Na lista de processadores do processamento de fluxos para sua área de trabalho do processamento de fluxos, clique no ícone Start para seu processador de fluxos.

Use o método sp.processor.start() em mongosh:

sp.solarDemo.start()
6

Para verificar se o processador de fluxo está gravando dados no cluster do Atlas:

  1. No Atlas, acesse a página Clusters do seu projeto.

    1. Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.

    2. Se ainda não estiver exibido, selecione o projeto desejado no menu Projects na barra de navegação.

    3. Na barra lateral, clique em Clusters sob o título Database.

      A página Clusters é exibida.

  2. No Atlas, acesse a página Data Explorer do seu projeto.

    1. Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.

    2. Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.

    3. Na barra lateral, clique em Data Explorer sob o título Database.

      O Data Explorer é exibido.

    Observação

    Você também pode ir para a página Clusters e clicar em Data Explorer sob o título Shortcuts.

  3. Veja a coleção MySolar.

Para verificar se o processador está ativo, use o método sp.processor.stats() em mongosh:

sp.solarDemo.stats()

Este método relata estatísticas operacionais do processador de fluxo solarDemo.

Você também pode usar o método sp.processor.sample() em mongosh para retornar uma amostragem de documentos processados no terminal.

sp.solarDemo.sample()
{
_id: 10,
max_temp: 16,
avg_watts: 232,
max_watts: 414,
min_watts: 73
}

Observação

A saída anterior é um exemplo representativo . Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

7

Na lista de processadores de fluxo para seu espaço de trabalho do processamento de fluxos, clique no ícone Delete () do seu processador de fluxo.

Na caixa de diálogo de confirmação exibida, digite o nome do processador de stream (solarDemo) para confirmar que você deseja excluí-lo e clique em Delete.

Use o método sp.processor.drop() em mongosh para soltar solarDemo:

sp.solarDemo.drop()

Para confirmar que você descartou solarDemo, use o método sp.listStreamProcessors() para listar todos os seus processadores de stream disponíveis:

sp.listStreamProcessors()

Saiba como:

Voltar

Atlas Stream Processing

Nesta página