Um processador de fluxo Atlas Stream Processing aplica a lógica de um pipeline de agregação de fluxo com nome exclusivo aos seus dados de streaming. O Atlas Stream Processing salva cada definição do processador de fluxo no armazenamento persistente para que elas possam ser reutilizadas. Você só pode usar um determinado processador de fluxo no espaço de trabalho do processamento de fluxos em que sua definição está armazenada.
Pré-requisitos
Para criar e managed um processador de stream, você deve ter:
Um usuário de banco de dados com a função
atlasAdminpara criar e executar processadores de streamUm cluster do Atlas
Considerações
Muitos comandos do processador de fluxo exigem que você especifique o nome do processador de fluxo relevante na invocação do método. A sintaxe descrita nas seções a seguir assume nomes estritamente alfanuméricos. Se o nome do processador de stream incluir caracteres não alfanuméricos, como hífens (-) ou pontos finais (.), você deverá colocar o nome entre colchetes ([]) e aspas duplas ("") no invocação de método, como em sp.["special-name-stream"].stats().
Crie um processador de stream interativamente
Crie um processador de fluxo
Inicie um processador de fluxo
Observação
O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.
Interromper um processador de fluxo
Observação
O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.
Modificar um processador de fluxo
Você pode modificar os seguintes elementos de um processador de fluxo existente:
Nome
Para modificar um processador de fluxo, siga estas etapas:
Por padrão, os processadores modificados restauram a partir do último ponto de verificação. Como alternativa, você pode definir resumeFromCheckpoint=false, caso em que o processador retém apenas estatísticas resumidas. Quando você modifica um processador com janelas abertas, as janelas são completamente recalculadas no pipeline atualizado.
Observação
Se você alterar o nome de um processador de fluxo para o qual você configurou o alerta Estado do processador de fluxo com falha usando um Operator (que contém expressões de correspondência como is, contains e mais), o Atlas não acionará alertas para o processador de fluxo renomeado se a expressão de correspondência não corresponder ao novo nome. Para monitorar o processador de fluxo renomeado, reconfigure o alerta.
Limitações
Quando a configuração padrão resumeFromCheckpoint=true está ativada, as seguintes limitações se aplicam:
Você não pode modificar a etapa
$source.Você não pode modificar o intervalo da sua janela.
Você não pode remover uma janela.
Você só pode modificar um pipeline com uma janela se essa janela contiver um estágio
$groupou$sortem seu pipeline interno.Você não pode alterar um tipo de janela existente. Por exemplo, você não pode alterar de um
$tumblingWindowpara um$hoppingWindowou vice-versa.Processadores com janelas podem reprocessar alguns dados como resultado do recálculo das janelas.
As estatísticas por operador não são mantidas após uma operação de modificação.
Descartar um processador de fluxo
Liste os processadores de fluxo disponíveis
Listar padrões do espaço de trabalho
Amostra de um processador de fluxo
Para retornar uma array de resultados de amostra de um processador de stream existente para STDOUT com mongosh, use o método sp.processor.sample(). Por exemplo, as seguintes amostras de comando de um processador de fluxo denominado proc01.
sp.proc01.sample()
Esse comando é executado continuamente até que você o cancele usando CTRL-C ou até que as amostras retornadas atinjam cumulativamente 40 MB de tamanho. O processador de fluxo relata documentos inválidos na amostra em um documento _dlqMessage do seguinte formato:
{ _dlqMessage: { errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', workspaceName: '<workspaceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
Você pode usar essas mensagens para diagnosticar problemas de higiene de dados sem definir uma coleção de filas de letras mortas.
Ver estatísticas de um processador de fluxo
Observação
O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.