Changestream event doesn't contain copy of inserted document inside fullDocument field

Mongo 4.4

According to documentation when insertion occurs the respective changestream event is supposed to contain an inserted document inside fullDocument field which is not the case for me for some reason. The same thing happens to ns field which is supposed to contain names of database and collection, and to documentKey field. The document is inserted without any issues. In another test I’ve noticed that handler tied to update operation does not fire on document update.

Database’s replica set is being run locally at localhost:27017, localhost:27020. I’ve also tried using Atlas’s sandbox cluster which gave me the same results.

Code

// ds is the connection to discord, required for doing stuff inside handlers
func iterateChangeStream(stream *mongo.ChangeStream, ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {
    defer stream.Close(ctx)
    defer cancel() // for graceful crashing

    for stream.Next(ctx) {
        var event bson.M
        err := stream.Decode(&event)
        if err != nil {
            log.Print(errors.Errorf("Failed to decode event: %w\n", err))
            return
        }

        rv := reflect.ValueOf(event["operationType"]) // getting operation type
        opType, ok := rv.Interface().(string)
        if !ok {
            log.Print("String expected in operationType\n")
            return
        }
        
        // event["fullDocument"] will be empty even when handling insertion
        // models.Player is a struct representing a document of the collection
        // I'm watching over
        doc, ok := event["fullDocument"].(models.Player)
        if !ok {
            log.Print("Failed to convert document into Player type")
            return
        }
        handlerCtx := context.WithValue(ctx, "doc", doc)
        // handlerToEvent maps operationType to respective handler
        go handlerToEvent[opType](ds, handlerCtx, cancel)
    }
}

func WatchEvents(ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {

    pipeline := mongo.Pipeline{
        bson.D{{
            "$match",
            bson.D{{
                "$or", bson.A{
                    bson.D{{"operationType", "insert"}},
                    bson.D{{"operationType", "delete"}},
                    bson.D{{"operationType", "invalidate"}},
                },
            }},
        }},
    }
    // mongo instance is initialized on program startup and stored in a global variable
    opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
    stream, err := db.Instance.Collection.Watch(ctx, pipeline, opts)
    if err != nil {
        log.Panic(err)
    }
    defer stream.Close(ctx)

    iterateChangeStream(stream, ds, ctx, cancel)
}

Forgot to add that my issue might be related to: https://jira.mongodb.org/browse/SERVER-49161
If you know how to enable changestream optimization feature flag, let me know.

@Bobroslav_Zhmishenko when you say that event["fullDocument"] is empty, do you mean the value for the "fullDocument" key is empty or that the type assertion to models.Player returns “ok == false”?

If it’s the latter, I think the issue may be with the type assertion here:

doc, ok := event["fullDocument"].(models.Player)

Since the type passed to stream.Decode() is a bson.M, the decoded ChangeStream event document should be of type bson.M or bson.D, not models.Player. If you want to decode the document into a models.Player, you need to pass a struct with a type models.Player so the BSON unmarshaler knows to create that type.

For example, you could define a struct with fields that match the expected change stream document:

type CSEvent struct {
	OperationType string        `bson:"operationType"`
	FullDocument  models.Player `bson:"fullDocument"`
}

Then use that struct instead of a bson.M to call Decode() in iterateChangeStream():

var event CSEvent
err := stream.Decode(&event)
// ...

If you want to confirm you’re getting some document, you can inspect the contents and type of value of event["fullDocument"] like this:

fmt.Printf(
	"(%T): %v\n",
	event["fullDocument"],
	event["fullDocument"])
3 Likes

Thank you and sorry for late response. This topic hasn’t got any answers for a week and I forgot about it.
Yes, event["fullDocument"] is empty.
If I recall correctly ok equals false.
I’ll try what you suggested tomorrow.

It worked. Thank you a lot.

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.