Visão geral
Neste guia, você pode aprender como usar um fluxo de alterações para monitorar alterações em tempo real em seu banco de dados. Um change stream é um recurso do MongoDB Server que permite que seu aplicativo se inscreva em alterações de dados em uma collection, banco de dados ou sistema.
Um fluxo de mudança gera novos eventos de mudança, fornecendo acesso a alterações de dados em tempo real. Você pode abrir um fluxo de alteração em uma coleção, banco de dados ou objeto de cliente.
Dados de amostra
Os exemplos nesta aba usam o seguinte struct Course
como um modelo para documentos na coleção courses
:
type Course struct { Title string Enrollment int32 }
Para executar os exemplos neste guia, carregue estes documentos na coleção courses
no banco de dados do db
utilizando o seguinte trecho:
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
Dica
Bancos de Dados e Coleções Inexistentes
Se o banco de dados e a collection necessários não existirem quando você executar uma operação de escrita, o servidor implicitamente os criará.
Cada documento contém uma descrição de um curso universitário que inclui o nome do curso e o número máximo de matrículas, correspondentes aos campos title
e enrollment
em cada documento.
Observação
Cada saída de exemplo mostra os valores truncados _data
, clusterTime
e ObjectID
porque o impulsionador os gera exclusivamente.
Abrir um fluxo de alterações
Você pode observar alterações no MongoDB usando o método Watch()
nos seguintes objetos:
Collection: monitore as alterações em uma collection específica
Banco de dados: Monitore alterações em todas as collections em um banco de dados
MongoClient: Monitore alterações em todos os bancos de dados
Para cada objeto, o método Watch()
abre um change stream para emitir documentos de evento de mudança quando eles ocorrem.
O método Watch()
exige um parâmetro de contexto e um parâmetro de pipeline. Para retornar todas as alterações, passe um objeto Pipeline
vazio.
O método Watch()
recebe opcionalmente uma aggregation pipeline que consiste em uma array de estágios de aggregation como o primeiro parâmetro. Os estágios de aggregation filtram e transformam os eventos de mudança.
Exemplo
O exemplo a seguir abre um fluxo de alteração na coleção courses
e imprime os eventos de fluxo de alteração conforme ocorrem:
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
Se você modificar a coleção courses
em um programa ou shell separado, este código imprimirá as alterações conforme elas ocorrerem. Inserir um documento com um valor de title
de "Advanced Screenwriting"
e um valor de enrollment
de 20
resulta no seguinte evento de alteração:
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
Para ver um exemplo totalmente executável, consulte a seção Abrir um Exemplo de Fluxo de Alterações: Arquivo Completo neste guia.
Filtrar eventos de alteração
Use o parâmetro de pipeline para modificar a saída do fluxo de alteração. Esse parâmetro permite que você observe apenas determinados eventos de alteração. Formate o parâmetro pipeline como uma array de documentos, com cada documento representando um estágio de agregação.
Você pode usar os seguintes estágios de pipeline neste parâmetro:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
O exemplo seguinte abre um fluxo de alteração no banco de dados do db
, mas somente assiste para novas operações de exclusão:
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
Observação
O método Watch()
foi chamado no banco de dados db
, então o código gera novas operações de exclusão em qualquer coleção dentro desse banco de dados.
Abrir um exemplo de fluxo de alteração: arquivo completo
Observação
Exemplo de configuração
Esse exemplo se conecta a uma instância do MongoDB usando um URI de conexão. Para saber mais sobre como se conectar à sua instância do MongoDB, consulte o guia Criar um MongoClient. Este exemplo também utiliza a coleção do restaurants
no banco de dados do sample_restaurants
incluído nos conjuntos de dados de amostra do Atlas. Você pode carregá-los em seu banco de dados na camada grátis do MongoDB Atlas seguindo o Guia de Introdução ao Atlas.
O exemplo a seguir abre um fluxo de alteração na coleção restaurants
e imprime documentos inseridos:
coll := client.Database("sample_restaurants").Collection("restaurants") // Creates instructions to watch for insert operations pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}} // Creates a change stream that receives change events cs, err := coll.Watch(context.TODO(), pipeline) if err != nil { panic(err) } defer cs.Close(context.TODO()) fmt.Println("Waiting For Change Events. Insert something in MongoDB!") // Prints a message each time the change stream receives an event for cs.Next(context.TODO()) { var event bson.M if err := cs.Decode(&event); err != nil { panic(err) } output, err := json.MarshalIndent(event["fullDocument"], "", " ") if err != nil { panic(err) } fmt.Printf("%s\n", output) } if err := cs.Err(); err != nil { panic(err) }
Veja um exemplo totalmente executável.
Resultado esperado
Depois de executar o exemplo completo, execute o exemplo de arquivo completo Inserir um documento em um shell diferente. Ao executar a operação de inserção, você verá uma saída semelhante a esta:
// results truncated { "_id": ..., "name": "8282", "cuisine": "Korean" }
Importante
Quando terminar de trabalhar com este exemplo de uso, certifique-se de encerrá-lo fechando o terminal.
Configurar opções de change stream
Utilize o parâmetro options
para modificar o comportamento do método Watch()
.
Você pode especificar as seguintes opções para o método Watch()
:
ResumeAfter
StartAfter
FullDocument
FullDocumentBeforeChange
BatchSize
MaxAwaitTime
Collation
StartAtOperationTime
Comment
ShowExpandedEvents
Custom
CustomPipeline
Para obter mais informações sobre essas opções, consulte o db.collection.watch() entrada no manual do servidor.
Pré-imagens e pós-imagens
Quando você executa qualquer operação CRUD em uma coleção, por padrão, o documento de evento de alteração correspondente contém somente o delta dos campos modificados pela operação. Você pode visualizar o documento completo antes e depois de uma alteração, além do delta, especificando as configurações no parâmetro options
do método Watch()
.
Se você quiser ver a pós-imagem de um documento, a versão completa do documento após uma alteração, defina o campo FullDocument
do parâmetro options
para um dos seguintes valores:
UpdateLookup
: o documento de alteração de evento inclui uma cópia de todo o documento alterado.WhenAvailable
: O documento de evento de alteração inclui uma pós-imagem do documento modificado para eventos de alteração se a pós-imagem estiver disponível.Required
: a saída é a mesma deWhenAvailable
, mas o driver gera um erro no servidor se a pós-imagem não estiver disponível.
Se você quiser ver a pré-imagem de um documento, a versão completa do documento antes de uma alteração, defina o campo FullDocumentBeforeChange
do parâmetro options
para um dos seguintes valores:
WhenAvailable
: o documento de alteração de evento inclui uma pré-imagem do documento modificado para alterar eventos se a pré-imagem estiver disponível.Required
: a saída é a mesma deWhenAvailable
, mas o driver gera um erro no servidor se a pré-imagem não estiver disponível.
Importante
Para acessar pré e pós-imagens de documento , você deve habilitar changeStreamPreAndPostImages
para a coleção. Consulte a seção Alterar fluxos do guia de comando do banco de dados collMod no manual do MongoDB Server para obter instruções e mais informações.
Observação
Não há nenhuma pré-imagem para um documento inserido e nenhuma pós-imagem para um documento excluído.
Exemplo
O exemplo a seguir chama o método Watch()
na coleção courses
. Ele especifica um valor para o campo FullDocument
do parâmetro options
para gerar uma cópia de todo o documento modificado, em vez de apenas os campos alterados:
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
Atualizar o valor de enrollment
do documento com o title
de "World
Fiction"
de 35
para 30
resulta no seguinte evento de alteração:
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
Sem especificar a opção FullDocument
, a mesma operação de atualização não gera mais o valor "fullDocument"
no documento de evento de alteração.
Informações adicionais
Para obter mais informações sobre fluxos de alterações, consulte Fluxos de alterações no manual do servidor.
Documentação da API
Para saber mais sobre o método Watch()
, consulte a seguinte documentação da API: