Para agentes de IA: um índice de documentação está disponível em https://www.mongodb.com/pt-br/docs/llms.txt — as versões de markdown de todas as páginas estão disponíveis anexando .md a qualquer caminho de URL.
Menu Docs

Monitore dados com change streams

Neste guia, você pode aprender como usar um fluxo de alterações para monitorar alterações em tempo real em seu banco de dados. Um change stream é um recurso do MongoDB Server que permite que seu aplicativo se inscreva em alterações de dados em uma collection, banco de dados ou sistema.

Dica

Atlas Stream Processing

Como uma alternativa para alterar fluxos, você pode utilizar o Atlas Stream Processing para processar e transformar fluxos de dados. Ao contrário dos change streams, que registram apenas eventos de banco de dados, o Atlas Stream Processing gerencia vários tipos de evento de dados e fornece funcionalidades estendidas de processamento de dados. Para saber mais sobre esse recurso, consulte Atlas Stream Processing na documentação do MongoDB Atlas.

Os exemplos neste guia usam a collection sample_restaurants.restaurants dos conjuntos de dados de amostra do Atlas. Para saber como criar um cluster MongoDB Atlas gratuito e carregar os conjuntos de dados de amostra, consulte Introdução ao PyMongo.

Para abrir um fluxo de alteração, chame o método watch() . A instância na qual você chama o método watch() determina o escopo de eventos que o change stream escuta. Você pode chamar o método watch() nas seguintes classes:

  • MongoClient: Para monitorar todas as alterações no sistema MongoDB

  • Database: Para monitorar alterações em todas as coleções no banco de dados

  • Collection: Para monitorar alterações na coleção

O exemplo a seguir abre um fluxo de alteração na coleção restaurants e gera alterações à medida que ocorrem. Selecione a aba Synchronous ou Asynchronous para ver o código correspondente:

database = client["sample_restaurants"]
collection = database["restaurants"]
with collection.watch() as stream:
for change in stream:
print(change)
database = client["sample_restaurants"]
collection = database["restaurants"]
async with await collection.watch() as stream:
async for change in stream:
print(change)

Para começar a observar as alterações, execute o aplicação. Em seguida, em um aplicação ou shell separado, modifique a coleção restaurants. O exemplo a seguir atualiza um documento com um valor de campo name de Blarney Castle. Selecione a aba Synchronous ou Asynchronous para ver o código correspondente:

database = client["sample_restaurants"]
collection = database["restaurants"]
query_filter = { "name": "Blarney Castle" }
update_operation = { '$set' :
{ "cuisine": "Irish" }
}
result = collection.update_one(query_filter, update_operation)
database = client["sample_restaurants"]
collection = database["restaurants"]
query_filter = { "name": "Blarney Castle" }
update_operation = { '$set' :
{ "cuisine": "Irish" }
}
result = await collection.update_one(query_filter, update_operation)

Quando você atualiza a coleção, o aplicativo de fluxo de alterações imprime a alteração conforme ela ocorre. O evento de alteração impresso é semelhante ao seguinte:

{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}

Você pode passar o parâmetro pipeline para o método watch() para modificar a saída do change stream. Esse parâmetro permite que você observe somente eventos de alteração especificados. Formate o parâmetro como uma lista de objetos que cada representa um estágio de agregação.

Você pode especificar os seguintes estágios no parâmetro pipeline :

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

O exemplo a seguir usa o parâmetro pipeline para abrir um fluxo de alterações que registra somente operações de atualização. Selecione a aba Synchronous ou Asynchronous para ver o código correspondente:

change_pipeline = { "$match": { "operationType": "update" }},
with collection.watch(pipeline=change_pipeline) as stream:
for change in stream:
print(change)
change_pipeline = { "$match": { "operationType": "update" }},
async with await collection.watch(pipeline=change_pipeline) as stream:
async for change in stream:
print(change)

Para saber mais sobre como modificar a saída do change stream, consulte a seção Modificar a saída do change stream no manual do MongoDB Server .

O método watch() aceita parâmetros opcionais, que representam opções que você pode utilizar para configurar a operação. Se você não especificar nenhuma opção, o driver não personalizará a operação.

A tabela seguinte descreve as opções que você pode definir para personalizar o comportamento de watch():

Propriedade
Descrição

pipeline

Uma lista de aggregation pipeline stages que modificam a saída do change stream.

full_document

Especifica se o documento completo deve ser mostrado após a alteração, em vez de mostrar apenas as alterações feitas no documento. Para saber mais sobre essa opção, consulte Incluir pré-imagens e pós-imagens.

full_document_before_change

Especifica se o documento completo deve ser mostrado como estava antes da alteração, em vez de mostrar apenas as alterações feitas no documento. Para saber mais sobre essa opção, consulte Incluir pré-imagens e pós-imagens.

resume_after

Direciona watch() para retomar as alterações de retorno após a operação especificada no token de retomada.
Cada documento de evento de fluxo de alteração inclui um token de currículo como o _id campo. Passe todo o _id campo do documento do evento de alteração que representa a operação que você deseja retomar depois.
resume_after é mutuamente exclusivo com start_after e.start_at_operation_time

start_after

Direciona watch() para iniciar um novo fluxo de alterações após a operação especificada no token de retomada. Permite que as notificações sejam retomadas após um evento de invalidação.
Cada documento de evento de fluxo de alteração inclui um token de currículo como o _id campo. Passe todo o _id campo do documento do evento de alteração que representa a operação que você deseja retomar depois.
start_after é mutuamente exclusivo com resume_after start_at_operation_timee.

start_at_operation_time

Direciona watch() para retornar apenas eventos que ocorrem após o registro de data/hora especificado.
start_at_operation_time é mutuamente exclusivo com resume_after start_aftere.

max_await_time_ms

A quantidade máxima de tempo, em milissegundos, o servidor aguarda novas alterações de dados para relatar ao cursor do fluxo de alterações antes de retornar um lote vazio. Padrão para 1000 milissegundos.

show_expanded_events

A partir do MongoDB Server v6.0, Os fluxos de alterações oferecem suporte a notificações de alteração para eventos de Linguagem de Definição de Dados (DDL), como os eventos createIndexes e dropIndexes . Para incluir eventos expandidos em um fluxo de alteração, crie o cursor do fluxo de alteração e defina esse parâmetro como True.

batch_size

O número máximo de eventos de alteração a serem retornados em cada lote da resposta do MongoDB cluster.

collation

O agrupamento a ser usado para o cursor do change stream.

session

Uma instância de ClientSession.

comment

Um comentário a ser anexado à operação.

Importante

Você pode habilitar pré-imagens e pós-imagens em collections somente se seu sistema usar MongoDB v6.0 ou posterior.

Por padrão, quando você executa uma operação em uma collection, o evento de alteração correspondente inclui somente o delta dos campos modificados por essa operação. Para ver o documento completo antes ou depois de uma alteração, especifique os parâmetros full_document_before_change ou full_document no método watch() .

A pré-imagem é a versão completa de um documento antes de uma alteração. Para incluir a pré-imagem no evento de fluxo de alteração, defina o parâmetro full_document_before_change para um dos seguintes valores:

  • whenAvailable: o evento de alteração inclui uma pré-imagem do documento modificado para eventos de alteração somente se a pré-imagem estiver disponível.

  • required: o evento de alteração inclui uma pré-imagem do documento modificado para eventos de alteração. Se a pré-imagem não estiver disponível, o driver gerará um erro.

A pós-imagem é a versão completa de um documento após uma alteração. Para incluir a pós-imagem no evento de fluxo de alteração, defina o parâmetro full_document para um dos seguintes valores:

  • updateLookup: o evento de alteração inclui uma cópia de todo o documento alterado de algum tempo após a alteração.

  • whenAvailable: O evento de alteração inclui uma pós-imagem do documento modificado para eventos de alteração somente se a pós-imagem estiver disponível.

  • required: o evento de alteração inclui uma pós-imagem do documento modificado para eventos de alteração. Se a pós-imagem não estiver disponível, o driver gerará um erro.

O exemplo a seguir chama o método watch() em uma coleção e inclui a pós-imagem de documentos atualizados especificando o parâmetro fullDocument . Selecione a aba Synchronous ou Asynchronous para ver o código correspondente:

database = client["sample_restaurants"]
collection = database["restaurants"]
with collection.watch(full_document='updateLookup') as stream:
for change in stream:
print(change)
database = client["sample_restaurants"]
collection = database["restaurants"]
async with await collection.watch(full_document='updateLookup') as stream:
async for change in stream:
print(change)

Com o aplicativo change stream em execução, atualizar um documento na collection restaurants usando o exemplo de atualização anterior imprime um evento de alteração semelhante ao seguinte:

{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens',
'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'},
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}

Para saber mais sobre pré e pós-imagens, consulte Change Streams com pré e pós-imagens de documentos no manual do MongoDB Server .

Para saber mais sobre fluxos de alterações, consulte Change Streams de alterações no manual do MongoDB Server .

Para saber mais sobre qualquer um dos métodos ou tipos discutidos neste guia, consulte a seguinte documentação da API: