Menu Docs

Página inicial do DocsDesenvolver aplicaçõesManual do MongoDB

Fluxos de alterações

Nesta página

  • Disponibilidade
  • Conecte
  • Assista a uma coleta, banco de dados ou implementação
  • Considerações sobre o desempenho do change stream
  • Abrir um change stream
  • Modificar o resultado do change stream
  • Consulte o documento completo para saber sobre operações de atualização
  • Retomar um change stream
  • Casos de uso
  • Controle de acesso
  • Notificação de evento
  • Agrupamentos
  • Alterar fluxos e documentos órfãos
  • Altere fluxos com pré e pós-imagens de documentos

Os change streams permitem que os aplicativos acessem alterações de dados em tempo real sem a complexidade prévia e o risco de acionar manualmente o oplog. Os aplicativos podem usar change streams para se inscrever em todas as alterações de dados em uma única collection, um banco de dados ou um sistema inteiro e reagir imediatamente a elas. Como os change streams usam o framework de aggregation, os aplicativos também podem filtrar por alterações específicas ou transformar as notificações à vontade.

A partir do MongoDB 5.1, os change streams são otimizados, proporcionando uma utilização mais eficiente dos recursos e uma execução mais rápida de alguns estágios do aggregation pipeline.

Os change streams estão disponíveis para conjuntos de réplicas e clusters fragmentados:

As conexões para um change stream podem usar listas de sementes de DNS com a opção de conexão +srv ou listar os servidores individualmente na connection string.

Se o driver perder a conexão com um change stream ou a conexão ficar inativa, ele tentará restabelecer uma conexão com o change stream por meio de outro nó no cluster que tenha uma read preference correspondente. Se o driver não conseguir encontrar um nó com a read preference correta, ele lançará uma exceção.

Para obter mais informações, consulte Formato URI da connection string.

Você pode abrir change stream em:

Alvo
Descrição
Uma collection

Você pode abrir um cursor de change stream para uma única collection (exceto collections system ou qualquer collection nos bancos de dados admin, local e config).

Os exemplos nesta página usam os MongoDB drivers para abrir e trabalhar com um change stream cursor para uma única coleção. Consulte também o mongosh método db.collection.watch().

Um banco de dados

A partir do MongoDB 4.0, você pode abrir um cursor de change stream para um único banco de dados (excluindo o banco de dados admin, local e config) para acompanhar as alterações em todas as suas coleções que não sejam do sistema.

Para saber sobre o método do driver MongoDB, consulte a documentação do driver. Consulte também o método { mongosh db.watch().

Um sistema

A partir do MongoDB 4.0, você pode abrir um cursor de change stream para um sistema (um conjunto de réplicas ou um cluster fragmentado) para observar alterações em todas as coleções que não são do sistema em todos os bancos de dados, exceto admin, local e config.

Para saber sobre o método do driver MongoDB, consulte a documentação do driver. Consulte também o método { mongosh Mongo.watch().

Observação

Alterar exemplos de fluxo

Os exemplos nesta página usam os drivers MongoDB para ilustrar como abrir um cursor de fluxo de alteração para uma coleta e trabalhar com o cursor de fluxo de alteração.

Se a quantidade de change streams ativos abertos em um banco de dados exceder o tamanho do pool de conexões, você poderá enfrentar latência de notificação. Cada change stream usa uma conexão e uma operação getMore no change stream pelo período de tempo que aguarda o próximo evento. Para evitar problemas de latência, você deve garantir que o tamanho do pool seja maior do que o número de change stream abertos. Para obter detalhes, consulte a configuração maxPoolSize.

Quando um change stream é aberto em um cluster fragmentado:

  • O mongos cria change streams individuais em cada shard. Esse comportamento ocorre independentemente de o change stream ter como alvo um determinado intervalo de chaves de shard.

  • Quando o mongos recebe resultados do fluxo de mudança, ele classifica e filtra esses resultados. Se necessário, o mongos também executa uma consulta fullDocument .

Para obter o melhor desempenho, limite o uso de queries $lookup em change streams.

Para abrir um fluxo de alteração:

  • Para um conjunto de réplicas, você pode emitir a operação de change stream a partir de qualquer membro portador de dados.

  • Para um cluster fragmentado, você deve emitir a operação de change stream a partir do mongos.

O exemplo a seguir abre um change stream para uma collection e itera sobre o cursor para recuperar os documentos do change stream. [1]


➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem dos exemplos nesta página.


Para recuperar o evento de alteração de dados do cursor, repita o cursor do change stream. Para obter informações sobre o change stream, consulte Eventos de alteração.

O change stream permanece aberto até que ocorra uma das seguintes situações:

  • O cursor está explicitamente fechado.

  • Ocorre um evento de invalidar; por exemplo, um drop de coleção ou renomear.

  • A conexão com a implantação do MongoDB fecha ou expira. Consulte Comportamentos do cursor para obter mais informações.

  • Se a implantação for um cluster fragmentado, uma remoção de fragmento pode fazer com que um cursor de fluxo de alteração aberto seja fechado e o cursor de fluxo de alterações fechado pode não ser totalmente retomável.

Observação

O ciclo de vida de um cursor não fechado depende da linguagem.

[1] A partir de MongoDB 4.0, você pode especificar um startAtOperationTime para abrir o cursor em um determinado ponto no tempo. Se o ponto de partida especificado estiver no passado, ele deverá estar no intervalo de tempo do oplog.

➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem dos exemplos nesta página.


Dica

O campo _id do documento do evento de fluxo de alterações atua como o resume token. Não use o pipeline para modificar ou remover o campo _id do evento de fluxo de alteração.

A partir do MongoDB 4.2, os change streams lançarão uma exceção se o aggregation pipeline do change stream modificar o campo _id de um evento.

Consulte Mudar eventos para obter mais informações sobre o formato do documento de resposta do change stream.

Por padrão, os change streams retornam apenas o delta dos campos durante a operação de atualização. No entanto, você pode configurar o change stream para retornar a versão mais atual comprometida por maioria do documento atualizado.


➤ Use o menu suspenso Selecione a linguagem no canto superior direito para definir a linguagem dos exemplos nesta página.


Observação

Se houver uma ou mais operações confirmadas pela maioria que modificaram o documento atualizado após a operação de atualização, mas antes da pesquisa, o documento completo retornado pode diferir significativamente do documento no momento da operação de atualização.

No entanto, os deltas incluídos no documento do change stream sempre descrevem corretamente as alterações da collection monitorada que se aplicaram a esse evento do change stream.

Consulte Mudar eventos para obter mais informações sobre o formato do documento de resposta do change stream.

Os change streams são retomáveis especificando um token de retomada para resumeAfter ou startAfter ao abrir o cursor.

Você pode retomar uma transmissão de alteração após um evento específico passando um token de currículo para resumeAfter ao abrir o cursor.

Consulte Tokens de resumo para saber mais.

Importante

  • O oplog deve ter histórico suficiente para localizar a operação associada ao token ou ao registro de data e hora, se o registro de data e hora estiver no passado.

  • Não é possível usar resumeAfter para retomar um fluxo de alterações depois que um evento de invalidação (por exemplo, uma queda ou renomeação de coleção) fechar o fluxo. Iniciando no MongoDB 4.2, você pode usar startAfter para iniciar um novo fluxo de alterações após um evento de invalidação.

Novidades na versão 4.2.

Você pode iniciar um novo change stream após um evento específico passando um token de resumo para startAfter ao abrir o cursor. Ao contrário de resumeAfter, startAfter pode retomar as notificações após um evento de invalidar criando um novo change stream.

Consulte Tokens de resumo para saber mais.

Importante

  • O oplog deve ter histórico suficiente para localizar a operação associada ao token ou ao registro de data e hora, se o registro de data e hora estiver no passado.

O token de resumo está disponível em várias fontes:

Fonte
Descrição
Cada notificação de change stream inclui um token de resumo no campo _id.

O estágio de aggregation $changeStream inclui um token de resumo no campo cursor.postBatchResumeToken.

Este campo aparece somente ao utilizar o comando aggregate.

O comando getMore inclui um token de currículo no campo cursor.postBatchResumeToken.

Alterado na versão 4.2: A partir do MongoDB 4.2, os change streams lançarão uma exceção se o pipeline de agregação modificar o campo _id de um evento.

Dica

O MongoDB fornece um "snippet", uma extensão para mongosh, que decodifica tokens de retomada codificados em hexadecimal.

Você pode instalar e executar o resumetoken trecho de mongosh:

snippet install resumetoken
decodeResumeToken('<RESUME TOKEN>')

Você também pode executar resumetoken a partir da linha de comando (sem usar o )mongosh se npm o estiver instalado em seu sistema:

npx mongodb-resumetoken-decoder <RESUME TOKEN>

Consulte o seguinte para obter mais detalhes sobre:

As notificações de alteração de evento incluem um token de currículo no campo _id:

{
"_id": {
"_data": "82635019A0000000012B042C0100296E5A1004AB1154ACACD849A48C61756D70D3B21F463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064635019A078BE67426D7CF4D2000004"
},
"operationType": "insert",
"clusterTime": Timestamp({ "t": 1666193824, "i": 1 }),
"collectionUUID": new UUID("ab1154ac-acd8-49a4-8c61-756d70d3b21f"),
"wallTime": ISODate("2022-10-19T15:37:04.604Z"),
"fullDocument": {
"_id": ObjectId("635019a078be67426d7cf4d2"'),
"name": "Giovanni Verga"
},
"ns": {
"db": "test",
"coll": "names"
},
"documentKey": {
"_id": ObjectId("635019a078be67426d7cf4d2")
}
}

Ao utilizar o comando aggregate, o estágio de agregação do $changeStream inclui um token de currículo no campo cursor.postBatchResumeToken :

{
"cursor": {
"firstBatch": [],
"postBatchResumeToken": {
"_data": "8263515EAC000000022B0429296E1404"
},
"id": Long("4309380460777152828"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp({ "t": 1666277036, "i": 1 }),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666277036, "i": 1 })
}

O comando getMore também inclui um token de resumo no campo cursor.postBatchResumeToken:

{
"cursor": {
"nextBatch": [],
"postBatchResumeToken": {
"_data": "8263515979000000022B0429296E1404"
},
"id": Long("7049907285270685005"),
"ns": "test.names"
},
"ok": 1,
"$clusterTime": {
"clusterTime": Timestamp( { "t": 1666275705, "i": 1 } ),
"signature": {
"hash": Binary(Buffer.from("0000000000000000000000000000000000000000", "hex"), 0),
"keyId": Long("0")
}
},
"operationTime": Timestamp({ "t": 1666275705, "i": 1 })
}

Os change streams podem beneficiar arquiteturas com sistemas comerciais dependentes, informando os sistemas downstream quando as mudanças nos dados forem duráveis. Por exemplo, os change streams podem economizar tempo dos desenvolvedores ao implementar serviços de extração, transformação e carga (ETL), sincronização entre plataformas, funcionalidade de colaboração e serviços de notificação.

Para implantações que impõem autenticação e autorização:

  • Para abrir um fluxo de alteração em relação a uma coleta específica, os aplicativos devem ter privilégios que concedam ações do changeStream e find na coleta correspondente.

    { resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
  • Para abrir um change stream em um único banco de dados, os aplicativos devem ter privilégios que concedam ações do changeStream e find em todas as coleções não-system no banco de dados.

    { resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
  • Para abrir um change stream em um sistema inteiro, os aplicativos devem ter privilégios que concedem ações do changeStream e find em todas as coleções não-system para todos os bancos de dados na sistema.

    { resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }

Os fluxos de alterações notificam apenas as alterações de dados que persistiram para a maioria dos membros portadores de dados no conjunto de réplicas. Isso garante que as notificações sejam acionadas somente por alterações confirmadas pela maioria que sejam duráveis em cenários de falha.

Por exemplo, considere um conjunto de réplicas de 3 membros com um cursor de change stream aberto em relação ao primary. Se um cliente emitir uma operação de inserção, o change stream só notificará o aplicativo da alteração de dados depois que essa inserção persistir para a maioria dos membros portadores de dados.

Se uma operação estiver associada a uma transação, o documento do evento de alteração incluirá o txnNumber e o lsid.

A partir do MongoDB 4.2, os fluxos de alteração usam comparações binárias simple, a menos que um agrupamento explícito seja fornecido. Em versões anteriores, os fluxos de alteração abertos em uma única coleção (db.collection.watch()) herdariam o agrupamento padrão dessa coleção.

A partir do MongoDB 5.3, durante a migração de intervalo, os eventos de fluxo de alterações não são gerados para atualizações de documentos órfãos.

A partir do MongoDB 6.0, você pode usar eventos change stream para produzir a versão de um documento antes e depois das alterações (pré e pós-imagens do documento):

  • A pré-imagem é o documento antes de ser substituído, atualizado ou excluído. Não há pré-imagem para um documento inserido.

  • A pós-imagem é o documento após ter sido inserido, substituído ou atualizado. Não há pós-imagem para um documento excluído.

  • Habilite changeStreamPreAndPostImages para uma coleção usando db.createCollection(), create ou collMod.

As imagens pré e pós não estarão disponíveis para um change stream se as imagens forem:

  • Não habilitadas na coleção no momento de uma operação de atualização ou exclusão de documento.

  • Removido após o tempo de retenção pré e pós-imagem definido em expireAfterSeconds.

    • O exemplo a seguir define expireAfterSeconds para 100 segundos em um cluster inteiro:

      use admin
      db.runCommand( {
      setClusterParameter:
      { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } }
      } )
    • O exemplo a seguir define expireAfterSeconds para 100 segundos em uma collection específica:

      use admin
      db.getSiblingDB("my_collection")
      .sensors.watch({ changeStreamOptions:
      { preAndPostImages: { expireAfterSeconds: 100 } } })
    • O exemplo a seguir retorna as configurações atuais do changeStreamOptions, incluindo expireAfterSeconds:

      db.adminCommand( { getClusterParameter: "changeStreamOptions" } )
    • Definir expireAfterSeconds para off usa a política de retenção: pré-imagens e pós-imagens são retidas até os eventos de change streams serem removidos do oplog.

    • Se um change stream for removido do oplog, as imagens pré e pós correspondentes também serão excluídas, independentemente do tempo de retenção pré e pós-imagem expireAfterSeconds .

Considerações adicionais:

  • Habilitar pré e pós-imagens consome espaço de armazenamento e adiciona tempo de processamento. Ative as imagens anteriores e posteriores somente se precisar delas.

  • Limite o tamanho do evento de transmissão da alteração para menos de 16 megabytes. Para limitar o tamanho do evento, você pode:

    • Limite o tamanho do documento a 8 megabytes. Você pode solicitar imagens pré e pós simultaneamente na saída do change stream se outros campos de evento de change stream, como updateDescription não forem grandes.

    • Solicite apenas pós-imagens na saída de fluxo de alteração para documentos de até 16 megabytes se outros campos de evento de fluxo de alteração como updateDescription não forem grandes.

    • Solicite somente pré-imagens na saída do change stream para documentos de até 16 megabytes se:

      • as atualizações do documento afetam apenas uma pequena fração da estrutura ou conteúdo do documento, e

      • não causa um evento de alteração replace . Um evento replace sempre inclui o pós-imagem.

  • Para solicitar uma pré-imagem, defina fullDocumentBeforeChange como required ou whenAvailable em db.collection.watch(). Para solicitar uma pós-imagem, defina fullDocument usando o mesmo método.

  • As pré-imagens são escritas na coleção config.system.preimages.

    • A coleção config.system.preimages pode ficar grande. Para limitar o tamanho da coleção, você pode definir expireAfterSeconds tempo para as pré-imagens, conforme mostrado anteriormente.

    • As pré-imagens são removidas de forma assíncrona por um processo de plano de fundo.

Importante

Funcionalidade incompatível com versões anteriores

A partir do MongoDB 6.0, se você estiver usando imagens anteriores e posteriores de documentos para change streams, deverá desabilitar changeStreamPreandPostImages para cada coleção usando o collMod comando antes de poder fazer o downgrade para uma versão anterior do MongoDB.

Dica

Veja também:

Para obter exemplos completos com a saída do fluxo de alterações, consulte Fluxos de alterações com imagens pré e pós-documento.

← Limitações de coleta de séries temporais