Para agentes de IA: um índice de documentação está disponível em https://www.mongodb.com/pt-br/docs/llms.txt — as versões de marcação de todas as páginas estão disponíveis anexando .md a qualquer caminho de URL .
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Menu Docs

Desenvolver processadores de stream

Um processador de fluxo Atlas Stream Processing aplica a lógica de um pipeline de agregação de fluxo com nome exclusivo aos seus dados de streaming. O Atlas Stream Processing salva cada definição do processador de fluxo no armazenamento persistente para que elas possam ser reutilizadas. Você só pode usar um determinado processador de fluxo no espaço de trabalho do processamento de fluxos em que sua definição está armazenada.

Para criar e managed um processador de stream, você deve ter:

Muitos comandos do processador de fluxo exigem que você especifique o nome do processador de fluxo relevante na invocação do método. A sintaxe descrita nas seções a seguir assume nomes estritamente alfanuméricos. Se o nome do processador de stream incluir caracteres não alfanuméricos, como hífens (-) ou pontos finais (.), você deverá colocar o nome entre colchetes ([]) e aspas duplas ("") no invocação de método, como em sp.["special-name-stream"].stats().

Observação

O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.

Observação

O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.

Você pode modificar os seguintes elementos de um processador de fluxo existente:

Para modificar um processador de fluxo, siga estas etapas:

1

Consulte Interromper um processador de stream.

2

Consulte as seguintes etapas.

3

Consulte Iniciar um processador de stream.

Por padrão, os processadores modificados restauram a partir do último ponto de verificação. Como alternativa, você pode definir resumeFromCheckpoint=false, caso em que o processador retém apenas estatísticas resumidas. Quando você modifica um processador com janelas abertas, as janelas são completamente recalculadas no pipeline atualizado.

Observação

Se você alterar o nome de um processador de fluxo para o qual você configurou o alerta Estado do processador de fluxo com falha usando um Operator (que contém expressões de correspondência como is, contains e mais), o Atlas não acionará alertas para o processador de fluxo renomeado se a expressão de correspondência não corresponder ao novo nome. Para monitorar o processador de fluxo renomeado, reconfigure o alerta.

Quando a configuração padrão resumeFromCheckpoint=true está ativada, as seguintes limitações se aplicam:

  • Você não pode modificar a etapa $source.

  • Você não pode modificar o intervalo da sua janela.

  • Você não pode remover uma janela.

  • Você só pode modificar um pipeline com uma janela se essa janela contiver um estágio $group ou $sort em seu pipeline interno.

  • Você não pode alterar um tipo de janela existente. Por exemplo, você não pode alterar de um $tumblingWindow para um $hoppingWindow ou vice-versa.

  • Processadores com janelas podem reprocessar alguns dados como resultado do recálculo das janelas.

  • As estatísticas por operador não são mantidas após uma operação de modificação.

Para retornar uma array de resultados de amostra de um processador de stream existente para STDOUT com mongosh, use o método sp.processor.sample(). Por exemplo, as seguintes amostras de comando de um processador de fluxo denominado proc01.

sp.proc01.sample()

Esse comando é executado continuamente até que você o cancele usando CTRL-C ou até que as amostras retornadas atinjam cumulativamente 40 MB de tamanho. O processador de fluxo relata documentos inválidos na amostra em um documento _dlqMessage do seguinte formato:

{
_dlqMessage: {
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
workspaceName: '<workspaceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

Você pode usar essas mensagens para diagnosticar problemas de higiene de dados sem definir uma coleção de filas de letras mortas.

Observação

O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.