Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Menu Docs
Página inicial do Docs
/
Atlas
/

Arquitetura do Atlas Stream Processing

A abstração principal do Atlas Stream Processing é o processador de fluxo. Um processador de stream é um pipeline de agregação do MongoDB que opera continuamente no streaming de dados de uma fonte especificada e grava a saída em um coletor. Para saber mais, consulte Estrutura de um processador de stream.

O processamento do fluxo ocorre em instâncias de processamento de fluxo. Cada instância de processamento de fluxo é um namespace do Atlas que associa o seguinte:

  • Um ou mais trabalhadores, que disponibilizam a RAM e as CPUs necessárias para executar seus processadores de fluxo.

  • Um provedor de nuvem e uma região de nuvem.

  • Um registro de conexão, que armazena a lista de fontes e sumidouros disponíveis de dados de streaming.

  • Um contexto de segurança no qual definir autorizações de usuário.

  • Uma connection string com a própria instância do Atlas Stream Processing .

Quando você define um processador de fluxo, ele fica disponível somente para a instância de processamento de fluxo na qual você o define. Cada worker pode hospedar até quatro processadores de fluxo em execução; o Atlas Stream Processing dimensiona automaticamente sua instância de processamento de fluxo à medida que você inicia os processadores de fluxo, provisionando workers conforme necessário. Você pode desprovisionar um worker interrompendo todos os processadores de fluxo nele. O Atlas Stream Processing sempre dá preferência em atribuir um processador de fluxo a um worker existente em vez de provisionar novos workers.

Exemplo

Você tem uma instância Atlas Stream Processing executando oito processadores de stream, denominados proc01 por meio de proc08. proc01 a proc04 são executadas em um trabalhador, proc05 a proc08 executadas em um segundo trabalhador. Você inicia um novo processador de fluxo denominado proc09. O Atlas Stream Processing provisiona um terceiro trabalhador para hospedar proc09.

Depois, você para o proc03 no primeiro trabalhador. Quando você interrompe e reinicia o proc09, o Atlas Stream Processing reatribui proc09 ao primeiro trabalhador e desprovisiona o terceiro trabalhador.

Se você iniciar um novo processador de fluxo denominado proc10 antes de parar e reiniciar proc09, o Atlas Stream Processing atribuirá proc10 ao primeiro trabalhador no slot alocado anteriormente a proc03.

Ao fazer o dimensionamento, o Atlas Stream Processing considera apenas o número de processadores de fluxo em execução no momento. Ele não conta processadores de fluxo definidos que não estão em execução. O nível da instância de processamento de fluxo determina a alocação de RAM e CPU dos seus trabalhadores.

Os registros de conexão armazenam uma ou mais conexões. Cada conexão atribui um nome à combinação de detalhes de rede e segurança que permitem que um processador de stream interaja com serviços externos. As conexões exibem o seguinte comportamento:

  • Somente uma conexão definida no registro de conexões de uma determinada instância de processamento de fluxo pode atender aos processadores de fluxo hospedados nessa instância de processamento de fluxo.

  • Cada conexão pode atender a um número arbitrário de processadores de fluxo

  • Somente uma conexão pode servir como fonte de um determinado processador de fluxo.

  • Somente uma única conexão pode servir como coletor de um determinado processador de fluxo.

  • Uma conexão não é definida naturalmente como fonte ou coletor. Qualquer conexão pode servir qualquer função dependendo de como um processador de fluxo invoca essa conexão.

Atlas Stream Processing executa trabalhadores do Atlas Stream Processing em containers de clientes dedicados, em infraestrutura de vários inquilinos. Para obter mais informações sobre segurança e conformidade do MongoDB, consulte a Central de confiança do MongoDB.

O Atlas Stream Processing captura o estado de um processador de stream usando checkpoints. Cada checkpoint tem um ID exclusivo e está sujeito ao fluxo da lógica do processador de fluxo. Depois que todos os operadores de um processador de stream adicionam seu estado a um checkpoint, o Atlas Stream Processing confirma o checkpoint, gerando dois tipos de registros:

  • Um único registro de confirmação que valida o ID do checkpoint e o processador de fluxo ao qual ele pertence

  • Um conjunto de registros que descrevem o estado de cada operação com estado no processador de fluxo relevante no instante em que o Atlas Stream Processing executou o checkpoint.

Quando você reinicia um processador de fluxo após uma interrupção, o Atlas Stream Processing faz query do último checkpoint confirmado e retoma a operação a partir do estado descrito.

O Atlas Stream Processing oferece suporte ao uso de uma coleção de banco de dados Atlas como uma fila de mensagens não entregues (DLQ). Quando o Atlas Stream Processing não consegue processar um documento do seu fluxo de dados, ele grava o conteúdo do documento na DLQ juntamente com os detalhes da falha de processamento. Você pode atribuir uma coleção como DLQ nas definições do seu processador de fluxo.

Para saber mais, consulte Criar um processador de fluxo.

No processamento de dados em streaming, os documentos estão sujeitos a dois sistemas de temporização:

  • Hora do evento

  • Tempo em processamento

O Atlas Stream Processing oferece vários parâmetros para controlar como os processadores de stream interagem com esses sistemas de temporização.

A hora do evento é a hora em que o fluxo de origem gera um documento ou o sistema de mensagens (por exemplo, Apache Kafka) recebe o documento. Isto é verificado pelo carimbo de data/hora do documento.

A latência de rede, o processamento upstream e outros fatores podem não apenas causar discrepâncias entre esses tempos de um determinado documento, mas também podem fazer com que documentos cheguem em um processador de fluxo fora da ordem do tempo de evento. Em ambos os casos, as janelas podem perder documentos que você pretende que elas capturem. O Atlas Stream Processing considera esses documentos atrasos e os envia para a fila de letras mortas, se você configurar uma.

Event Time é uma opção configurável para o boundary campo suportada em Tbling Windows e Hopping Windows.

O horário de processamento é o momento em que o processador de fluxo consome um documento. Isso é apurado pelo relógio do sistema que hospeda o processador de fluxo.

O Tempo de Processamento é uma opção configurável para o boundary campo suportado em Tbling Windows e Hopping Windows. Ele permite que você crie um pipeline com uma tipo de janela que acumula dados com base no horário do relógio do servidor. Em oposição às janelas de tempo de evento , as janelas de tempo de processamento atribuem a cada evento um carimbo de data/hora com base na hora do relógio do servidor quando ele chega ao processador de fluxo.

Os registros de data e hora dos documentos e os registros de data e hora dos limites da processingTime janela estão em UTC . Você não pode especificar as opções idleTimeout ou allowLateness ao configurar uma janela.

Exemplo

Você cria um pipeline com uma janela de tempo de evento de 5 minutos. Um evento é adicionado a um cluster Kafka de origem às 09:33. Devido a um atraso no cluster do Kafka, ele chega ao processador de fluxo às 09:37.

Se o pipeline tiver uma janela de tempo de evento de 5 minutos, esse evento será atribuído à janela 09:30-09:35. Se o pipeline tiver uma janela de processamento de 5 minutos, o evento será atribuído à janela de 09:35-09:40 minutos.

Uma marca dágua substitui o tempo de processamento e atualiza somente quando o processador consome um documento com um tempo de evento posterior a qualquer documento consumido anteriormente. Todos os processadores de stream aplicam marcas d'agua no Atlas Stream Processing.

Você configura um processador de stream com o Windows de 5minutos. Você inicia o processador em 12:00, de modo que as duas primeiras Windows tenham durações de 12:00-12:05 e 12:05-12:10. A tabela a seguir ilustra qual Windows capturará quais eventos dados atrasos variáveis, com e sem marcas d''d'gua.

Hora do evento
Tempo em processamento
Tempo de janela (sem marcas d'gua)
Tempo de janela (marcas d'água)

12:00

12:00

12:00-12:05

12:00-12:05

12:01

12:03

12:00-12:05

12:00-12:05

12:02

12:05

12:05-12:10

12:00-12:05

12:01

12:06

12:05-12:10

12:00-12:05

12:06

12:07

12:05-12:10

12:05-12:10

Sem marcas d'gua, a janela 12:00-12:05 fecha às 12:05 de acordo com o relógio do sistema da instância de processamento de fluxo e a janela 12:05-12:10 abre imediatamente. Assim, embora a origem tenha gerado quatro dos documentos durante o intervalo 12:00-12:05, a janela relevante captura apenas dois documentos.

Com marcas d' d'gua, a janela 12:00-12:05 no fecha em 12:05 porque entre os documentos que ingere at esse ponto, a hora do evento mais recente — e, portanto, o valor da marca d'gua — é 12:03. A janela 12:00-12:05 não fecha até 12:07 no relógio do sistema, quando o processador de fluxo ingere um documento com uma hora de evento de 12:05, avança a marca d''d'gua para essa hora e abre a janela 12:05-12:10. Cada janela captura todos os documentos apropriados.

Ao ler do Apache Kafka, o Atlas espera que todas as partições passem a marca d'gua. Se uma partição ficar ociosa e não produzir eventos com carimbos de data/hora posteriores à marca d'gua, a janela não será fechada nem produzirá resultados. Para resolver isso, defina partitionIdleTimeout para garantir que as partições ociosas não interrompam a progresso das marcas d'gua. Para saber mais, consulte o $source estágio (processamento de stream).

Se as diferenças entre o tempo do evento e o tempo de processamento variarem o suficiente, os documentos poderão chegar a um processador de fluxo depois que a marca d''d'''d''gua tiver avançado o suficiente para fechar a janela esperada. Para mitigar isso, o Atlas Stream Processing suporta o Lateness permitido, uma configuração que atrasa o fechamento de uma janela por um intervalo definido em relação à marca d'gua.

Enquanto as marcas d'água são propriedades dos processadores de fluxo, o atraso permitido é uma propriedade de uma janela e afeta somente quando essa janela é fechada. Se a marca d'água do processador de fluxo avançar a um ponto que acione uma nova janela, o atraso permitido mantém as janelas anteriores abertas sem evitar isso.

Você configura um processador de fluxo com 5minutos girando Windows. Você inicia o processador em 12:00, de modo que as duas primeiras Windows tenham durações de 12:00-12:05 e 12:05-12:10. Você definiu um atraso permitido de 2 minutos.

A tabela abaixo reflete a ordem na qual o processador de fluxo ingere os documentos descritos.

Hora do evento
Marca d'água
Tempo de atraso permitido
Janela de tempo

12:00

12:00

11:58

12:00-12:05

12:02

12:03

12:01

12:00-12:05

12:01

12:04

12:02

12:00-12:05

12:05

12:05

12:03

12:00-12:15, 12:05-12:10

12:04

12:06

12:04

12:00-12:05, 12:05-12:10

12:07

12:07

12:05

12:05-12:10

Quando a marca d'água avança para 12:05, a janela 12:05-12:10 se abre. No entanto, como o intervalo de atraso permitido é de 2 minutos, de dentro da janela 12:00-12:05, ele é efetivamente apenas 12:03, portanto, permanece aberto. Somente quando a marca d'água avança para 12:07 é que o tempo ajustado chega a 12:05. Nesse ponto, a janela 12:00-12:05 é fechada.

O desacoplamento do comportamento de janela do tempo de processamento por padrão melhora a correção do processamento de fluxo na maioria dos casos. No entanto, uma fonte de dados de streaming pode ter períodos prolongados de inatividade. Nesse cenário, uma janela pode capturar eventos antes do período de ociosidade e não conseguir retornar os resultados processados enquanto espera que a marca d'água avance o suficiente para fechar.

O Atlas Stream Processing permite que os usuários configurem um tempo limite de inatividade para o Windows para mitigar esses cenários usando o tempo de processamento. Um tempo limite de ociosidade é um intervalo que começa quando o tempo de processamento passa do final do intervalo de uma janela aberta e a origem do processador de fluxo está ociosa. Se a fonte permanecer ociosa por um intervalo igual ao tempo limite de ociosidade, a janela será fechada e a marca d''d'''''ausência independente de qualquer ingestão de documento .

Você configura uma janela em cascata com um intervalo de 3 minutos e um tempo limite de ociosidade de 1 minuto. A tabela a seguir ilustra os efeitos do tempo limite de ociosidade durante e após o intervalo de uma janela.

Tempo em processamento
Hora ou status do evento
Marca d'água
Janela de tempo

12:00

12:00

12:00

12:00-12:03

12:01

Fonte ociosa

12:00

12:00-12:03

12:02

Fonte ociosa

12:00

12:00-12:03

12:03

Fonte ociosa

12:00

12:00-12:03

12:04

12:02

12:02

12:00-12:03

12:05

12:05

12:05

12:03-12:06

12:06

Fonte ociosa

12:05

12:03-12:06

12:07

Fonte ociosa

12:00

12:06-12:09

12:08

Fonte ociosa

12:00

12:06-12:09

12:09

12:09

12:09

12:09-12:12

Durante o intervalo 12:00-12:03, a fonte fica inativa por três minutos, mas o processador de stream não fecha a janela porque o tempo de processamento não passou do final do intervalo da janela e a fonte não permanece inativa após o término do intervalo da janela. Quando a marca d'água avança para 12:05, a janela fecha normalmente e a janela 12:03-12:06 se abre.

Quando a fonte fica inativa em 12:06, ela permanece inativa em 12:07, acionando o tempo limite de inatividade e avançando a marca d' d'gua para 12:06.

Voltar

Começar

Nesta página