A Voyage AI se une ao MongoDB para impulsionar aplicativos de AI mais precisos e confiáveis no Atlas.

Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Desenvolvedor do MongoDB
Centro de desenvolvedores do MongoDB
chevron-right
Idiomas
chevron-right
Go
chevron-right

Reagindo às mudanças do banco de dados com o MongoDB Change Streams e Go

Nic Raboy5 min read • Published Feb 01, 2022 • Updated Feb 03, 2023
MongoDBFluxos de alteraçõesGo
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
Logotipo do Início rápido em Go
Se você está acompanhando minha série de tutoriais de introdução ao Go e MongoDB, deve se lembrar que já fizemos muitas coisas até agora. Demos uma olhada em tudo, desde a interação do CRUD com o banco de dados até a modelagem de dados e muito mais. Para tentar alcançar tudo o que fizemos, você pode dar uma olhada nos seguintes tutoriais da série:
Neste tutorial, vamos explorar change stream no MongoDB e como eles podem ser úteis, todos com a linguagem de programação Go (Golang).
Antes de dar uma olhada no código, vamos dar um passo para trás e entender o que são change streams e por que muitas vezes são necessários.
Imagine este cenário, um dos muitos possíveis:
Você tem um aplicativo que envolve clientes de internet das coisas (IoT). Digamos que esse seja um aplicativo de cerca geográfica e que os clientes de IoT sejam algo que possa trigger a cerca geográfica à medida que entram e saem do alcance. Em vez de fazer com que seu aplicativo execute consultas constantemente para ver se os clientes estão ao alcance, não faria mais sentido observar em tempo real e React quando isso acontece?
Com os MongoDB change stream, você pode criar um pipeline para observar as alterações em um nível de collection, banco de dados ou sistema e escrever lógica em seu aplicativo para fazer algo à medida que os dados chegam com base no pipeline.

Criando um change stream do MongoDB em tempo real com Golang

Embora haja muitos casos de uso possíveis para change streams, continuaremos com o exemplo que estamos usando em todo o escopo desta série de primeiros passos. Vamos continuar trabalhando com dados de programas e episódios de podcast.
Vamos supor que temos o seguinte código para começar:
1package main
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "sync"
8
9 "go.mongodb.org/mongo-driver/bson"
10 "go.mongodb.org/mongo-driver/mongo"
11 "go.mongodb.org/mongo-driver/mongo/options"
12)
13
14func main() {
15 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI")))
16 if err != nil {
17 panic(err)
18 }
19 defer client.Disconnect(context.TODO())
20
21 database := client.Database("quickstart")
22 episodesCollection := database.Collection("episodes")
23}
O código acima é uma conexão muito básica a um cluster MongoDB, algo que exploramos no tutorialComo se conectar a seu cluster MongoDB com Go.
Para observar as mudanças, podemos fazer algo como o seguinte:
1episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{})
2if err != nil {
3 panic(err)
4}
O código acima observará toda e qualquer alteração nos documentos dentro da coleçãoepisodes. O resultado é um cursor sobre o qual podemos iterar indefinidamente para obter os dados à medida que eles chegam.
Podemos iterar no cursor e dar sentido aos nossos dados usando o código a seguir:
1episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{})
2if err != nil {
3 panic(err)
4}
5
6defer episodesStream.Close(context.TODO())
7
8for episodesStream.Next(context.TODO()) {
9 var data bson.M
10 if err := episodesStream.Decode(&data); err != nil {
11 panic(err)
12 }
13 fmt.Printf("%v\n", data)
14}
Se os dados chegassem, eles poderiam se parecer com o seguinte:
1map[_id:map[_data:825E4EFCB9000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E3B38511C9D4400004117E80004] clusterTime:{1582234809 1} documentKey:map[_id:ObjectID("5e3b38511c9d
24400004117e8")] fullDocument:map[_id:ObjectID("5e3b38511c9d4400004117e8") description:The second episode duration:30 podcast:ObjectID("5e3b37e51c9d4400004117e6") title:Episode #3] ns:map[coll:episodes
3db:quickstart] operationType:replace]
No exemplo acima, criei um Replace em um documento específico na collection. Além das informações sobre os dados, também receba o documento completo que inclui a alteração. Os resultados variarão dependendo do operationType que ocorrer.
Embora o código que usamos funcione bem, atualmente ele é uma operação de bloqueio. Se quisermos observar as alterações e continuar a fazer outras coisas, queremos usar uma goroutine para iterar sobre o cursor do fluxo de alterações.
Poderíamos fazer algumas alterações como esta:
1package main
2
3import (
4 "context"
5 "fmt"
6 "os"
7 "sync"
8
9 "go.mongodb.org/mongo-driver/bson"
10 "go.mongodb.org/mongo-driver/mongo"
11 "go.mongodb.org/mongo-driver/mongo/options"
12)
13
14func iterateChangeStream(routineCtx context.Context, waitGroup sync.WaitGroup, stream *mongo.ChangeStream) {
15 defer stream.Close(routineCtx)
16 defer waitGroup.Done()
17 for stream.Next(routineCtx) {
18 var data bson.M
19 if err := stream.Decode(&data); err != nil {
20 panic(err)
21 }
22 fmt.Printf("%v\n", data)
23 }
24}
25
26func main() {
27 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(os.Getenv("ATLAS_URI")))
28 if err != nil {
29 panic(err)
30 }
31 defer client.Disconnect(context.TODO())
32
33 database := client.Database("quickstart")
34 episodesCollection := database.Collection("episodes")
35
36 var waitGroup sync.WaitGroup
37
38 episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{})
39 if err != nil {
40 panic(err)
41 }
42 waitGroup.Add(1)
43 routineCtx, cancelFn := context.WithCancel(context.Background())
44 go iterateChangeStream(routineCtx, waitGroup, episodesStream)
45
46 waitGroup.Wait()
47}
Algumas coisas estão acontecer no código acima. Movemos a iteração de stream para uma função separada a ser usada em uma goroutine. No entanto, a execução do aplicativo resultaria no encerramento rápido do aplicativo, pois a funçãomain não será encerrada muito depois da criação da goroutine. Para resolver isso, estamos usando um WaitGroup. Em nosso exemplo, a funçãomain aguardará até que o WaitGroup esteja vazio e o WaitGroup só ficará vazio quando a goroutine terminar.
O uso do WaitGroup não é um requisito absoluto, pois há outras maneiras de manter o aplicativo em execução enquanto se observa as alterações. No entanto, dada a simplicidade deste exemplo, isso fazia sentido para ver qualquer alteração no fluxo.
Para evitar que a funçãoiterateChangeStreamseja executada indefinidamente, estamos criando e passando um contexto que pode ser cancelado. Embora não demonstremos o cancelamento da função, pelo menos sabemos que isso pode ser feito.

Complicando o Change Stream com o Pipeline de Agregação

No exemplo anterior, o pipeline de agregação que usamos era o mais básico possível. Em outras palavras, estávamos procurando por toda e qualquer mudança que estivesse acontecendo em nossa coleção específica. Embora isso possa ser bom em muitos cenários, você provavelmente aproveitará mais o uso de um pipeline de agregação mais bem definido.
Veja o exemplo a seguir:
1matchPipeline := bson.D{
2 {
3 "$match", bson.D{
4 {"operationType", "insert"},
5 {"fullDocument.duration", bson.D{
6 {"$gt", 30},
7 }},
8 },
9 },
10}
11
12episodesStream, err := episodesCollection.Watch(context.TODO(), mongo.Pipeline{matchPipeline})
No exemplo acima, ainda estamos observando as change stream na collectionepisodes. No entanto, desta vez, estamos observando apenas novos documentos que tenham um campoduration maior que 30. Qualquer outra operação de inserção ou outra operação de change stream não será detectada.
Os resultados do código acima, quando uma correspondência é encontrada, podem ter a seguinte aparência:
1map[_id:map[_data:825E4F03CF000000012B022C0100296E5A1004D960EAE47DBE4DC8AC61034AE145240146645F696400645E4F03A01C9D44000063CCBD0004] clusterTime:{1582236623 1} documentKey:map[_id:ObjectID("5e4f03a01c9d
244000063ccbd")] fullDocument:map[_id:ObjectID("5e4f03a01c9d44000063ccbd") description:a quick start into mongodb duration:35 podcast:1234 title:getting started with mongodb] ns:map[coll:episodes db:qui
3ckstart] operationType:insert]
Com os change stream, você terá acesso a um subconjunto do aggregation pipeline do MongoDB e seus operadores. Você pode saber mais sobre o que está disponível na documentação oficial.

Conclusão

Você acabou de ver como usar o change stream do MongoDB em um aplicativo Go usando o driver MongoDB Go. Como apontado anteriormente, o change stream facilita o React a alterações no banco de dados, na collection e no sistema sem precisar consultar constantemente o cluster. Isso permite que você planeje com eficiência os pipelines de agregação para responder à medida que acontecem em tempo real.
Se você deseja acompanhar os outros tutoriais da série de início rápido do MongoDB com Go, você os encontra abaixo:
Para encerrar a série, o próximo tutorial se concentrará em transações com o driver Go do MongoDB.

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Início rápido
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Interagir com o MongoDB em uma função Amazon Web Services Lambda usando o Go


Aug 29, 2024 | 6 min read
Tutorial

Criptografia no nível do campo do lado do cliente (CSFLE) no MongoDB com Golang


Feb 03, 2023 | 15 min read
Tutorial

Simultaneidade e fechamento gracioso do cliente MDB


Sep 04, 2024 | 5 min read
Início rápido

Transações ACID multidocumento no MongoDB com Go


Apr 03, 2024 | 6 min read
Sumário