Learn the "why" behind slow queries and how to fix them in our 2-Part Webinar.
Register now >
Menu Docs
Página inicial do Docs
/ /

Arquitetura do Atlas Stream Processing

A abstração principal do Atlas Stream Processing é o processador de fluxo. Um processador de stream é umpipeline de agregação do MongoDB que opera continuamente no streaming de dados de uma fonte especificada e grava a saída em um coletor. Para saber mais, consulte Estrutura de um processador de stream.

O processamento de fluxos é realizado em espaços de trabalho do processamento de fluxos. Cada espaço de trabalho do processamento de fluxos é um namespace do Atlas que associa o seguinte:

  • Um ou mais processadores de fluxo, cada um executando sua própria alocação de RAM e CPUs.

  • Um nível padrão, que determina a quantidade de memória e computação disponível para cada processador de fluxo quando você não especifica um nível.

  • Um nível máximo que determina a maior quantidade de memória e computação que pode ser alocada para um pod dentro desse espaço de trabalho do processamento de fluxos.

  • Um provedor de nuvem e uma região de nuvem.

  • Um registro de conexão, que armazena a lista de fontes e sumidouros disponíveis de dados de streaming.

  • Um contexto de segurança no qual definir autorizações de usuário.

  • Uma string de conexão para o próprio espaço de trabalho do processamento de fluxos.

Quando você define um processador de fluxo, ele fica disponível somente para o espaço de trabalho de stream processing no qual você o define. Cada processador de stream executa em recursos alocados de acordo com seu nível. O Atlas Stream Processing cobra dos usuários um processador de stream apenas enquanto ele está em execução.

Se você iniciar um processador de fluxo sem declarar um tamanho de nível, ele executará o nível do espaço de trabalho do processamento de fluxos. Você pode iniciar um processador do processamento de fluxos de qualquer nível até e incluindo o nível máximo do espaço de trabalho do processamento de fluxos

Exemplo

Você define um processador de fluxo em um espaço de trabalho do processamento de fluxos chamado myWorkspace com um nível padrão de SP10 e um nível máximo de SP30. Se você iniciar o processador sem especificar um nível, o Atlas Stream Processing o aloca a um pod SP10. No entanto, você pode declarar qualquer nível de SP2 até SP30, e o Atlas Stream Processing alocará o processador a um pod de tamanho apropriado.

Cada trabalhador pode hospedar até quatro processadores de fluxo em execução. Os espaços de trabalho de Stream Processing que são executados no modelo de trabalhadores legado cobram dos usuários de acordo com o número de trabalhadores. O Atlas Stream Processing dimensiona automaticamente seu espaço de trabalho de stream processing à medida que você inicia os processadores de stream, provisionando trabalhadores conforme necessário. Você pode desprovisionar um trabalhador interrompendo todos os processadores de stream nele. O Atlas Stream Processing sempre prefere atribuir um processador de stream a um trabalhador existente em vez de provisionamento de novos trabalhadores.

Exemplo

Você tem um espaço de trabalho do processamento de fluxos executando oito processadores de fluxo, chamados proc01 a proc08. proc01 a proc04 executado em um worker, proc05 a proc08 executado em um segundo worker. Você inicia um novo processador de fluxo chamado proc09. O Atlas Stream Processing provisiona um terceiro worker para hospedar proc09.

Depois, você para o proc03 no primeiro trabalhador. Quando você interrompe e reinicia o proc09, o Atlas Stream Processing reatribui proc09 ao primeiro trabalhador e desprovisiona o terceiro trabalhador.

Se você iniciar um novo processador de fluxo denominado proc10 antes de parar e reiniciar proc09, o Atlas Stream Processing atribuirá proc10 ao primeiro trabalhador no slot alocado anteriormente a proc03.

Ao fazer o dimensionamento, o Atlas Stream Processing considera apenas o número de processadores de fluxo em execução no momento. Ele não conta processadores de fluxo definidos que não estão em execução. O nível do espaço de trabalho do processamento de fluxos determina a alocação de RAM e CPU dos workers.

Importante

SP10 e os processadores SP30 operam e faturam os usuários de acordo com o modelo de trabalho legado . Esses processadores atualizam para o modelo de preços por processador em 3de dezembro de 2025. Para saber mais, consulte a seção modelo de trabalho da visão geral da arquitetura do Atlas Stream Processing.

Os registros de conexão armazenam uma ou mais conexões. Cada conexão atribui um nome à combinação de detalhes de rede e segurança que permitem que um processador de stream interaja com serviços externos. As conexões exibem o seguinte comportamento:

  • Somente uma conexão definida no registro de conexões de um determinado espaço de trabalho do processamento de fluxos pode atender aos processadores de fluxo hospedados nesse espaço de trabalho do processamento de fluxos.

  • Cada conexão pode atender a um número arbitrário de processadores de fluxo

  • Somente uma conexão pode servir como fonte de um determinado processador de fluxo.

  • Somente uma única conexão pode servir como coletor de um determinado processador de fluxo.

  • Uma conexão não é definida naturalmente como fonte ou coletor. Qualquer conexão pode servir qualquer função dependendo de como um processador de fluxo invoca essa conexão.

O Atlas Stream Processing executa pods do processamento de fluxos em contêineres dedicados ao cliente, em infraestrutura multilocatária. Para saber mais sobre segurança e conformidade do MongoDB, consulte a Central de Confiança do MongoDB.

O Atlas Stream Processing captura o estado de um processador de stream usando checkpoints. Cada checkpoint tem um ID exclusivo e está sujeito ao fluxo da lógica do processador de fluxo. Depois que todos os operadores de um processador de stream adicionam seu estado a um checkpoint, o Atlas Stream Processing confirma o checkpoint, gerando dois tipos de registros:

  • Um único registro de confirmação que valida o ID do checkpoint e o processador de fluxo ao qual ele pertence

  • Um conjunto de registros que descrevem o estado de cada operação com estado no processador de fluxo relevante no instante em que o Atlas Stream Processing executou o checkpoint.

Quando você reinicia um processador de fluxo após uma interrupção, o Atlas Stream Processing faz query do último checkpoint confirmado e retoma a operação a partir do estado descrito.

Depois que o Atlas Stream Processing confirma um checkpoint, ele atualiza os deslocamentos do grupo de consumidores no Kafka de forma assíncrona. Quando você usa o Apache Kafka $sourcecomo, os grupos de consumidores acompanham esses checkpoints como offsets comprometidos no cluster Kafka para cada partição.

Como essas atualizações ocorrem periódica e assincronamente, os deslocamentos comprometidos do grupo de consumidores podem atrasar temporariamente o checkpoint mais recente. Isso pode causar atrasos de curto prazo no monitoramento e métricas de atraso para ferramentas que leem deslocamentos comprometidos do Kafka, como a ferramenta CLI kafka-consumer-group. Essas ferramentas podem mostrar ao grupo de consumidores "atrás" da verdadeira posição interna do processador de stream.

O atraso entre os checkpoints internos e os offsets do Kafka confirmados não é fixo e pode variar de acordo com o volume de trabalho e a configuração. Em volumes de trabalho típicos, é da ordem de segundos, mas não há limite superior estrito.

O Atlas Stream Processing oferece suporte ao uso de uma coleção de banco de dados Atlas como uma fila de mensagens não entregues (DLQ). Quando o Atlas Stream Processing não consegue processar um documento do seu fluxo de dados, ele grava o conteúdo do documento na DLQ juntamente com os detalhes da falha de processamento. Você pode atribuir uma coleção como DLQ nas definições do seu processador de fluxo.

Para saber mais, consulte Criar um processador de fluxo.

O Atlas Stream Processing implementa políticas abrangentes de tratamento de erros e novas tentativas para garantir o processamento de streams confiável. O sistema distingue entre diferentes tipos de erros e aplica estratégias apropriadas de repetição com base na classificação do erro.

O Atlas Stream Processing classifica os erros em duas categorias principais:

Erros do usuário

Erros causados pela configuração do usuário, problemas de dados ou problemas de serviço externo que estão fora do controle do Atlas Stream Processing. Exemplos incluem:

  • Credenciais de conexão inválidas

  • Problemas de conectividade de rede com serviços externos

  • Dados malformados que não podem ser processados

  • Problemas de permissão para acessar recursos externos

Erros internos

Erros que ocorrem no próprio sistema Atlas Stream Processing , geralmente devido a problemas temporários de infraestrutura ou interrupções de serviços. A resolução deles é considerada responsabilidade do serviço de Atlas Stream Processing .

O comportamento da nova tentativa varia de acordo com a classificação do erro:

Política de nova tentativa de erro do usuário

  • O Atlas Stream Processing tenta reiniciar o processador de stream um número limitado de vezes em um período de 5minutos

  • Se todas as tentativas de repetição falharem dentro desse período, o processador de fluxo fará a transição para um estado FAILED

  • Alguns erros do usuário são classificados como não repetíveis e fazem com que o processador falhe imediatamente. Exemplos incluem:

    • StreamProcessorWorkerOutOfMemory (418): o pipeline excede os limites de memória do nível

    • StreamProcessorInvalidOptions (420): sintaxe ou configuração de pipeline inválida

  • Você pode reiniciar manualmente um processador de stream com falha chamando o método start()

Política de nova tentativa de erro interno

  • O Atlas Stream Processing tenta novamente erros internos sem um limite de tempo

  • Erros internos trigger alertas para a equipe de engenharia do Atlas Stream Processing

  • O sistema depende de mecanismos automáticos de repetição para recuperar os processadores depois que os problemas internos são resolvidos

  • Os processadores de stream são retomados automaticamente do último checkpoint quando reiniciados

Quando um processador de stream encontra um erro e exige uma reinicialização, o processo de recuperação segue estas etapas:

1

O estado atual é preservado por meio do mecanismo de checkpoint.

2

O Atlas Stream Processing tenta reiniciar o processador de acordo com a política de novas tentativas aplicável.

3

Após a reinicialização bem-sucedida, o processador retoma a operação a partir do último checkpoint confirmado.

4

O processador continua em processamento de onde parou, garantindo que não haja perda de dados.

Esse processo de recuperação garante que falhas temporárias não resultem em perda de dados ou exijam intervenção manual na maioria dos casos.

No processamento de dados em streaming, os documentos estão sujeitos a dois sistemas de temporização:

  • Hora do evento

  • Tempo em processamento

O Atlas Stream Processing oferece vários parâmetros para controlar como os processadores de stream interagem com esses sistemas de temporização.

A hora do evento é a hora em que o fluxo de origem gera um documento ou o sistema de mensagens (por exemplo Apache Kafka) recebe o documento. Isto é verificado pelo carimbo de data/hora do documento.

A latência de rede, o processamento upstream e outros fatores podem não apenas causar discrepâncias entre esses tempos de um determinado documento, mas também podem fazer com que documentos cheguem em um processador de fluxo fora da ordem do tempo de evento. Em ambos os casos, o Windows pode perder documentos que você pretende que eles capturem. O Atlas Stream Processing considera esses documentos atrasos e os envia para a fila de letras mortas, se você configurar uma.

Event Time é uma opção configurável para o campo boundary suportada em Tumbling Windows e Hopping Windows.

O horário de processamento é o momento em que o processador de fluxo consome um documento. Isso é apurado pelo relógio do sistema que hospeda o processador de fluxo.

O Tempo de Processamento é uma opção configurável para o campo boundary suportado em Tumbling Windows e Hopping Windows. Ele permite que você crie um pipeline com uma tipo de janela que acumula dados com base no horário do relógio do servidor. Ao contrário do Windows de tempo de evento, o Windows de tempo de processamento atribui a cada evento um carimbo de data/hora com base no horário do relógio do servidor quando ele chega ao processador de fluxo.

Os registros de data e hora dos documentos e os registros de data e hora dos limites da janela estão em UTC. Você não pode especificar as opções idleTimeout ou allowLateness ao configurar uma janela processingTime.

Exemplo

Você cria um pipeline com uma janela de tempo de evento de 5 minutos. Um evento é adicionado a um cluster Kafka de origem às 09:33. Devido a um atraso no cluster do Kafka, ele chega ao processador de fluxo às 09:37.

Se o pipeline tiver uma janela de tempo de evento de 5 minutos, esse evento será atribuído à janela 09:30-09:35. Se o pipeline tiver uma janela de processamento de 5 minutos, o evento será atribuído à janela de 09:35-09:40 minutos.

Uma marca dágua substitui o tempo de processamento e atualiza somente quando o processador consome um documento com um tempo de evento posterior a qualquer documento consumido anteriormente. Todos os processadores de stream aplicam marcas d'agua no Atlas Stream Processing.

Você configura um processador de stream com o Windows de 5minutos. Você inicia o processador em 12:00, de modo que as duas primeiras Windows tenham durações de 12:00-12:05 e 12:05-12:10. A tabela a seguir ilustra qual Windows capturará quais eventos dados atrasos variáveis, com e sem marcas d''d'gua.

Hora do evento
Tempo em processamento
Tempo de janela (sem marcas d'gua)
Tempo de janela (marcas d'água)

12:00

12:00

12:00-12:05

12:00-12:05

12:01

12:03

12:00-12:05

12:00-12:05

12:02

12:05

12:05-12:10

12:00-12:05

12:01

12:06

12:05-12:10

12:00-12:05

12:06

12:07

12:05-12:10

12:05-12:10

Sem marcas d'água, a janela 12:00-12:05 fecha em 12:05 de acordo com o relógio do sistema do espaço de trabalho do processamento de fluxos, e a janela 12:05-12:10 abre imediatamente. Assim, embora a fonte tenha gerado quatro dos documentos durante o intervalo 12:00-12:05, a janela relevante captura apenas dois documentos.

Com marcas d' d'gua, a janela 12:00-12:05 no fecha em 12:05 porque entre os documentos que ingere at esse ponto, a hora do evento mais recente — e, portanto, o valor da marca d'gua — é 12:03. A janela 12:00-12:05 não fecha até 12:07 no relógio do sistema, quando o processador de fluxo ingere um documento com uma hora de evento de 12:05, avança a marca d''d'gua para essa hora e abre a janela 12:05-12:10. Cada janela captura todos os documentos apropriados.

Ao ler do Apache Kafka, o Atlas Stream Processing aguarda até que todas as partições ultrapassem a marca d'água. Se uma partição ficar ociosa e não gerar eventos com carimbos de data e hora posteriores à marca d'água, a janela não será fechada nem produzirá resultados. Para resolver isso, configure partitionIdleTimeout para garantir que as partições ociosas não interrompam a progressão das marcas d'água. Para aprender mais, veja $source Estágio (Stream Processing).

Se as diferenças entre o tempo do evento e o tempo de processamento variarem o suficiente, os documentos poderão chegar a um processador de fluxo depois que a marca d''d'''d''gua tiver avançado o suficiente para fechar a janela esperada. Para mitigar isso, o Atlas Stream Processing suporta o Lateness permitido, uma configuração que atrasa o fechamento de uma janela por um intervalo definido em relação à marca d'gua.

Enquanto as marcas d'água são propriedades dos processadores de fluxo, o atraso permitido é uma propriedade de uma janela e afeta somente quando essa janela é fechada. Se a marca d'água do processador de fluxo avançar a um ponto que acione uma nova janela, o atraso permitido mantém as janelas anteriores abertas sem evitar isso.

Você configura um processador de fluxo com 5minutos girando Windows. Você inicia o processador em 12:00, de modo que as duas primeiras Windows tenham durações de 12:00-12:05 e 12:05-12:10. Você definiu um atraso permitido de 2 minutos.

A tabela abaixo reflete a ordem na qual o processador de fluxo ingere os documentos descritos.

Hora do evento
Marca d'água
Tempo de atraso permitido
Janela de tempo

12:00

12:00

11:58

12:00-12:05

12:02

12:03

12:01

12:00-12:05

12:01

12:04

12:02

12:00-12:05

12:05

12:05

12:03

12:00-12:15, 12:05-12:10

12:04

12:06

12:04

12:00-12:05, 12:05-12:10

12:07

12:07

12:05

12:05-12:10

Quando a marca d'água avança para 12:05, a janela 12:05-12:10 se abre. No entanto, como o intervalo de atraso permitido é de 2 minutos, de dentro da janela 12:00-12:05, ele é efetivamente apenas 12:03, portanto, permanece aberto. Somente quando a marca d'água avança para 12:07 é que o tempo ajustado chega a 12:05. Nesse ponto, a janela 12:00-12:05 é fechada.

O desacoplamento do comportamento de janela do tempo de processamento por padrão melhora a correção do processamento de fluxo na maioria dos casos. No entanto, uma fonte de dados de streaming pode ter períodos prolongados de inatividade. Nesse cenário, uma janela pode capturar eventos antes do período de ociosidade e não conseguir retornar os resultados processados enquanto espera que a marca d'água avance o suficiente para fechar.

O Atlas Stream Processing permite que os usuários configurem um tempo limite de inatividade para o Windows para mitigar esses cenários usando o tempo de processamento. Um tempo limite de ociosidade é um intervalo que começa quando o tempo de processamento passa do final do intervalo de uma janela aberta e a origem do processador de fluxo está ociosa. Se a fonte permanecer ociosa por um intervalo igual ao tempo limite de ociosidade, a janela será fechada e a marca d''d'''''ausência independente de qualquer ingestão de documento .

Você configura uma janela em cascata com um intervalo de 3 minutos e um tempo limite de ociosidade de 1 minuto. A tabela a seguir ilustra os efeitos do tempo limite de ociosidade durante e após o intervalo de uma janela.

Tempo em processamento
Hora ou status do evento
Marca d'água
Janela de tempo

12:00

12:00

12:00

12:00-12:03

12:01

Fonte ociosa

12:00

12:00-12:03

12:02

Fonte ociosa

12:00

12:00-12:03

12:03

Fonte ociosa

12:00

12:00-12:03

12:04

12:02

12:02

12:00-12:03

12:05

12:05

12:05

12:03-12:06

12:06

Fonte ociosa

12:05

12:03-12:06

12:07

Fonte ociosa

12:00

12:06-12:09

12:08

Fonte ociosa

12:00

12:06-12:09

12:09

12:09

12:09

12:09-12:12

Durante o intervalo 12:00-12:03, a fonte fica inativa por três minutos, mas o processador de stream não fecha a janela porque o tempo de processamento não passou do final do intervalo da janela e a fonte não permanece inativa após o término do intervalo da janela. Quando a marca d'água avança para 12:05, a janela fecha normalmente e a janela 12:03-12:06 se abre.

Quando a fonte fica inativa em 12:06, ela permanece inativa em 12:07, acionando o tempo limite de inatividade e avançando a marca d' d'gua para 12:06.

Voltar

Guia de dimensionamento de nível

Nesta página