Visão geral
Neste guia, você pode aprender como usar um fluxo de alterações para monitorar alterações em tempo real em seu banco de dados. Um change stream é um recurso do MongoDB Server que permite que seu aplicativo se inscreva em alterações de dados em uma collection, banco de dados ou sistema.
Dica
Atlas Stream Processing
Como uma alternativa para alterar fluxos, você pode utilizar o Atlas Stream Processing para processar e transformar fluxos de dados. Ao contrário dos change streams, que registram apenas eventos de banco de dados, o Atlas Stream Processing gerencia vários tipos de evento de dados e fornece funcionalidades estendidas de processamento de dados. Para saber mais sobre esse recurso, consulte Atlas Stream Processing na documentação do MongoDB Atlas.
Dados de amostra
Os exemplos neste guia usam a collection sample_restaurants.restaurants dos conjuntos de dados de amostra do Atlas. Para saber como criar um cluster MongoDB Atlas gratuito e carregar os conjuntos de dados de amostra, consulte Introdução ao PyMongo.
Abrir um fluxo de alterações
Para abrir um fluxo de alteração, chame o método watch() . A instância na qual você chama o método watch() determina o escopo de eventos que o change stream escuta. Você pode chamar o método watch() nas seguintes classes:
MongoClient: Para monitorar todas as alterações no sistema MongoDBDatabase: Para monitorar alterações em todas as coleções no banco de dadosCollection: Para monitorar alterações na coleção
O exemplo a seguir abre um fluxo de alteração na coleção restaurants e gera alterações à medida que ocorrem. Selecione a aba Synchronous ou Asynchronous para ver o código correspondente:
database = client["sample_restaurants"] collection = database["restaurants"] with collection.watch() as stream: for change in stream: print(change)
database = client["sample_restaurants"] collection = database["restaurants"] async with await collection.watch() as stream: async for change in stream: print(change)
Para começar a observar as alterações, execute o aplicação. Em seguida, em um aplicação ou shell separado, modifique a coleção restaurants. O exemplo a seguir atualiza um documento com um valor de campo name de Blarney Castle. Selecione a aba Synchronous ou Asynchronous para ver o código correspondente:
database = client["sample_restaurants"] collection = database["restaurants"] query_filter = { "name": "Blarney Castle" } update_operation = { '$set' : { "cuisine": "Irish" } } result = collection.update_one(query_filter, update_operation)
database = client["sample_restaurants"] collection = database["restaurants"] query_filter = { "name": "Blarney Castle" } update_operation = { '$set' : { "cuisine": "Irish" } } result = await collection.update_one(query_filter, update_operation)
Quando você atualiza a coleção, o aplicativo de fluxo de alterações imprime a alteração conforme ela ocorre. O evento de alteração impresso é semelhante ao seguinte:
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
Modificar a saída change stream
Você pode passar o parâmetro pipeline para o método watch() para modificar a saída do change stream. Esse parâmetro permite que você observe somente eventos de alteração especificados. Formate o parâmetro como uma lista de objetos que cada representa um estágio de agregação.
Você pode especificar os seguintes estágios no parâmetro pipeline :
$addFields$match$project$replaceRoot$replaceWith$redact$set$unset
O exemplo a seguir usa o parâmetro pipeline para abrir um fluxo de alterações que registra somente operações de atualização. Selecione a aba Synchronous ou Asynchronous para ver o código correspondente:
change_pipeline = { "$match": { "operationType": "update" }}, with collection.watch(pipeline=change_pipeline) as stream: for change in stream: print(change)
change_pipeline = { "$match": { "operationType": "update" }}, async with await collection.watch(pipeline=change_pipeline) as stream: async for change in stream: print(change)
Para saber mais sobre como modificar a saída do change stream, consulte a seção Modificar a saída do change stream no manual do MongoDB Server .
Modificar Comportamento do watch()
O método watch() aceita parâmetros opcionais, que representam opções que você pode utilizar para configurar a operação. Se você não especificar nenhuma opção, o driver não personalizará a operação.
A tabela seguinte descreve as opções que você pode definir para personalizar o comportamento de watch():
Propriedade | Descrição |
|---|---|
| Uma lista de aggregation pipeline stages que modificam a saída do change stream. |
| Especifica se o documento completo deve ser mostrado após a alteração, em vez de mostrar apenas as alterações feitas no documento. Para saber mais sobre essa opção, consulte Incluir pré-imagens e pós-imagens. |
| Especifica se o documento completo deve ser mostrado como estava antes da alteração, em vez de mostrar apenas as alterações feitas no documento. Para saber mais sobre essa opção, consulte Incluir pré-imagens e pós-imagens. |
| Direciona |
| Direciona |
| Direciona |
| A quantidade máxima de tempo, em milissegundos, o servidor aguarda novas alterações de dados para relatar ao cursor do fluxo de alterações antes de retornar um lote vazio. Padrão para 1000 milissegundos. |
| A partir do MongoDB Server v6.0, Os fluxos de alterações oferecem suporte a notificações de alteração para eventos de Linguagem de Definição de Dados (DDL), como os eventos |
| O número máximo de eventos de alteração a serem retornados em cada lote da resposta do MongoDB cluster. |
| O agrupamento a ser usado para o cursor do change stream. |
| Uma instância de |
| Um comentário a ser anexado à operação. |
Incluir pré-imagens e pós-imagens
Importante
Você pode habilitar pré-imagens e pós-imagens em collections somente se seu sistema usar MongoDB v6.0 ou posterior.
Por padrão, quando você executa uma operação em uma collection, o evento de alteração correspondente inclui somente o delta dos campos modificados por essa operação. Para ver o documento completo antes ou depois de uma alteração, especifique os parâmetros full_document_before_change ou full_document no método watch() .
A pré-imagem é a versão completa de um documento antes de uma alteração. Para incluir a pré-imagem no evento de fluxo de alteração, defina o parâmetro full_document_before_change para um dos seguintes valores:
whenAvailable: o evento de alteração inclui uma pré-imagem do documento modificado para eventos de alteração somente se a pré-imagem estiver disponível.required: o evento de alteração inclui uma pré-imagem do documento modificado para eventos de alteração. Se a pré-imagem não estiver disponível, o driver gerará um erro.
A pós-imagem é a versão completa de um documento após uma alteração. Para incluir a pós-imagem no evento de fluxo de alteração, defina o parâmetro full_document para um dos seguintes valores:
updateLookup: o evento de alteração inclui uma cópia de todo o documento alterado de algum tempo após a alteração.whenAvailable: O evento de alteração inclui uma pós-imagem do documento modificado para eventos de alteração somente se a pós-imagem estiver disponível.required: o evento de alteração inclui uma pós-imagem do documento modificado para eventos de alteração. Se a pós-imagem não estiver disponível, o driver gerará um erro.
O exemplo a seguir chama o método watch() em uma coleção e inclui a pós-imagem de documentos atualizados especificando o parâmetro fullDocument . Selecione a aba Synchronous ou Asynchronous para ver o código correspondente:
database = client["sample_restaurants"] collection = database["restaurants"] with collection.watch(full_document='updateLookup') as stream: for change in stream: print(change)
database = client["sample_restaurants"] collection = database["restaurants"] async with await collection.watch(full_document='updateLookup') as stream: async for change in stream: print(change)
Com o aplicativo change stream em execução, atualizar um documento na collection restaurants usando o exemplo de atualização anterior imprime um evento de alteração semelhante ao seguinte:
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens', 'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'}, 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
Para saber mais sobre pré e pós-imagens, consulte Change Streams com pré e pós-imagens de documentos no manual do MongoDB Server .
Informações adicionais
Para saber mais sobre fluxos de alterações, consulte Change Streams de alterações no manual do MongoDB Server .
Documentação da API
Para saber mais sobre qualquer um dos métodos ou tipos discutidos neste guia, consulte a seguinte documentação da API: