Mongo 4.4
According to documentation when insertion occurs the respective changestream
event is supposed to contain an inserted document inside fullDocument
field which is not the case for me for some reason. The same thing happens to ns
field which is supposed to contain names of database and collection, and to documentKey
field. The document is inserted without any issues. In another test I’ve noticed that handler tied to update
operation does not fire on document update.
Database’s replica set is being run locally at localhost:27017
, localhost:27020
. I’ve also tried using Atlas’s sandbox cluster which gave me the same results.
Code
// ds is the connection to discord, required for doing stuff inside handlers
func iterateChangeStream(stream *mongo.ChangeStream, ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {
defer stream.Close(ctx)
defer cancel() // for graceful crashing
for stream.Next(ctx) {
var event bson.M
err := stream.Decode(&event)
if err != nil {
log.Print(errors.Errorf("Failed to decode event: %w\n", err))
return
}
rv := reflect.ValueOf(event["operationType"]) // getting operation type
opType, ok := rv.Interface().(string)
if !ok {
log.Print("String expected in operationType\n")
return
}
// event["fullDocument"] will be empty even when handling insertion
// models.Player is a struct representing a document of the collection
// I'm watching over
doc, ok := event["fullDocument"].(models.Player)
if !ok {
log.Print("Failed to convert document into Player type")
return
}
handlerCtx := context.WithValue(ctx, "doc", doc)
// handlerToEvent maps operationType to respective handler
go handlerToEvent[opType](ds, handlerCtx, cancel)
}
}
func WatchEvents(ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {
pipeline := mongo.Pipeline{
bson.D{{
"$match",
bson.D{{
"$or", bson.A{
bson.D{{"operationType", "insert"}},
bson.D{{"operationType", "delete"}},
bson.D{{"operationType", "invalidate"}},
},
}},
}},
}
// mongo instance is initialized on program startup and stored in a global variable
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
stream, err := db.Instance.Collection.Watch(ctx, pipeline, opts)
if err != nil {
log.Panic(err)
}
defer stream.Close(ctx)
iterateChangeStream(stream, ds, ctx, cancel)
}