Definição
db.collection.watch( pipeline, options )Importante
Método mongosh
Esta página documenta um método
mongosh. Esta não é a documentação de comandos de banco de dados nem drivers específicos de linguagem, como Node.js.Para o comando do banco de dados, consulte o comando
aggregatecom o estágio de agregação$changeStream.Para drivers de API do MongoDB, consulte a documentação do driver do MongoDB específica da linguagem.
Somente para conjuntos de réplica e clusters fragmentados
Abre um mudar cursor de fluxo na coleção.
ParâmetroTipoDescriçãopipelinearray
Opcional. Um pipeline de agregação que consiste em um ou mais dos seguintes estágios de agregação :
Especifique um pipeline para filtrar/modificar o resultado dos eventos de alteração.
A partir do MongoDB 4.2, os fluxos de alteração lançarão uma exceção se o pipeline de agregação do fluxo de alterações modificar o campo _id de um evento.
optionsdocumento
Opcional. Opções adicionais que modificam o comportamento
watch()do.O documento
optionspode conter os seguintes campos e valores:CampoTipoDescriçãoresumeAfterdocumento
Opcional. Direciona para tentar retomar as notificações a partir da operação especificada no token de
watch()retomada.Cada documento de evento de fluxo de alteração inclui um token de currículo como o campo
_id. Passe todo o_idcampo do documento do evento de alteração que representa a operação que você deseja retomar depois.resumeAfteré mutuamente exclusivo comstartAfterestartAtOperationTime.startAfterdocumento
Opcional. Direciona para tentar iniciar um novo fluxo de alterações após a operação especificada no token de
watch()retomada. Permite que as notificações sejam retomadas após um evento de invalidação.Cada documento de evento de fluxo de alteração inclui um token de currículo como o campo
_id. Passe todo o_idcampo do documento do evento de alteração que representa a operação que você deseja retomar depois.startAfteré mutuamente exclusivo comresumeAfterestartAtOperationTime.fullDocumentstring
Opcional. Por padrão, retorna o delta dos campos modificados por uma operação de atualização, em vez de todo o documento
watch()atualizado.Defina
fullDocumentcomo"updateLookup"para direcionar a procurar a versão mais atual confirmada por maioria do documento atualizado.watch()watch()retorna umfullDocumentcampo com a pesquisa do documentoupdateDescription, além do delta.A partir do MongoDB 6.0, você pode definir
fullDocumentcomo:"whenAvailable"para produzir o documento pós-imagem, se disponível, depois que o documento foi inserido, substituído ou atualizado."required"para gerar a pós-imagem do documento após o documento ter sido inserido, substituído ou atualizado. Cria um erro se a pós-imagem não estiver disponível.
fullDocumentBeforeChangestring
Opcional.
A partir do MongoDB 6.0, você pode usar o novo campo
fullDocumentBeforeChangee defini-lo como:"whenAvailable"para gerar a pré-imagem do documento, se disponível, antes do documento ser substituído, atualizado ou excluído."required"para gerar a pré-imagem do documento antes do documento ser substituído, atualizado ou excluído. Gera um erro se a pré-imagem não estiver disponível."off"para suprimir a pré-imagem do documento."off"é o padrão.
batchSizeint
Opcional. O número máximo de documentos que podem ser retornados em cada lote de um fluxo de alterações. Por padrão,
watch()tem um tamanho de lote inicial de101documentos ou 16 mebibytes (MiB) de documentos. Os lotes subsequentes têm um tamanho máximo de 16 mebibytes. Essa opção pode impor um limite menor do que 16 MiB, mas não um limite maior. Quando definido, obatchSizeé o menor debatchSizedocumentos ou 16 MiB em documentos.Tem a mesma funcionalidade que
cursor.batchSize().maxAwaitTimeMSint
Opcional. A quantidade máxima de tempo em milissegundos que o servidor aguarda por novas alterações de dados para relatar ao cursor do fluxo de alterações antes de retornar um lote vazio.
Padrão para
1000milissegundos.collationdocumento
Opcional. Passe um documento de agrupamento para especificar um agrupamento para o cursor do change stream.
O padrão é
simplecomparação binária se omitida.showExpandedEventsbooleano
Opcional. A partir do MongoDB 6.0, os change streams suportam notificações de alteração para eventos DDL, como os eventos createIndexes e dropIndexes. Para incluir eventos expandidos em um fluxo de alteração, crie o cursor do fluxo de alteração usando a opção
showExpandedEvents.Novidades na versão 6.0.
startAtOperationTimeTimestamp
Opcional. O ponto de partida para o fluxo de alterações. Se o ponto de partida especificado estiver no passado, ele deverá estar no intervalo de tempo do oplog. Para verificar o intervalo de tempo do oplog, consulte
rs.printReplicationInfo().startAtOperationTimeé mutuamente exclusivo comresumeAfterestartAfter.Retorna: Um cursor que permanece aberto enquanto a conexão com a implantação do MongoDB permanecer aberta e a coleção existir. Consulte Alterar eventos para obter exemplos de documentos de eventos de alteração. Dica
Compatibilidade
Esse método está disponível em implantações hospedadas nos seguintes ambientes:
MongoDB Atlas: o serviço totalmente gerenciado para implantações do MongoDB na nuvem
Observação
Este comando é aceito em todos os clusters do MongoDB Atlas. Para obter informações sobre o suporte do Atlas a todos os comandos, consulte Comandos não suportados.
MongoDB Enterprise: a versão autogerenciada e baseada em assinatura do MongoDB
MongoDB Community: uma versão com código disponível, de uso gratuito e autogerenciada do MongoDB
Disponibilidade
Implantação
db.collection.watch() está disponível para implementações de conjunto de réplica e
cluster fragmentado:
Em um conjunto de réplicas, você pode emitir
db.collection.watch()em qualquer membro portador de dados.Em um cluster fragmentado, você deve emitir
db.collection.watch()em uma instânciamongos.
Mecanismo de armazenamento
Você só pode usar db.collection.watch() com o mecanismo de armazenamento Wired Tiger.
Leia o suporte do Concern majority
A partir do MongoDB 4.2, os fluxos de alteração estão disponíveis independentemente do suporte à "majority" de leitura; ou seja, o suporte a majority de leitura pode ser habilitado (padrão) ou desabilitado para usar fluxos de alteração.
No MongoDB 4.0 e versões anteriores, os fluxos de alteração estarão disponíveis somente se "majority" suporte a preocupações de leitura estiver habilitado (padrão).
Comportamento
db.collection.watch()notifica apenas sobre alterações de dados que persistiram para a maioria dos membros portadores de dados.O cursor do fluxo de alterações permanece aberto até que ocorra uma das seguintes situações:
O cursor está explicitamente fechado.
Ocorre um evento de invalidar; por exemplo, um drop de coleção ou renomear.
A conexão com a implantação do MongoDB fecha ou expira. Consulte Comportamento para obter mais informações.
Se a implantação for um cluster fragmentado, uma remoção de fragmento pode fazer com que um cursor de fluxo de alteração aberto seja fechado e o cursor de fluxo de alterações fechado pode não ser totalmente retomável.
Retomabilidade
Ao contrário dos drivers do MongoDB, omongosh não tenta retomar automaticamente um cursor de fluxo de alterações após um erro. Os drivers do MongoDB fazem uma tentativa de retomar automaticamente um cursor de fluxo de alterações após determinados erros.
db.collection.watch() usa informações armazenadas no oplog para produzir a descrição do evento de alteração e gerar um token de retomada associado a essa operação. Se a operação identificada pelo token de retomada passada para a opção resumeAfter ou startAfter já tiver sido descartada do oplog, db.collection.watch() não poderá retomar o fluxo de alterações.
Consulte Retomar um fluxo de alterações para obter mais informações sobre como retomar um fluxo de alterações.
Observação
Não é possível usar
resumeAfterpara retomar um fluxo de alterações depois que um evento de invalidação (por exemplo, um descarte ou renomeação de coleção) fechar o fluxo. Em vez disso, você pode usar startAfter para iniciar um novo fluxo de alterações após um evento de invalidação.Se a implantação for um cluster fragmentado, uma remoção de fragmento pode fazer com que um cursor de fluxo de alteração aberto seja fechado e o cursor de fluxo de alterações fechado pode não ser totalmente retomável.
Observação
Não é possível usar resumeAfter para retomar um fluxo de alterações depois que um evento de invalidação (por exemplo, um descarte ou renomeação de coleção) fechar o fluxo. Em vez disso, você pode usar startAfter para iniciar um novo fluxo de alterações após um evento de invalidação.
Pesquisa completa de documentos de operações de atualização
Por padrão, o cursor de fluxo de alterações retorna alterações/deltas de campos específicos para operações de atualização. Você também pode configurar o fluxo de alterações para procurar e retornar a versão atual de compromisso majoritário do documento alterado. Dependendo de outras operações de gravação que podem ter ocorrido entre a atualização e a pesquisa, o documento retornado pode diferir significativamente do documento no momento da atualização.
Dependendo do número de alterações aplicadas durante a operação de atualização e o tamanho do documento completo, há o risco de que o tamanho do documento do evento de alteração para uma operação de atualização seja maior do que os 16MB que são o limite de documentos BSON. Se isso ocorrer, o servidor fechará o cursor de fluxo de alterações e retornará um erro.
Controle de acesso
Ao executar com controle de acesso, o usuário deve ter as ações de privilégio do find e changeStream no recurso de coleção. Ou seja, um usuário deve ter uma função que conceda o seguinte privilégio:
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
O papel do read embutido fornece os privilégios apropriados.
Iteração do cursor
O MongoDB oferece várias maneiras de iterar em um cursor.
O método cursor.hasNext() bloqueia e aguarda o próximo evento. Para monitorar o cursor watchCursor e iterar sobre os eventos, use hasNext() assim:
while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } }
O método cursor.tryNext() não está bloqueando. Para monitorar o cursor watchCursor e iterar sobre os eventos, use tryNext() assim:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Exemplos
Abrir um fluxo de alterações
A operação a seguir abre um cursor do fluxo de alterações contra a coleção data.sensors:
watchCursor = db.getSiblingDB("data").sensors.watch()
Itere o cursor para verificar novos eventos. Use o método cursor.isClosed() com o método cursor.tryNext() para garantir que o loop só saia se o cursor do fluxo de alteração estiver fechado e não houver objetos restantes no lote mais recente:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Para obter a documentação completa sobre a saída do fluxo de alterações, consulte Eventos de alteração.
Observação
Você não pode usar isExhausted() com alterar colunas.
Fluxo de alterações com pesquisa completa de atualização de documentos
Defina a opção fullDocument como "updateLookup" para direcionar o cursor de fluxo de alteração para pesquisar a versão da maioria mais atual comprometida do documento associada a um evento de fluxo de alteração de atualização.
A operação a seguir abre um cursor de fluxo de alterações contra a coleção data.sensors usando a opção fullDocument : "updateLookup".
watchCursor = db.getSiblingDB("data").sensors.watch( [], { fullDocument : "updateLookup" } )
Itere o cursor para verificar novos eventos. Use o método cursor.isClosed() com o método cursor.tryNext() para garantir que o loop só saia se o cursor do fluxo de alteração estiver fechado e não houver objetos restantes no lote mais recente:
while (!watchCursor.isClosed()) { let next = watchCursor.tryNext() while (next !== null) { printjson(next); next = watchCursor.tryNext() } }
Para qualquer operação de atualização, o evento de alteração retorna o resultado da pesquisa do documento no campo fullDocument.
Para obter um exemplo da saída de atualização completa do documento, consulte alterar evento de atualização de fluxo.
Para obter a documentação completa sobre a saída do fluxo de alterações, consulte Eventos de alteração.
Altere fluxos com pré e pós-imagens de documentos
A partir do MongoDB 6.0, você pode usar eventos change stream para produzir a versão de um documento antes e depois das alterações (pré e pós-imagens do documento):
A pré-imagem é o documento antes de ser substituído, atualizado ou excluído. Não há pré-imagem para um documento inserido.
A pós-imagem é o documento após ter sido inserido, substituído ou atualizado. Não há pós-imagem para um documento excluído.
Habilite
changeStreamPreAndPostImageso para uma collection utilizando,db.createCollection()createcollModou. Por exemplo, ao usar ocollModcomando:db.runCommand( { collMod: <collection>, changeStreamPreAndPostImages: { enabled: true } } )
As imagens pré e pós não estarão disponíveis para um change stream se as imagens forem:
Não habilitadas na coleção no momento de uma operação de atualização ou exclusão de documento.
Removido após o tempo de retenção pré e pós-imagem definido em
expireAfterSeconds.O exemplo a seguir define
expireAfterSecondspara100segundos em um cluster inteiro:use admin db.runCommand( { setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 100 } } } } ) Observação
O
setClusterParametercomando não é suportado em clusters MongoDB Atlas . Para obter informações sobre o suporte do Atlas para todos os comandos, consulte Comandos não suportados no Atlas.O exemplo a seguir retorna as configurações atuais do
changeStreamOptions, incluindoexpireAfterSeconds:db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) Se um change stream for removido do oplog, as imagens pré e pós correspondentes também serão excluídas, independentemente do tempo de retenção pré e pós-imagem
expireAfterSeconds.
Considerações adicionais:
Habilitar pré e pós-imagens consome espaço de armazenamento e adiciona tempo de processamento. Ative as imagens anteriores e posteriores somente se precisar delas.
Limite o tamanho do evento do fluxo de alterações para menos de 16 mebibytes. Para limitar o tamanho do evento, você pode:
Limite o tamanho do documento a 8 megabytes. Você pode solicitar imagens pré e pós simultaneamente na saída do change stream se outros campos de evento de change stream, como
updateDescriptionnão forem grandes.Solicite apenas pós-imagens na saída do fluxo de alterações para documentos de até 16 mebibytes se outros campos de evento de fluxo de alterações como
updateDescriptionnão forem grandes.Solicite somente pré-imagens na saída do change stream para documentos de até 16 mebibytes se:
as atualizações do documento afetam apenas uma pequena fração da estrutura ou conteúdo do documento, e
não causa um evento de alteração
replace. Um eventoreplacesempre inclui o pós-imagem.
Para solicitar uma pré-imagem, defina
fullDocumentBeforeChangecomorequiredouwhenAvailableemdb.collection.watch(). Para solicitar uma pós-imagem, definafullDocumentusando o mesmo método.As pré-imagens são escritas na coleção
config.system.preimages.A coleção
config.system.preimagespode ficar grande. Para limitar o tamanho da coleção, você pode definirexpireAfterSecondstempo para as pré-imagens, conforme mostrado anteriormente.As pré-imagens são removidas de forma assíncrona por um processo de plano de fundo.
Importante
Funcionalidade incompatível com versões anteriores
A partir do MongoDB 6.0, se você estiver usando imagens anteriores e posteriores de documentos para change streams, deverá desabilitar changeStreamPreandPostImages para cada coleção usando o collMod comando antes de poder fazer o downgrade para uma versão anterior do MongoDB.
Dica
Para alterar eventos de transmissão e saída, consulte Alterar eventos.
Para observar alterações em uma coleção, consulte
db.collection.watch().Para obter exemplos completos com a saída do change stream, consulte Fluxos de alterações com imagens pré e pós-documento.
criar coleta
Crie uma coleção do temperatureSensor que tenha changeStreamPreAndPostImages habilitado:
db.createCollection( "temperatureSensor", { changeStreamPreAndPostImages: { enabled: true } } )
Preencha a coleção temperatureSensor com leituras de temperatura:
db.temperatureSensor.insertMany( [ { "_id" : 0, "reading" : 26.1 }, { "_id" : 1, "reading" : 25.9 }, { "_id" : 2, "reading" : 24.3 }, { "_id" : 3, "reading" : 22.4 }, { "_id" : 4, "reading" : 24.6 } ] )
As seções a seguir mostram exemplos de fluxo de alterações para pré e pós-imagens de documentos que usam a coleção temperatureSensor.
Alterar fluxo com pré-imagem do documento
Você usa a configuração fullDocumentBeforeChange: "whenAvailable" para produzir o documento antes da imagem, se disponível. A pré-imagem é o
documento antes de ser substituído, atualizado ou excluído. Não há pré-imagem para um documento inserido.
O exemplo a seguir cria um cursor de fluxo de alteração para a coleção temperatureSensor usando fullDocumentBeforeChange:
"whenAvailable":
watchCursorFullDocumentBeforeChange = db.temperatureSensor.watch( [], { fullDocumentBeforeChange: "whenAvailable" } )
O exemplo a seguir usa o cursor para verificar novos eventos do fluxo de alterações:
while ( !watchCursorFullDocumentBeforeChange.isClosed() ) { if ( watchCursorFullDocumentBeforeChange.hasNext() ) { printjson( watchCursorFullDocumentBeforeChange.next() ); } }
No exemplo:
O loop
whileserá executado até que o cursor seja fechado.hasNext()retornatruese o cursor tiver documentos.
O exemplo a seguir atualiza o campo reading para um documento temperatureSensor:
db.temperatureSensor.updateOne( { _id: 2 }, { $set: { reading: 22.1 } } )
Após a atualização do documento temperatureSensor, o evento de alteração produz a pré-imagem do documento no campo fullDocumentBeforeChange. A pré-imagem contém o campo temperatureSensor documento reading antes de ser atualizado. Por exemplo:
{ "_id" : { "_data" : "82624B21...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1649090957, 1), "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 2 }, "updateDescription" : { "updatedFields" : { "reading" : 22.1 }, "removedFields" : [ ], "truncatedArrays" : [ ] }, "fullDocumentBeforeChange" : { "_id" : 2, "reading" : 24.3 } }
Dica
Para obter detalhes sobre a saída da atualização do documento, consulte alterar eventos de atualização do fluxo.
Para obter detalhes de saída do fluxo de alterações, consulte Eventos de Alteração.
Alterar fluxo com pós-imagem do documento
Você usa a configuração fullDocument: "whenAvailable" para produzir o documento pós-imagem, se disponível. A pós-imagem é o documento após ter sido inserido, substituído ou atualizado. Não há pós-imagem para um documento
excluído.
O exemplo a seguir cria um cursor de fluxo de alteração para a coleção temperatureSensor usando fullDocument:
"whenAvailable":
watchCursorFullDocument = db.temperatureSensor.watch( [], { fullDocument: "whenAvailable" } )
O exemplo a seguir usa o cursor para verificar novos eventos do fluxo de alterações:
while ( !watchCursorFullDocument.isClosed() ) { if ( watchCursorFullDocument.hasNext() ) { printjson( watchCursorFullDocument.next() ); } }
No exemplo:
O loop
whileserá executado até que o cursor seja fechado.hasNext()retornatruese o cursor tiver documentos.
O exemplo a seguir atualiza o campo reading para um documento temperatureSensor:
db.temperatureSensor.updateOne( { _id: 1 }, { $set: { reading: 29.5 } } )
Após a atualização do documento temperatureSensor, o evento de alteração produz o documento pós-imagem no campo fullDocument. A pós-imagem contém o campo do documento temperatureSensor reading depois de ter sido atualizado. Por exemplo:
{ "_id" : { "_data" : "8262474D...", "_typeBits" : BinData(0,"QA==") }, "operationType" : "update", "clusterTime" : Timestamp(1648840090, 1), "fullDocument" : { "_id" : 1, "reading" : 29.5 }, "ns" : { "db" : "test", "coll" : "temperatureSensor" }, "documentKey" : { "_id" : 1 }, "updateDescription" : { "updatedFields" : { "reading" : 29.5 }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Dica
Para obter detalhes sobre a saída da atualização do documento, consulte alterar eventos de atualização do fluxo.
Para obter detalhes de saída do fluxo de alterações, consulte Eventos de Alteração.
Alterar fluxo com filtro de aggregation pipeline
Observação
A partir do MongoDB 4.2, os fluxos de alteração lançarão uma exceção se o pipeline de agregação do fluxo de alterações modificar o campo _id de um evento.
A operação a seguir abre um cursor de fluxo de alterações na coleção data.sensors usando um pipeline de agregação para filtrar somente os eventos insert:
watchCursor = db.getSiblingDB("data").sensors.watch( [ { $match : {"operationType" : "insert" } } ] )
Itere o cursor para verificar novos eventos. Use o método cursor.isClosed() com o método cursor.hasNext() para garantir que o loop só saia se o cursor do fluxo de alteração estiver fechado e não houver objetos restantes no lote mais recente:
while (!watchCursor.isClosed()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }
O cursor do fluxo de alterações retorna apenas eventos de mudança onde operationType é insert. Para obter documentação completa sobre a saída do change stream, consulte Change Events.
Retomando um fluxo de alterações
Cada documento retornado por um cursor do fluxo de alterações inclui um token de retomada como o campo _id. Para retomar um fluxo de alterações, passe todo o documento _id do evento de alteração do qual você deseja retomar para a opção resumeAfter ou startAfter de watch().
A operação a seguir retoma um cursor de fluxo de alterações em relação à coleção data.sensors usando um token de retomar. Isso
pressupõe que a operação que gerou o token de continuação não
foi eliminado do oplog do cluster.
let watchCursor = db.getSiblingDB("data").sensors.watch(); let firstChange; while (!watchCursor.isClosed()) { if (watchCursor.hasNext()) { firstChange = watchCursor.next(); break; } } watchCursor.close(); let resumeToken = firstChange._id; resumedWatchCursor = db.getSiblingDB("data").sensors.watch( [], { resumeAfter : resumeToken } )
Itere o cursor para verificar novos eventos. Use o método cursor.isClosed() com o método cursor.hasNext() para garantir que o loop só saia se o cursor do fluxo de alteração estiver fechado e não houver objetos restantes no lote mais recente:
while (!resumedWatchCursor.isClosed()){ if (resumedWatchCursor.hasNext()){ print(resumedWatchCursor.next()); } }
Consulte Resume a Change Stream para obter a documentação completa sobre como retomar um fluxo de alterações.