Um processador de stream do Atlas Stream Processing aplica a lógica de um agregação pipeline de stream nomeado exclusivamente aos seus dados de streaming. O Atlas Stream Processing salva cada definição do processador de stream no armazenamento persistente para que ela possa ser reutilizada. Você só pode usar um determinado processador de fluxo no espaço de trabalho de processamento de fluxo em que sua definição está armazenada.
Pré-requisitos
Para criar e managed um processador de stream, você deve ter:
Um usuário de banco
atlasAdminde dados com a função para criar e executar processadores de streamUm cluster do Atlas
Considerações
Muitos comandos do processador de fluxo exigem que você especifique o nome do processador de fluxo relevante na invocação do método. A sintaxe descrita nas seções a seguir assume nomes estritamente alfanuméricos. Se o nome do processador de stream incluir caracteres não alfanuméricos, como hífens (-) ou pontos finais (.), você deverá colocar o nome entre colchetes ([]) e aspas duplas ("") no invocação de método, como em sp.["special-name-stream"].stats().
Crie um processador de stream interativamente
Você pode criar um processador de fluxo interativamente com o sp.process() método em. Os processadores de fluxo que você cria interativamente exibem o seguinte mongosh comportamento:
Gravar documentos de saída e dead letter queue (DLQ) no shell
Começam a ser executados imediatamente após a criação
Execute por 10 minutos ou até que o usuário os pare
Não persista depois de parar
Os processadores de fluxo que você cria interativamente são destinados à prototipagem. Para criar um processador de fluxo persistente, consulte Criar um processador de fluxo.
sp.process() tem a seguinte sintaxe:
sp.process(<pipeline>)
Campo | Tipo | necessidade | Descrição |
|---|---|---|---|
| array | Obrigatório | Transmitir o pipeline de agregação que você deseja aplicar aos seus dados de streaming. |
Para criar um processador de stream interativamente:
Conecte-se ao seu espaço de trabalho de processamento de fluxo.
Use a string de conexão associada ao seu espaço de trabalho de processamento de fluxo para se conectar mongosh usando.
Exemplo
O comando a seguir se conecta a um espaço de trabalho de processamento de fluxo como um usuário chamado streamOwner usando a059 autenticação x.:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Forneça sua senha de usuário quando solicitado.
Defina um pipeline.
No prompt mongosh, atribua uma array contendo as fases de agregação que você quer aplicar a uma variável chamada pipeline.
O exemplo a seguir usa o tópico stuff na conexão myKafka no registro de conexão como o $source, corresponde a registros em que o campo temperature tem um valor de 46 e emite as mensagens processadas para o output tópico da conexão mySink no registro de conexão:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
Crie um processador de fluxo
Para criar um processador de stream que persista até você descartá-lo:
A API de Administração do Atlas fornece um endpoint para criar um processador de fluxo.
Para criar um processador de stream na UI do Atlas , vá para a página Stream Processing do seu projeto do Atlas e clique em Configure no painel do seu espaço de trabalho de processamento de streams.
Você pode escolher entre utilizar o Construtor Visual ou o editorJSON do para configurar seu processador de fluxo:
Clique em Create with visual builder.
Se houver processadores de fluxo existentes em seu espaço de trabalho de processamento de fluxo, clique no botão + Create stream processor e selecione Visual Builder nas opções suspensas.
O Construtor Visual abre com um formulário onde você pode configurar seu processador de fluxo.
Adicione uma conexão de origem.
No campo Source, selecione uma conexão na lista suspensa Connection para usar como origem do seu processador de stream.
Isso abre uma caixa de texto JSON onde você pode configurar o estágio source para seu processador de stream. Para saber mais sobre a sintaxe de estágio source, consulte $source.
Exemplo
O seguinte estágio do source opera em dados em tempo real da conexão do sample_stream_solar pré-configurado:
{ "$source": { "connectionName": "sample_stream_solar" } }
Adicione estágios de agregação ao pipeline do processador de fluxo.
No painel Start building your pipeline, clique no botão do estágio de agregação que você deseja adicionar ao seu pipeline. Isso abre uma caixa de texto onde você pode configurar o estágio de agregação selecionado no formato JSON.
Se o seu estágio de agregação não estiver listado, clique em + Custom stage para definir um estágio de agregação suportado no formato JSON. Para saber mais sobre os estágios de agregação de processamento de fluxo e sua sintaxe, consulte Estágios de aggregation pipeline.
Exemplo
O estágio $match a seguir corresponde a todos os documentos no fluxo de sample_stream_solar pré-configurado em que o campo obs.watts é maior que 300:
{ "$match": { "obs.watts": { "$gt": 300 } } }
(Opcional) Configurar estágios de agregação adicionais.
Para adicionar estágios de agregação adicionais ao seu pipeline, clique no botão + Add stage below abaixo do último estágio do seu pipeline e selecione o estágio de agregação que deseja adicionar ou clique em Custom stage para definir um estágio de agregação suportado diferente. Isso abre uma caixa de texto onde você pode configurar o novo estágio no formato JSON.
Adicione uma conexão de coletor.
No campo Sink, selecione uma conexão de destino na lista suspensa Connection.
No campo Sink, selecione uma conexão na lista suspensa Connection para gravar seus dados processados.
Isso abre uma caixa de texto JSON onde você pode configurar o estágio merge para seu processador de stream. Para saber mais sobre a sintaxe de estágio merge, consulte $merge.
Exemplo
O seguinte estágio sink grava dados processados na collection demoDb.demoColl em uma conexão chamada conexão demoConnection:
{ "$merge": { "into": { "connectionName": "demoConnection", "db": "demoDb", "coll": "demoColl" } } }
Clique em Use JSON editor.
Se houver processadores de fluxo existentes em seu espaço de trabalho de processamento de fluxo, clique no botão + Create stream processor e selecione Visual Builder nas opções suspensas.
O editor JSON abre com uma caixa de texto onde você pode configurar seu processador de fluxo no formato JSON.
Defina o processador de fluxo.
Especifique a definição JSON para seu processador de fluxo na caixa de texto do editor JSON. Essa definição deve incluir um nome para seu processador de stream e uma agregação pipeline que comece com um estágio $source e termine com o estágio $merge. Você pode incluir qualquer número de estágios de agregação adicionais entre os estágios $source e $merge.
Para saber mais sobre os estágios de agregação de processamento de fluxo e sua sintaxe, consulte Estágios de aggregation pipeline.
Exemplo
A definição JSON a seguir cria um processador de fluxo chamado solarDemo que usa um estágio $tumblingWindow com um estágio $group aninhado para agregar dados em tempo real da conexão sample_stream_solar pré-configurada em intervalos de 10segundos e grava os dados processados em uma coleção em uma conexão chamada mongodb1.
{ "name": "solarDemo", "pipeline": [ { "$source": { "connectionName": "sample_stream_solar" } }, { "$tumblingWindow": { "interval": { "size": 10, "unit": "second" }, "pipeline": [ { "$group": { "_id": "$group_id", "max_watts": { "$max": "$obs.watts" }, "min_watts": { "$min": "$obs.watts" } } } ] } }, { "$merge": { "into": { "connectionName": "mongodb1", "db": "solarDb", "coll": "solarColl" } } } ] }
Para criar um novo processador de fluxo com mongosh, use o método sp.createStreamProcessor(). Tem a seguinte sintaxe:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | Tipo | necessidade | Descrição |
|---|---|---|---|
| string | Obrigatório | Nome lógico para o processador de stream. Isso deve ser exclusivo dentro do espaço de trabalho do processamento de fluxo. Este nome deve conter apenas caracteres alfanuméricos. |
| array | Obrigatório | Transmitir o pipeline de agregação que você deseja aplicar aos seus dados de streaming. |
| objeto | Opcional | objeto que define várias configurações opcionais para o processador de fluxo. |
| objeto | Condicional | Objeto que atribui uma fila de mensagens não entregues |
| string | Condicional | Etiqueta legível por humanos que identifica uma conexão em seu registro de conexões. Esta conexão deve fazer referência a um cluster do Atlas. Este campo é necessário se você definir o campo |
| string | Condicional | Nome de um reconhecimento de data center Atlas no cluster especificado em |
| string | Condicional | Nome de uma collection no reconhecimento de data center especificado no |
| string | Opcional | A camada do pod para o qual o Atlas Stream Processing atribui o processador. Se você não declarar esta opção, o Atlas Stream Processing atribuirá o processador a um pod da camada padrão do espaço de trabalho de processamento de fluxo. Para saber mais, consulte Níveis. |
Conecte-se ao seu espaço de trabalho de processamento de fluxo.
Use a string de conexão associada ao seu espaço de trabalho de processamento de fluxo para se conectar mongosh usando.
No painel do seu espaço de trabalho de processamento de fluxo, clique em Connect.
Na caixa de diálogo Connect to your workspace, selecione a aba Shell.
Copie a string de conexão exibida na caixa de diálogo. Ele tem o seguinte formato, onde
<atlas-stream-processing-url>é a URL do seu espaço de trabalho de processamento de fluxo e<username>é o nome de usuário de um usuário de banco de dados com oatlasAdminpapel:mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> --password <password> Cole a string de conexão no seu terminal e substitua o espaço reservado
<password>pelas credenciais do usuário. Pressione Enter para executá-lo e conectar-se à sua área de trabalho de processamento de fluxo.
Exemplo
O comando a seguir se conecta a um espaço de trabalho de processamento de fluxo como um usuário chamado streamOwner usando a autenticação x..059
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Forneça sua senha de usuário quando solicitado.
Defina um pipeline.
No prompt mongosh, atribua uma array contendo as fases de agregação que você quer aplicar a uma variável chamada pipeline.
O exemplo de pipeline a seguir usa o tópico stuff na conexão myKafka no registro de conexão como $source, corresponde aos registros em que o campo temperature tem um valor de 46 e emite as mensagens processadas para o output tópico da conexão mySink no registro de conexão:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
(Opcional) Defina um DLQ.
No prompt mongosh , atribua um objeto contendo as seguintes propriedades do seu DLQ:
connectionName
Nome do Banco de Dados
Nome da Coleção
O exemplo a seguir define um DLQ na conexão cluster01 , na collection de reconhecimento de data center metadata.dlq .
deadLetter = { dlq: { connectionName: "cluster01", db: "metadata", coll: "dlq" } }
Inicie um processador de fluxo
Observação
O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.
Para iniciar um processador de fluxo:
A API de administração do Atlas fornece os endpoints Iniciar um processador de fluxo e Iniciar um processador de fluxo com opções para iniciar um processador de fluxo.
Para iniciar um processador de stream sem opções, use Iniciar um processador de stream
Para iniciar um processador de fluxo e modificar as propriedades do
$sourceestágio, use Iniciar um processador de fluxo com opções Esse endpoint permite a modificação dasstartAfterstartAtOperationTimepropriedades ou.
Para iniciar um processador de fluxo na interface do usuário do Atlas , vá para a página Stream Processing do seu projeto do Atlas e clique em Configure no painel do seu espaço de trabalho de processamento de fluxo para visualizar a lista de processadores de fluxo definidos para ele.
Em seguida, clique no ícone Start do seu processador de stream.
Para iniciar um processador de stream com,mongosh sp.processor.start() use o método. Tem a seguinte sintaxe:
sp.processor.start(<options>)
Onde <options> pode ser um dos seguintes:
Opção | Descrição |
|---|---|
| |
| |
| A camada do pod para o qual o Atlas Stream Processing atribui o processador. Se você não declarar esta opção, o Atlas Stream Processing atribuirá o processador a um pod da camada padrão do espaço de trabalho de processamento de fluxo. Para saber mais, consulte Níveis. |
Por exemplo, para iniciar um processador de fluxo denominado proc01, execute o seguinte comando:
sp.proc01.start()
{ "ok" : 1 }
Este método retorna { "ok": 1 } se o processador de stream existir e não estiver em execução no momento. Se você invocar sp.processor.start() para um processador de fluxo que não seja STOPPED, mongosh retornará um erro.
Interromper um processador de fluxo
Observação
O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.
Para interromper um processador de fluxo:
A API de Administração do Atlas fornece um endpoint para interromper um processador de fluxo.
Para pausar um processador de fluxo na interface do usuário do Atlas , vá para a página Stream Processing do seu projeto Atlas e clique em Configure no painel do seu espaço de trabalho de processamento de fluxo para visualizar a lista de processadores de fluxo definidos para ele.
Em seguida, clique no ícone Pause do seu processador de stream.
Para interromper um processador de fluxo existente com mongosh, use o método sp.processor.stop().
Por exemplo, para parar um processador de stream denominado proc01, execute o seguinte comando:
sp.proc01.stop()
{ "ok" : 1 }
Este método retorna { "ok": 1 } se o processador de stream existir e estiver em execução no momento. Se você invocar sp.processor.stop() para um processador de fluxo que não seja running, mongosh retornará um erro.
Modificar um processador de fluxo
Você pode modificar os seguintes elementos de um processador de fluxo existente:
Para modificar um processador de fluxo, siga estas etapas:
Por padrão, os processadores modificados restauram a partir do último ponto de verificação. Como alternativa, você pode definir resumeFromCheckpoint=false, caso em que o processador retém apenas estatísticas resumidas. Quando você modifica um processador com janelas abertas, as janelas são completamente recalculadas no pipeline atualizado.
Observação
Se você alterar o nome de um processador de fluxo para o qual você configurou o alerta Estado do processador de fluxo com falha usando um Operator (que contém expressões de correspondência como is, contains e mais), o Atlas não acionará alertas para o processador de fluxo renomeado se a expressão de correspondência não corresponder ao novo nome. Para monitorar o processador de fluxo renomeado, reconfigure o alerta.
Limitações
Quando a configuração padrão resumeFromCheckpoint=true está ativada, as seguintes limitações se aplicam:
Você não pode modificar a etapa
$source.Você não pode modificar o intervalo da sua janela.
Você não pode remover uma janela.
Você só pode modificar um pipeline com uma janela se essa janela contiver um estágio
$groupou$sortem seu pipeline interno.Você não pode alterar um tipo de janela existente. Por exemplo, você não pode alterar de um
$tumblingWindowpara um$hoppingWindowou vice-versa.Processadores com janelas podem reprocessar alguns dados como resultado do recálculo das janelas.
Para modificar um processador de fluxo:
A API de Administração do Atlas fornece um ponto de extremidade para modificar um processador de fluxo.
Requer mongosh v2.3.4+.
Use o comando sp.<streamprocessor>.modify() para modificar um processador de fluxo existente. <streamprocessor> deve ser o nome de um processador de fluxo parado definido para o espaço de trabalho de processamento de fluxo atual.
Por exemplo, para modificar um processador de stream chamado proc01, execute o seguinte comando:
sp.proc1.modify(<pipeline>, { resumeFromCheckpoint: bool, // optional name: string, // optional dlq: string, // optional }})
Adicione um estágio a um pipeline existente
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ]); sp.foo.start();
Modificar a fonte de entrada de um processador de fluxo
sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test", config: { startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000) } }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ], {resumeFromCheckpoint: false});
Remover uma fila de letras mortas de um processador de fluxo.
sp.foo.stop(); sp.foo.modify({dlq: {}}) sp.foo.start();
Modifique um processador de fluxo com uma janela
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$replaceRoot: {newRoot: "$fullDocument"}}, {$match: {cost: {$gt: 500}}}, {$tumblingWindow: { interval: {unit: "day", size: 1}, pipeline: [ {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}} ] }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ], {resumeFromCheckpoint: false}); sp.foo.start();
Descartar um processador de fluxo
Para descartar um processador de fluxo:
A API de Administração do Atlas fornece um endpoint para excluir um processador de fluxo.
Para excluir um processador de fluxo na interface do usuário do Atlas , vá para a página Stream Processing do seu projeto Atlas e clique em Configure no painel do seu espaço de trabalho de processamento de fluxo para visualizar a lista de processadores de fluxo definidos para ele.
Em seguida, clique no ícone Delete () do seu processador de stream. Na caixa de diálogo de confirmação exibida, digite o nome do processador de fluxo (solarDemo) para confirmar que você deseja excluí-lo e clique em Delete.
Para excluir um processador de fluxo existente com mongosh, use o método sp.processor.drop().
Por exemplo, para eliminar um processador de fluxo denominado proc01, execute o seguinte comando:
sp.proc01.drop()
Este método retorna:
truese o processador de fluxo existir.falsese o processador de fluxo não existir.
Quando você descarta um processador de fluxo, todos os recursos que o Atlas Stream Processing provisionou para ele são destruídos, junto com todo o estado salvo.
Liste os processadores de fluxo disponíveis
Para listar todos os processadores de fluxo disponíveis:
A API de Administração do Atlas fornece um endpoint para listar todos os processadores de fluxo disponíveis.
Para visualizar a lista de processadores de fluxo definidos para seu espaço de trabalho de processamento de fluxo na UI do Atlas , vá para a página Stream Processing do seu projeto do Atlas e clique em Configure no painel do seu espaço de trabalho de processamento de fluxo.
A lista de processadores de fluxo e seus status são exibidos.
Para listar todos os processadores de fluxo disponíveis no espaço de trabalho de processamento de fluxo atual mongosh com, use o método. Isso retorna uma lista de documentos que contêm o nome, a hora de início, o estado atual e o pipeline associados a cada processador sp.listStreamProcessors() de stream. Tem a seguinte sintaxe:
sp.listStreamProcessors(<filter>)
<filter> é um documento que especifica por quais campos filtrar a lista.
Exemplo
O exemplo a seguir mostra um valor de retorno para uma solicitação não filtrada:
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
Se você executar o comando novamente no mesmo espaço de trabalho de processamento de fluxo, filtrando por um "state" de "running", verá a seguinte saída:
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
Amostra de um processador de fluxo
Para retornar uma array de resultados de amostra de um processador de stream existente para STDOUT com mongosh, use o método sp.processor.sample(). Por exemplo, as seguintes amostras de comando de um processador de fluxo denominado proc01.
sp.proc01.sample()
Esse comando é executado continuamente até que você o cancele usando CTRL-C ou até que as amostras retornadas atinjam cumulativamente 40 MB de tamanho. O processador de fluxo relata documentos inválidos na amostra em um documento _dlqMessage do seguinte formato:
{ _dlqMessage: { errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', workspaceName: '<workspaceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
Você pode usar essas mensagens para diagnosticar problemas de higiene de dados sem definir uma coleção de filas de letras mortas.
Ver estatísticas de um processador de fluxo
Observação
O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.
Para visualizar as estatísticas de um processador de fluxo:
A API de Administração do Atlas fornece um endpoint para visualizar as estatísticas de um processador de fluxo.
Para visualizar o monitoramento do seu processador de fluxo, vá para a página Stream Processing do seu projeto Atlas e abra a guia Monitoring. Em seguida, selecione seu processador de stream na lista suspensa Stream processor no canto superior esquerdo da página.
Para retornar um documento resumindo o status atual de um processador de fluxo existente com mongosh, use o método sp.processor.stats(). Tem a seguinte sintaxe:
sp.<streamprocessor>.stats({options: {<options>}})
Onde options é um documento opcional com os seguintes campos:
Campo | Tipo | Descrição |
|---|---|---|
| inteiro | Unidade a ser usada para o tamanho dos itens na saída. Por padrão, o Atlas Stream Processing exibe o tamanho do item em bytes. Para exibir em KB, especifique um |
| booleano | Sinalizador que especifica o nível de verbosidade do documento de saída. Se definido como |
O documento de saída tem os seguintes campos:
Campo | Tipo | Descrição |
|---|---|---|
| string | O namespace no qual o processador de stream está definido. |
| objeto | Um documento que descreve o estado operacional do processador de fluxo. |
| string | O nome do processador de fluxo. |
| string | O status do processador de fluxo. Este campo pode ter os seguintes valores:
|
| inteiro | A escala na qual o campo de tamanho é exibido. Se definido como |
| inteiro | O número de documentos publicados no stream. Um documento é considerado "publicado" no fluxo quando passa pelo estágio |
| inteiro | O número de bytes ou kilobytes publicados no stream. Os bytes são considerados "publicados" no fluxo quando passam pelo estágio |
| inteiro | O número de documentos processados pelo fluxo. Um documento é considerado "processado" pelo fluxo quando passa por todo o pipeline. |
| inteiro | O número de bytes ou kilobytes processados pelo stream. Os bytes são considerados "processados" pelo fluxo quando passam por todo o pipeline. |
| inteiro | O número de documento enviados para a fila de mensagens não entregues (DLQ). |
| inteiro | O número de bytes ou kilobytes enviados para a fila de mensagens não entregues (DLQ). |
| inteiro | A diferença, em segundos, entre o tempo do evento representado pelo token de retomada do fluxo de alterações mais recente e o evento mais recente no oplog. |
| token | O token de retomada do fluxo de alterações mais recente. Só se aplica a processadores de fluxo com uma fonte do fluxo de alterações. |
| documento | Estatísticas de latência para o processador de fluxo como um todo. O Atlas Stream Processing retornará este campo somente se você fornecer a opção |
| inteiro | A latência estimada do 50º percentil de todos os documentos processados nos últimos 30 segundos. Se o seu pipeline incluir um estágio de janela, as medições de latência incluirão o intervalo da janela. Por exemplo, se sua etapa |
| inteiro | A latência estimada do 99º percentil de todos os documentos processados nos últimos 30 segundos. Se o seu pipeline incluir um estágio de janela, as medições de latência incluirão o intervalo da janela. Por exemplo, se sua etapa |
| datetime | Tempo decorrido real em que o período de medição de 30 segundos mais recente começou. |
| datetime | Tempo decorrido real em que o período de medição mais recente de 30 segundos terminou. |
| string | Unidade de tempo usada para medir a latência. Este valor é sempre expresso em |
| inteiro | Número de documentos que o processador de fluxo processou no período de medição mais recente de 30 segundos. |
| inteiro | Soma de todas as medições de latência individuais, em microssegundos, realizadas no período de medição mais recente de 30 segundos. |
| inteiro | O número de bytes usados pelo Windows para armazenar o estado do processador. |
| inteiro | O carimbo de data/hora da marca d'Água atual. |
| array | As estatísticas de cada operador no pipeline do processador. O Atlas Stream Processing retorna este campo somente se você passar a opção
|
| inteiro | O uso máximo de memória do operador em bytes ou kilobytes. |
| inteiro | O tempo total de execução do operador em segundos. |
| data | O horário de início da janela mínima aberta. Este valor é opcional. |
| data | O horário de início da janela de tempo máxima aberta. Este valor é opcional. |
| array | Informações de compensação das partições de um broker Apache Kafka. |
| inteiro | O número da partição do tópico do Apache Kafka. |
| inteiro | O deslocamento em que o processador de fluxo está ativado para a partição especificada. Esse valor é igual ao offset anterior que o processador de stream processou mais |
| inteiro | O deslocamento que o processador de fluxo confirmou pela última vez ao agente Apache Kafka e o checkpoint da partição especificada. Todas as mensagens através deste deslocamento são registradas no último checkpoint. |
| booleano | O sinalizador que indica se a partição está inativa. O valor padrão é |
Por exemplo, o seguinte mostra o status de um processador de fluxo denominado proc01 em um espaço de trabalho de processamento de fluxo denominado inst01 com tamanhos de item exibidos em KB:
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }