Reagindo às mudanças do banco de dados com o MongoDB Change Streams e Go
Avalie esse Início rápido

Se você está acompanhando minha série de tutoriais de introdução ao Go e MongoDB, deve se lembrar que já fizemos muitas coisas até agora. Demos uma olhada em tudo, desde a interação do CRUD com o banco de dados até a modelagem de dados e muito mais. Para tentar alcançar tudo o que fizemos, você pode dar uma olhada nos seguintes tutoriais da série:
Neste tutorial, vamos explorar change stream no MongoDB e como eles podem ser úteis, todos com a linguagem de programação Go (Golang).
Antes de dar uma olhada no código, vamos dar um passo para trás e entender o que são change streams e por que muitas vezes são necessários.
Imagine este cenário, um dos muitos possíveis:
Você tem um aplicativo que envolve clientes de internet das coisas (IoT). Digamos que esse seja um aplicativo de cerca geográfica e que os clientes de IoT sejam algo que possa trigger a cerca geográfica à medida que entram e saem do alcance. Em vez de fazer com que seu aplicativo execute consultas constantemente para ver se os clientes estão ao alcance, não faria mais sentido observar em tempo real e React quando isso acontece?
Com os MongoDB change stream, você pode criar um pipeline para observar as alterações em um nível de collection, banco de dados ou sistema e escrever lógica em seu aplicativo para fazer algo à medida que os dados chegam com base no pipeline.
Embora haja muitos casos de uso possíveis para change streams, continuaremos com o exemplo que estamos usando em todo o escopo desta série de primeiros passos. Vamos continuar trabalhando com dados de programas e episódios de podcast.
Vamos supor que temos o seguinte código para começar:
1 package main 2 3 import ( 4 "context" 5 "fmt" 6 "os" 7 "sync" 8 9 "go.mongodb.org/mongo-driver/bson" 10 "go.mongodb.org/mongo-driver/mongo" 11 "go.mongodb.org/mongo-driver/mongo/options" 12 ) 13 14 func main() { 15 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI"))) 16 if err != nil { 17 panic(err) 18 } 19 defer client.Disconnect(context.TODO()) 20 21 database := client.Database("quickstart") 22 episodesCollection := database.Collection("episodes") 23 }
O código acima é uma conexão muito básica a um cluster MongoDB, algo que exploramos no tutorialComo se conectar a seu cluster MongoDB com Go.
Para observar as mudanças, podemos fazer algo como o seguinte:
1 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{}) 2 if err != nil { 3 panic(err) 4 }
O código acima observará toda e qualquer alteração nos documentos dentro da coleção
episodes
. O resultado é um cursor sobre o qual podemos iterar indefinidamente para obter os dados à medida que eles chegam.Podemos iterar no cursor e dar sentido aos nossos dados usando o código a seguir:
1 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{}) 2 if err != nil { 3 panic(err) 4 } 5 6 defer episodesStream.Close(context.TODO()) 7 8 for episodesStream.Next(context.TODO()) { 9 var data bson.M 10 if err := episodesStream.Decode(&data); err != nil { 11 panic(err) 12 } 13 fmt.Printf("%v\n", data) 14 }
Se os dados chegassem, eles poderiam se parecer com o seguinte:
1 map[_id:map[_data:825E4EFCB9000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E3B38511C9D4400004117E80004] clusterTime:{1582234809 1} documentKey:map[_id:ObjectID("5e3b38511c9d 2 4400004117e8")] fullDocument:map[_id:ObjectID("5e3b38511c9d4400004117e8") description:The second episode duration:30 podcast:ObjectID("5e3b37e51c9d4400004117e6") title:Episode #3] ns:map[coll:episodes 3 db:quickstart] operationType:replace]
No exemplo acima, criei um
Replace
em um documento específico na collection. Além das informações sobre os dados, também receba o documento completo que inclui a alteração. Os resultados variarão dependendo do operationType
que ocorrer.Embora o código que usamos funcione bem, atualmente ele é uma operação de bloqueio. Se quisermos observar as alterações e continuar a fazer outras coisas, queremos usar uma goroutine para iterar sobre o cursor do fluxo de alterações.
Poderíamos fazer algumas alterações como esta:
1 package main 2 3 import ( 4 "context" 5 "fmt" 6 "os" 7 "sync" 8 9 "go.mongodb.org/mongo-driver/bson" 10 "go.mongodb.org/mongo-driver/mongo" 11 "go.mongodb.org/mongo-driver/mongo/options" 12 ) 13 14 func iterateChangeStream(routineCtx context.Context, waitGroup sync.WaitGroup, stream *mongo.ChangeStream) { 15 defer stream.Close(routineCtx) 16 defer waitGroup.Done() 17 for stream.Next(routineCtx) { 18 var data bson.M 19 if err := stream.Decode(&data); err != nil { 20 panic(err) 21 } 22 fmt.Printf("%v\n", data) 23 } 24 } 25 26 func main() { 27 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI"))) 28 if err != nil { 29 panic(err) 30 } 31 defer client.Disconnect(context.TODO()) 32 33 database := client.Database("quickstart") 34 episodesCollection := database.Collection("episodes") 35 36 var waitGroup sync.WaitGroup 37 38 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{}) 39 if err != nil { 40 panic(err) 41 } 42 waitGroup.Add(1) 43 routineCtx, cancelFn := context.WithCancel(context.Background()) 44 go iterateChangeStream(routineCtx, waitGroup, episodesStream) 45 46 waitGroup.Wait() 47 }
Algumas coisas estão acontecer no código acima. Movemos a iteração de stream para uma função separada a ser usada em uma goroutine. No entanto, a execução do aplicativo resultaria no encerramento rápido do aplicativo, pois a função
main
não será encerrada muito depois da criação da goroutine. Para resolver isso, estamos usando um WaitGroup
. Em nosso exemplo, a funçãomain
aguardará até que o WaitGroup
esteja vazio e o WaitGroup
só ficará vazio quando a goroutine terminar.O uso do
WaitGroup
não é um requisito absoluto, pois há outras maneiras de manter o aplicativo em execução enquanto se observa as alterações. No entanto, dada a simplicidade deste exemplo, isso fazia sentido para ver qualquer alteração no fluxo.Para evitar que a função
iterateChangeStream
seja executada indefinidamente, estamos criando e passando um contexto que pode ser cancelado. Embora não demonstremos o cancelamento da função, pelo menos sabemos que isso pode ser feito.No exemplo anterior, o pipeline de agregação que usamos era o mais básico possível. Em outras palavras, estávamos procurando por toda e qualquer mudança que estivesse acontecendo em nossa coleção específica. Embora isso possa ser bom em muitos cenários, você provavelmente aproveitará mais o uso de um pipeline de agregação mais bem definido.
Veja o exemplo a seguir:
1 matchPipeline := bson.D{ 2 { 3 "$match", bson.D{ 4 {"operationType", "insert"}, 5 {"fullDocument.duration", bson.D{ 6 {"$gt", 30}, 7 }}, 8 }, 9 }, 10 } 11 12 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{matchPipeline})
No exemplo acima, ainda estamos observando as change stream na collection
episodes
. No entanto, desta vez, estamos observando apenas novos documentos que tenham um campoduration
maior que 30. Qualquer outra operação de inserção ou outra operação de change stream não será detectada.Os resultados do código acima, quando uma correspondência é encontrada, podem ter a seguinte aparência:
1 map[_id:map[_data:825E4F03CF000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E4F03A01C9D44000063CCBD0004] clusterTime:{1582236623 1} documentKey:map[_id:ObjectID("5e4f03a01c9d 2 44000063ccbd")] fullDocument:map[_id:ObjectID("5e4f03a01c9d44000063ccbd") description:a quick start into mongodb duration:35 podcast:1234 title:getting started with mongodb] ns:map[coll:episodes db:qui 3 ckstart] operationType:insert]
Com os change stream, você terá acesso a um subconjunto do aggregation pipeline do MongoDB e seus operadores. Você pode saber mais sobre o que está disponível na documentação oficial.
Você acabou de ver como usar o change stream do MongoDB em um aplicativo Go usando o driver MongoDB Go. Como apontado anteriormente, o change stream facilita o React a alterações no banco de dados, na collection e no sistema sem precisar consultar constantemente o cluster. Isso permite que você planeje com eficiência os pipelines de agregação para responder à medida que acontecem em tempo real.
Se você deseja acompanhar os outros tutoriais da série de início rápido do MongoDB com Go, você os encontra abaixo:
Para encerrar a série, o próximo tutorial se concentrará em transações com o driver Go do MongoDB.