$match not operating as expected within watch(pipeline) on nested fields

Hi,

I’m trying to set up a listener on a MongoDB collection using the built-in change streams functionality. I’m running into issues when I set up my watch on the collection and filter for nested fields. I’m using PyMongo and my MongoDB is hosted on Atlas.

Here is my code…

import pymongo

client = pymongo.MongoClient(connection_uri)

pipeline = [
    {
        '$match': {
            '$or': [
                {'updateDescription.updatedFields.base_name': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
                {'updateDescription.updatedFields.industries': {'$exists': True}},
                {'updateDescription.updatedFields.country': {'$exists': True}},
            ]
        }
    }
]

# Set up the change stream
change_stream = client.change_stream_db.change_stream_collection.watch(pipeline)

# Iterate through the change stream and print each change
for change in change_stream:
    print(change)

Essentially I want to look for updates to the base_name, country, industries & meta_data.status fields. base_name & country are simple strings, but industries is an array field, and meta_data is an embedded document with the nested field status. So far, my watch(pipeline) picks up all updates to country & base_name, but it’s not able to pick up changes to the more complex fields industries and meta_data.status.

I have confirmed that the changes to those fields are happening. When I remove all filters - i.e. watch() - and update those complex fields (industries & meta_data.status), the updates flow through in the change stream print. But when I filter for them in the $match stage within the pipeline object, they do not.

I regularly use dot notation with find() and everything works fine (i.e. find({'meta_data.status': 'active'}) works fine) - but it’s not working in the watch(pipeline) I’ve specified above. Is there a way to do this with dot notation, or do I have to use from bson.son import SON? If so, how do I do this?

Any advice would be greatly appreciated. Thanks!

This is an example of when I just have watch() set up and I change the meta_data.status field…

{'_id': {'_data': '82642A99A8000000072B022C0100296E5A10048332797CF83843B4B13C30209360370E463C5F6964003C7279786A74676E677376000004'}, 'operationType': 'update', 'clusterTime': Timestamp(1680513448, 7), 'ns': {'db': 'change_stream_db', 'coll': 'change_stream_collection'}, 'documentKey': {'_id': 'ryxjtgngsv'}, 'updateDescription': {'updatedFields': {'meta_data.status': 'finished_updated'}, 'removedFields': [], 'truncatedArrays': []}}

When I have the filter in match, there is nothing printed to the console.
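One detail worth noticing in the event above: updatedFields reports the changed path as a single literal key containing a dot ('meta_data.status'), not as a nested meta_data sub-document. A $match on updateDescription.updatedFields.meta_data.status uses dot notation, which the server reads as descending into a meta_data sub-document inside updatedFields - and no such sub-document exists in the event, which would explain why the filtered stream prints nothing. A stand-alone sketch of the distinction (plain Python dicts, no server required):

```python
# Shape of the update event shown above: the changed path is one
# literal dotted key, 'meta_data.status', inside updatedFields.
event = {
    "operationType": "update",
    "updateDescription": {
        "updatedFields": {"meta_data.status": "finished_updated"},
        "removedFields": [],
        "truncatedArrays": [],
    },
}

updated = event["updateDescription"]["updatedFields"]

# Dot notation in $match descends into sub-documents, i.e. it looks for
# updatedFields -> meta_data -> status. That nested path does not exist:
assert "meta_data" not in updated

# The value actually lives under the single dotted key:
assert updated["meta_data.status"] == "finished_updated"
```

By contrast, updates to base_name and country produce plain, undotted keys in updatedFields, which is consistent with those filters working.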

Hi @Paul_Chynoweth and welcome to the community forum!!

Based on the above pipeline shared, I tried with the following sample data:

replset [direct: primary] test> db.CS.find()
[
  {
    _id: ObjectId("642d0f15d064d6c29aba53e1"),
    updateDescription: {
      updatedFields: {
        base_name: 'Acme Corporation',
        meta_data: {
          status: 'active',
          created_at: ISODate("2022-01-01T00:00:00.000Z")
        },
        industries: [ 'technology', 'manufacturing' ],
        country: 'USA'
      }
    }
  }
]

and with the following change stream code in python

import pymongo
from pymongo import MongoClient
conn = pymongo.MongoClient("localhost:8000")
db = conn["test"]
collection = db["CS"]
cursor = db.CS.watch()
for change in cursor:
    print(change)

and I was able to see the changes in the change stream for all updates on all fields.

However,

Can you share your sample document? That would give more clarity to help understand the issue further.

Regards
Aasawari

Hi Aasawari,

Thanks for helping out & yes certainly. Here is a sample document…

{
    "_id": "pnsfnrkoth",
    "base_name": "mjzzbvxqgk",
    "country": "ktybyozypo",
    "meta_data": {
        "cleaning_required": true,
        "status": "",
        "status_domain_keyword_search_scraper": "in_process",
        "status_domain_scraper": "finished",
        "status_domain_search": "finished"
    },
    "industries": [
        "industry3",
        "industry1"
    ]
}

Let me know if that helps you to further understand/debug.

And yeah, when I run collection.watch() it picks up all changes, including changes to meta_data.status or industries. But when I filter specifically for those updates in the pipeline, I don’t see any change stream events coming through.

Thanks,
Paul

@Aasawari (Forgot to tag you directly)

Hi @Paul_Chynoweth

Based on the sample data and the match pipeline shared, the fields industries and country are not nested fields, and hence dot notation is not applicable to them.
Can you confirm whether I am missing something here?

I tried to use my sample data mentioned above with the following update query:

db.CS.updateOne( { _id: ObjectId("642d0f15d064d6c29aba53e1") }, { $set: { "meta_data.status": "Non active" } })

and I was able to see the changes in the change stream as:

{'_id': {'_data': '8264351662000000012B022C0100296E5A1004239EF740FA834282B31EF07C8525DCCE46645F69640064642D0F15D064D6C29ABA53E10004'}, 'operationType': 'update', 'clusterTime': Timestamp(1681200738, 1), 'wallTime': datetime.datetime(2023, 4, 11, 8, 12, 18, 682000), 'ns': {'db': 'test', 'coll': 'CS'}, 'documentKey': {'_id': ObjectId('642d0f15d064d6c29aba53e1')}, 'updateDescription': {'updatedFields': {'meta_data': {'status': 'Non active'}}, 'removedFields': [], 'truncatedArrays': []}}

with the following python code:

pipelineX = [
      {"$match": {
            "$or": [
                {"updateDescription.updatedFields.base_name": {"$exists": True}},
                {"updateDescription.updatedFields.meta_data.status": {"$exists": True}},
                {"updateDescription.updatedFields.industries": {"$exists": True}},
                {"updateDescription.updatedFields.country": {"$exists": True}}
            ]
        }}
    ]

cursor = db.CS.watch(pipeline=pipelineX)

for change1 in cursor:
        print(change1)

Can you share the update command you used along with the sample document?

Regards
Aasawari

Hi @Paul_Chynoweth,

I guess my response from earlier today didn’t post… In the pipeline, you’re using the $or operator to match updates on any of the specified fields. However, when using the $or operator, you need to wrap each condition in a separate dictionary object. Here’s how the pipeline should look:

pipeline = [
    {
        '$match': {
            '$or': [
                {'updateDescription.updatedFields.base_name': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
                {'updateDescription.updatedFields.industries': {'$exists': True}},
                {'updateDescription.updatedFields.country': {'$exists': True}},
            ]
        }
    }
]

Regarding your issue with not being able to pick up changes to the industries and meta_data.status fields, I suspect it could be due to the way you’re specifying the nested fields in the pipeline.

For the industries field, you’re using dot notation to specify the nested field. However, since industries is an array field, you need to use the array field syntax to match updates to this field. Here’s how you can modify the pipeline to match updates to the industries field:

pipeline = [
    {
        '$match': {
            '$or': [
                {'updateDescription.updatedFields.base_name': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
                {'updateDescription.updatedFields.industries.0': {'$exists': True}},
                {'updateDescription.updatedFields.country': {'$exists': True}},
            ]
        }
    }
]

For the meta_data.status field, the dot notation should work fine as long as the nested field exists. If it still doesn’t work, you can try specifying the nested field using the array field syntax as well. Here’s how you can modify the pipeline to match updates to the meta_data.status field:

pipeline = [
    {
        '$match': {
            '$or': [
                {'updateDescription.updatedFields.base_name': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status.0': {'$exists': True}},
                {'updateDescription.updatedFields.country': {'$exists': True}},
            ]
        }
    }
]

I hope this helps. Let me know if you have any further questions or if you’re still having issues. Also, the pipeline below would correct yours as it adds your missing fields, and may actually solve your issue entirely on its own.


pipeline = [
    {
        '$match': {
            '$or': [
                {'updateDescription.updatedFields.base_name': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
                {'updateDescription.updatedFields.industries': {'$exists': True}},
                {'updateDescription.updatedFields.country': {'$exists': True}},
                {'updateDescription.updatedFields.industries': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
            ]
        }
    }
]

A lot of the other stuff isn’t really necessary; it kind of bloats what you’re doing.

Hi @Aasawari ,

Thanks for your response. I have a similar update command in pymongo:

client.change_stream_db.change_stream_collection.update_one({"_id": "bshkdhnicr"}, {"$set": {"meta_data.status": "Non active"}})

But under this pipeline (as suggested by @Brock )

pipeline = [
    {
        '$match': {
            '$or': [
                {'updateDescription.updatedFields.base_name': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
                {'updateDescription.updatedFields.industries': {'$exists': True}},
                {'updateDescription.updatedFields.country': {'$exists': True}},
                {'updateDescription.updatedFields.industries.0': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status.0': {'$exists': True}},
            ]
        }
    }
]

There is still no change stream event. When I clear the pipeline and have the watch listening for all changes and I run the same update command, I get an event flowing through the change stream.

I have recorded my struggles here in this loom:
https://www.loom.com/share/f818b30f739c4a17b88430fee6d2054f
(I’ll delete this loom in a few days for security reasons. There are no credentials or anything, but I’m conscious of having a video on a discussion forum.)

@Brock Thanks for providing advice. I couldn’t quite work out what the difference between your first code block and mine was, but I then tried your suggestion of adding .0 after meta_data.status and the update events still didn’t flow through.

Hi @Paul_Chynoweth,

It looks like you are experiencing issues with the change stream in MongoDB. Based on the code you have provided, it seems like you are attempting to update a document and expecting a change stream event to be triggered, but the event is not being received.

One thing to check is whether the change stream is actually set up correctly. You can try inserting a new document and see if the change stream event is triggered for that insert. If the insert event is being received but the update event is not, then it’s likely an issue with the update operation.

Another thing to check is the filter for the change stream. In your pipeline, you are filtering for updates to specific fields, but if the update is not modifying those fields, then the event will not be triggered. You can try removing the filter to see if the update event is being received at all.

Here is an example of setting up a change stream in Python using the pymongo driver:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client["mydatabase"]
collection = db["mycollection"]

pipeline = [{'$match': {'operationType': 'update'}}]

with collection.watch(pipeline) as stream:
    for change in stream:
        print(change)

This code sets up a change stream on the mycollection collection and listens for update events. If an update event is received, it will be printed to the console.

I hope this helps you in troubleshooting your issue. Let me know if you have any further questions!

Thanks @Brock ,

So basically starting from scratch I have this code here:

# Connect to Mongo
client = pymongo.MongoClient(mongo_connection_uri_dev)
mongodb_db = client[mongo_database_name]
mongodb_collection = mongodb_db[mongo_collection_name]

change_stream = client.change_stream_db.change_stream_collection.watch()

# Iterate through the change stream and print each change
for change in change_stream:
    print(change)

Now when I insert this document…

client.change_stream_db.change_stream_collection.insert_one(
    {
        "_id": "pwdhlhfjok",
        "base_name": "zlskzvugui",
        "country": "vebeonoigb",
        "meta_data": {
            "cleaning_required": False,
            "status": "sdm_triggered",
            "status_domain_keyword_search_scraper": "finished",
            "status_domain_scraper": "finished",
            "status_domain_search": "finished",
        },
        "industries": ["industry2"],
    }
)

I receive the insert event in my stream :white_check_mark: This is shown below…

{'_id': {'_data': '826437B02F000000062B022C0100296E5A10048332797CF83843B4B13C30209360370E463C5F6964003C707764686C68666A6F6B000004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1681371183, 6), 'wallTime': datetime.datetime(2023, 4, 13, 7, 33, 3, 69000), 'fullDocument': {'_id': 'pwdhlhfjok', 'base_name': 'zlskzvugui', 'country': 'vebeonoigb', 'meta_data': {'cleaning_required': False, 'status': 'sdm_triggered', 'status_domain_keyword_search_scraper': 'finished', 'status_domain_scraper': 'finished', 'status_domain_search': 'finished'}, 'industries': ['industry2']}, 'ns': {'db': 'change_stream_db', 'coll': 'change_stream_collection'}, 'documentKey': {'_id': 'pwdhlhfjok'}}

So that’s all good. Now, keeping my change stream set to pick up all events (client.change_stream_db.change_stream_collection.watch()), when I update the field meta_data.status I receive the below event from the change stream…

# Code to update the above document...
client.change_stream_db.change_stream_collection.update_one({"_id": "pwdhlhfjok"}, {"$set": {"meta_data.status": "On Hold"}})

# Change Stream event fired...
{
    "_id": {
        "_data": "826437B1860000002E2B022C0100296E5A10048332797CF83843B4B13C30209360370E463C5F6964003C707764686C68666A6F6B000004"
    },
    "operationType": "update",
    "clusterTime": Timestamp(1681371526, 46),
    "wallTime": datetime.datetime(2023, 4, 13, 7, 38, 46, 786000),
    "ns": {"db": "change_stream_db", "coll": "change_stream_collection"},
    "documentKey": {"_id": "pwdhlhfjok"},
    "updateDescription": {
        "updatedFields": {"meta_data.status": "On Hold"},
        "removedFields": [],
        "truncatedArrays": [],
    },
}

So I can confirm that when client.change_stream_db.change_stream_collection.watch() is set up, I receive change stream events when documents are inserted :white_check_mark: and when nested fields (i.e. meta_data.status) are updated :white_check_mark: .

But things change when I add in my pipeline filter object…

# Connect to Mongo
client = pymongo.MongoClient(mongo_connection_uri_dev)
mongodb_db = client[mongo_database_name]
mongodb_collection = mongodb_db[mongo_collection_name]

# Set up the change stream
pipeline = [
    {
        '$match': {
            '$or': [
                {'updateDescription.updatedFields.base_name': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data': {'$exists': True}},
                {'updateDescription.updatedFields.industries': {'$exists': True}},
                {'updateDescription.updatedFields.country': {'$exists': True}},
                {'updateDescription.updatedFields.industries.0': {'$exists': True}}, # I have tried removing this too
                {'updateDescription.updatedFields.meta_data.status.0': {'$exists': True}}, # i have tried removing this too
            ]
        }
    }
]
change_stream = client.change_stream_db.change_stream_collection.watch(pipeline)

# Iterate through the change stream and print each change
for change in change_stream:
    print(change)

Ok so now I try to update the same document and the same field meta_data.status with the below code…

client.change_stream_db.change_stream_collection.update_one({"_id": "pwdhlhfjok"}, {"$set": {"meta_data.status": "Active"}})

And I receive no change event.

So I then test changing a simple field like base_name, which is a simple string, and I receive a change stream event…

# update to base name
client.change_stream_db.change_stream_collection.update_one({"_id": "pwdhlhfjok"}, {"$set": {"base_name": "Base Name 1"}})

# Event printed in Change Stream...
{
    "_id": {
        "_data": "826437B2FF000000112B022C0100296E5A10048332797CF83843B4B13C30209360370E463C5F6964003C707764686C68666A6F6B000004"
    },
    "operationType": "update",
    "clusterTime": Timestamp(1681371903, 17),
    "wallTime": datetime.datetime(2023, 4, 13, 7, 45, 3, 265000),
    "ns": {"db": "change_stream_db", "coll": "change_stream_collection"},
    "documentKey": {"_id": "pwdhlhfjok"},
    "updateDescription": {
        "updatedFields": {"base_name": "Base Name 1"},
        "removedFields": [],
        "truncatedArrays": [],
    },
}

So when I have the pipeline filter, it works for simple fields like base_name (and I’ve tested this on country, which is also a simple string field) :white_check_mark: . BUT when I update complex fields (meta_data.status, or industries - which is an array) the filter doesn’t pick it up :x: . There must be something going on within the pipeline filter for those nested fields. What’s strange is that when I don’t have the pipeline filters, and just have change_stream = client.change_stream_db.change_stream_collection.watch(), it has no problem picking up a change event when an update to the meta_data.status field happens.

TL;DR

  • change_stream = client.change_stream_db.change_stream_collection.watch() picks up inserts & updates to regular fields :white_check_mark:
  • change_stream = client.change_stream_db.change_stream_collection.watch() picks up updates to nested/complex fields (meta_data.status, industries) :white_check_mark:
  • change_stream = client.change_stream_db.change_stream_collection.watch(pipeline) picks up updates to regular fields :white_check_mark:
  • change_stream = client.change_stream_db.change_stream_collection.watch(pipeline) Does not pick up updates to nested/complex fields (meta_data.status, industries) :x:

Let me know if that’s not clear! Happy to provide more updates :pray:
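One way to unblock things that’s consistent with the TL;DR above: since the unfiltered watch() demonstrably receives every event, the filtering can be done client-side in Python on the literal key strings that updateDescription.updatedFields carries (keys like 'meta_data.status' and 'industries.0' arrive as single dotted strings). A sketch - the WATCHED_PREFIXES tuple and is_relevant helper are illustrative names, not from the thread:

```python
# Client-side filtering sketch: watch everything, then test the literal
# key strings found in updateDescription.updatedFields.
WATCHED_PREFIXES = ("base_name", "country", "industries", "meta_data.status")

def is_relevant(change):
    """Return True if the event touches one of the watched fields."""
    if change.get("operationType") == "insert":
        return True
    updated = change.get("updateDescription", {}).get("updatedFields", {})
    # 'industries.0' should count as a change to 'industries', so compare
    # on prefix rather than exact key equality.
    return any(
        key == prefix or key.startswith(prefix + ".")
        for key in updated
        for prefix in WATCHED_PREFIXES
    )

# Example events shaped like the ones printed earlier in the thread:
update_event = {
    "operationType": "update",
    "updateDescription": {"updatedFields": {"meta_data.status": "On Hold"}},
}
irrelevant_event = {
    "operationType": "update",
    "updateDescription": {"updatedFields": {"meta_data.cleaning_required": False}},
}
```

The watch loop then becomes: for change in collection.watch(): if is_relevant(change): print(change). This trades a little client-side work for not having to express the dotted-key match server-side.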

Hey @Aasawari I might tag you directly here again too. Any advice or help would be amazing!

Alright, let’s kick this up a notch… You can wrap the with statement in a try block and catch any exceptions that are raised. You can also use the logging module to log any errors or events that occur.

Here’s some code that includes error handling and logging, hopefully this will pull whatever it is that’s going on or at least get visibility of the issue:

import pymongo
import logging

# Set up logging
logging.basicConfig(filename='change_stream.log', level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

# Connect to MongoDB
client = pymongo.MongoClient("mongodb://localhost:27017/")

# Select the database and collection
db = client["mydatabase"]
collection = db["mycollection"]

# Set up the change stream
pipeline = [
    {
        '$match': {
            # $or must sit at the top level of $match, with the full
            # field path repeated in each clause
            '$or': [
                {'updateDescription.updatedFields.base_name': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data': {'$exists': True}},
                {'updateDescription.updatedFields.industries': {'$exists': True}},
                {'updateDescription.updatedFields.country': {'$exists': True}},
                {'updateDescription.updatedFields.industries.0': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status.0': {'$exists': True}},
            ]
        }
    }
]

try:
    with collection.watch(pipeline) as stream:
        for change in stream:
            logging.info(f"Received change event: {change}")
            # Do something with the change event
except pymongo.errors.PyMongoError as e:
    logging.error(f"Encountered error: {e}")

I modified the code to look for errors. The logs will be written to a file named change_stream.log in the current directory. You can change the filename and log location to wherever you want.

NOTE: You should also handle any errors that occur when connecting to the MongoDB server or selecting the database and collection. You can catch these by using the pymongo.errors.PyMongoError exception.

By doing this, we can see if there are any errors, because there has to be something going on that we aren’t seeing…

To ensure it works correctly, I suggest the following steps:

  1. Verify that the pipeline filter object is constructed correctly and all the required fields are included. One way to do this is to use the MongoDB Compass GUI to create and test the pipeline filter object.

  2. Ensure that the MongoDB server version supports Change Streams and that the necessary permissions are granted for the user account used to connect to the server.

  3. Check if there are any errors or exceptions in the Python console when running the code.

  4. Try to use a different filter criteria or change the data in the database to see if the change stream events are being captured correctly.

  5. If the problem persists, try to recreate the code in a clean environment to eliminate any potential issues with dependencies or configurations.

There’s definitely something going on here that we aren’t seeing, because using the scripts to make similar metadata on a local MDB deployment it’s working just fine… Please verify steps 1 through 5 to make sure we aren’t missing something.

Hi @Brock,

Thanks for the advice. I think I’ve narrowed it down to a potential syntax problem with the PyMongo pipeline object - but I’m still not sure. Your point 1 seems to be where the problem is.

Essentially from your numbered dot points:

  • 2. Ensure that the MongoDB server version supports Change Streams and that the necessary permissions are granted for the user account used to connect to the server. → I’m running version 6.0.5 and the credentials used are Atlas Admin - so should be no problems there.
  • 3. Check if there are any errors or exceptions in the Python console when running the code. → There were no errors or relevant info in the logging statements when updates were made to meta_data.status or industries. There were info statements logged with updates to base_name and country.
  • 4. Try to use a different filter criteria or change the data in the database to see if the change stream events are being captured correctly. → I tried on a new database and a new cluster. I added new events and updated them. Same issues presented themselves. I have played around with the filter criteria quite a bit.
  • 5. If the problem persists, try to recreate the code in a clean environment to eliminate any potential issues with dependencies or configurations. → I have not tried this yet.

But your suggestion to use the Compass GUI (I used the Atlas GUI instead) revealed something to me (point 1 of your message). Firstly, when I tried to run this in the Aggregation tab…

{
  $or: [{base_name: "ewuvebqyoh"}, {meta_data.status: "active"}]
}

…I received a syntax-related error.

So, wrapping meta_data.status in quotes, the filtering now works…

But why I’m still unsure that this is the key problem boils down to two things:

  1. When I tried wrapping the meta_data.status in quotes within my PyMongo script, I still didn’t receive a change event (and no errors or logging info) when an update to the meta_data.status field occurred. I can confirm that the update did occur on the MongoDB collection, and that when I removed the filtering in the pipeline (so collection.watch()) the update event flowed through.

Note: I’ve also tried simplifying the pipeline object to not include so many fields (i.e. just meta_data.status or just meta_data.status & country for example)
Code:

# Connect to Mongo
client = pymongo.MongoClient(mongo_connection_uri_dev)
mongodb_db = client[mongo_database_name]
mongodb_collection = mongodb_db[mongo_collection_name]

pipeline = [
    {
        "$match": {
            "$or": [
                {"operationType": "insert"},
                {"updateDescription.updatedFields.meta_data": {"$exists": True}},
                {"updateDescription.updatedFields.industries.0": {"$exists": True}},
                {"updateDescription.updatedFields.meta_data.status": {"$exists": True}},
                {"updateDescription.updatedFields.'meta_data.status'": {"$exists": True}},
                {"updateDescription.updatedFields.base_name": {"$exists": True}},
                {"updateDescription.updatedFields.'meta_data.status_domain_scraper'": {"$exists": True}},
                {"updateDescription.updatedFields.industries": {"$exists": True}},
                {"updateDescription.updatedFields.country": {"$exists": True}}
            ]
        }
    }
]

logging.basicConfig(filename='change_stream.log', level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')


try:
    with mongodb_collection.watch(pipeline) as stream:
        for change in stream:
            print(change)
            logging.info(f"Received change event: {change}")
            # Do something with the change event
except pymongo.errors.PyMongoError as e:
    logging.error(f"Encountered error: {e}")
  2. The other problem field I’ve been experiencing (mentioned above) is the industries field, which is an array. When I filter for that in the aggregation pipeline, it seems to work fine in the Atlas GUI - but I’m still not receiving any events in the above code when I update one element of the array. Again, I’ve tested removing the pipeline object and just doing collection.watch(), and any update to the industries array does flow through. So it’s definitely something to do with the filtering.

If it were a syntax error, then I would expect to see similar issues in the Atlas GUI when I look for changes to the industries field, since in my PyMongo script I’m not seeing any change stream events when that field is updated.

Do you think this syntax error could all be related somehow?

Thanks again for the help. This is a serious head scratcher for me.
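The industries symptom fits the same shape as the meta_data.status one: an update like $set: {'industries.0': 'demo'} is reported in updatedFields under the single literal key 'industries.0'. A $match on updateDescription.updatedFields.industries then has no plain industries key to find, and updateDescription.updatedFields.industries.0 is read by the server as a nested path rather than as that literal key. A dict-level sketch of the event shape (values illustrative, no server needed):

```python
# Event shape produced by update_one(..., {"$set": {"industries.0": "demo"}}):
event = {
    "operationType": "update",
    "updateDescription": {
        "updatedFields": {"industries.0": "demo"},
        "removedFields": [],
        "truncatedArrays": [],
    },
}

updated = event["updateDescription"]["updatedFields"]

# There is no plain 'industries' key, so a $match on
# 'updateDescription.updatedFields.industries' has nothing to find:
assert "industries" not in updated

# The change is recorded under the literal dotted key instead:
assert updated["industries.0"] == "demo"
```

Replacing the whole array in one $set (rather than one element) would instead produce a plain 'industries' key, which the existing filter would match.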

This is actually quite bizarre - my on-premise deployment of 6.0.5 is working without the modification you had to make, but yes, in the Atlas GUI I’m having to make the same change you are.

But also, something odd: I’m not getting errors with the dummy data I made for this either. This is either a bug or a syntax problem we’re not seeing. Something is executing - it’s going through, but not generating what we want it to.

Can I get some kind of sample data to compare to? To my knowledge Atlas shouldn’t be different from on-premise, because it’s just a gateway GUI to MongoDB behind the dashboard. There shouldn’t be any differences, or any reason to have to modify from one platform to another.

But yeah, the pipeline is executing but not generating anything, and I didn’t realize that because when I executed it there were no errors, so I just called it good. I’m now wondering if it’s a config setup issue, but then Atlas shouldn’t be having config issues or any other services in the way of the data populating.

Overall, there absolutely should be a visible change event, and you’re right, it’s not populating one.

@Paul_Chynoweth Let’s take a different approach on this. I think we should try to isolate some things by process of elimination and see what’s causing this behavior.

Because I think we have a conflict with the meta_data.status field, as I’m realizing we have both "updateDescription.updatedFields.meta_data.status" and "updateDescription.updatedFields.'meta_data.status'" being considered in the filter criteria, and I’m wondering if this is why we are seeing this issue now.

Right now I’m trying to figure out a good way to eliminate one or the other of these without breaking the pipeline, but I really think this might be the problem. So let’s eliminate one or the other and see if that’s what we need to do.

Because I strongly believe now that it’s just not filtering this right for the output, so it’s possibly not surfacing the change. This needs more testing, but I do think we’re close. As for why neither on-prem nor Atlas throws errors for this - that tells me it is in fact doing something, it just isn’t doing what we want it to. Syntactically it must be correct, otherwise we should get errors.

Filtering for just the meta data.

 const pipeline = [
   {
     $match: {
       operationType: "update",
       "updateDescription.updatedFields.meta_data.status": { $exists: true }
     }
   }
];

For some reason the MongoDB connector in VS Code removes the quotes; the deployment via GitHub to Atlas isn’t throwing back errors. I’m waiting for the change event though.

I can check for Industries too, as a control variable to see if there’s a difference by doing:

const pipeline = [
  {
    $match: {
      operationType: "update",
      "updateDescription.updatedFields.industries": { $exists: true }
    }
  }
];

What I’d like to do, is see which field individually isn’t populating what we’re wanting it to populate.

Thanks @Brock , it does look more and more likely that it is some sort of syntax error or issue in the filtering process.

So with this document here…

{
    "_id": "pnsfnrkoth",
    "base_name": "mjzzbvxqgk",
    "country": "ktybyozypo",
    "meta_data": {
        "cleaning_required": true,
        "status": "",
        "status_domain_keyword_search_scraper": "in_process",
        "status_domain_scraper": "finished",
        "status_domain_search": "finished"
    },
    "industries": [
        "industry3",
        "industry1"
    ]
}

I’m going to make 3 changes to the document using the pipeline code you sent through. I’ll make an update to base_name, meta_data.status & industries.

Change to Base Name

Listening Code
import os

import pymongo
from bson.json_util import dumps
import logging


# === Connect to MongoDB
mongo_connection_uri_dev = os.environ["MONGO_CONNECTION_URI_DEV"]
mongo_database_name = os.environ["MONGO_DATABASE"]
mongo_collection_name = os.environ["MONGO_COLLECTION_NAME"]



# Connect to Mongo
client = pymongo.MongoClient(mongo_connection_uri_dev)
mongodb_db = client[mongo_database_name]
mongodb_collection = mongodb_db[mongo_collection_name]

# Set up the change stream
logging.basicConfig(filename='change_stream.log', level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

# pipeline object
pipeline = [
  {
    "$match": {
      "operationType": "update",
      "updateDescription.updatedFields.base_name": { "$exists": True }
    }
  }
]


try:
    with mongodb_collection.watch(pipeline) as stream:
        for change in stream:
            print(change)
            logging.info(f"Received change event: {change}")
except pymongo.errors.PyMongoError as e:
    logging.error(f"Encountered error: {e}")
Update Code
connection_uri = mongo_connection_uri_dev
client = pymongo.MongoClient(connection_uri)
mongodb_db = client["change_stream_db"]
mongodb_collection = mongodb_db["change_stream_collection"]
mongodb_collection.update_one({"_id": "pnsfnrkoth"}, {"$set": {"base_name": "demo"}})
Atlas Screenshot

Change Stream Event
{'_id': {'_data': '82643EA4C0000000192B022C0100296E5A10048332797CF83843B4B13C30209360370E463C5F6964003C706E73666E726B6F7468000004'}, 'operationType': 'update', 'clusterTime': Timestamp(1681827008, 25), 'wallTime': datetime.datetime(2023, 4, 18, 14, 10, 8, 122000), 'ns': {'db': 'change_stream_db', 'coll': 'change_stream_collection'}, 'documentKey': {'_id': 'pnsfnrkoth'}, 'updateDescription': {'updatedFields': {'base_name': 'demo'}, 'removedFields': [], 'truncatedArrays': []}}

All good there.

Change to meta_data.status

Listening Code

Same code just different pipeline object…

# pipeline object
pipeline = [
  {
    "$match": {
      "operationType": "update",
      "updateDescription.updatedFields.meta_data.status": { "$exists": True }
    }
  }
]
Update Code
mongodb_collection.update_one({"_id": "pnsfnrkoth"}, {"$set": {"meta_data.status": "demo"}})
Atlas Screenshot

Change Stream Event

No change stream event recorded.

I tried again with this listening code… (updating meta_data.status to demo2)

# pipeline object
pipeline = [
  {
    "$match": {
      "operationType": "update",
      "updateDescription.updatedFields.'meta_data.status'": { "$exists": True }
    }
  }
]

But still no luck.
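One theory worth checking against the raw events: for an update like `{"$set": {"meta_data.status": "demo"}}`, the server records the change in `updatedFields` under a single literal key named `"meta_data.status"`, not as a nested document. Server-side dot notation in `$match` traverses nesting, so it never reaches that literal key. A plain-Python sketch of the difference (the dict shape here mirrors the `updatedFields` seen in the change events in this thread):

```python
# What updatedFields looks like after
#   update_one(..., {"$set": {"meta_data.status": "demo"}})
updated_fields = {"meta_data.status": "demo"}  # ONE literal dotted key

# Dot notation in $match effectively looks for
# updated_fields["meta_data"]["status"] -- but there is no nested document:
assert "meta_data" not in updated_fields
assert updated_fields["meta_data.status"] == "demo"
```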

Change to Industries

Listening Code
pipeline = [
  {
    "$match": {
      "operationType": "update",
      "updateDescription.updatedFields.industries": { "$exists": True }
    }
  }
]
Update Code
mongodb_collection.update_one({"_id": "pnsfnrkoth"}, {"$set": {"industries.0": "demo"}})
Atlas Screenshot

Change Stream Event

No change stream event.

So it doesn’t look like those pipeline changes are making any difference, unfortunately.

I can confirm that when I just have

"$match": {
      "operationType": "update",
    }

It picks up all of the updates (including updates to meta_data.status & to industries). I sent an update to industries while the above match/pipeline was running and received this change stream event…

{'_id': {'_data': '82643EA7FF0000002F2B022C0100296E5A10048332797CF83843B4B13C30209360370E463C5F6964003C706E73666E726B6F7468000004'}, 'operationType': 'update', 'clusterTime': Timestamp(1681827839, 47), 'wallTime': datetime.datetime(2023, 4, 18, 14, 23, 59, 552000), 'ns': {'db': 'change_stream_db', 'coll': 'change_stream_collection'}, 'documentKey': {'_id': 'pnsfnrkoth'}, 'updateDescription': {'updatedFields': {'industries.0': 'demo industry'}, 'removedFields': [], 'truncatedArrays': []}}

So I tried again with this pipeline…

# listener
pipeline = [
  {
    "$match": {
      "operationType": "update",
      "updateDescription.updatedFields.industries.0": { "$exists": True }
    }
  }
]

# update
mongodb_collection.update_one({"_id": "pnsfnrkoth"}, {"$set": {"industries.0": "demo industry a"}})

And there was no change stream event printed to the console.
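Given that the bare `"operationType": "update"` filter reliably delivers every event, one fallback is to keep the server-side filter broad and filter the literal dotted keys on the client. This is a sketch: `WATCHED` and `is_interesting` are illustrative names, and the field list comes from the document above.

```python
# Fallback sketch: match broadly server-side, filter dotted keys client-side.
WATCHED = ("base_name", "country", "meta_data.status", "industries")

def is_interesting(change):
    """True if any updated key equals a watched field or is a sub-path
    of one (e.g. 'industries.0' under 'industries')."""
    updated = change.get("updateDescription", {}).get("updatedFields", {})
    return any(
        key == field or key.startswith(field + ".")
        for key in updated
        for field in WATCHED
    )

# The industries.0 event recorded above would pass this filter:
event = {"updateDescription": {"updatedFields": {"industries.0": "demo industry"}}}
assert is_interesting(event)
```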

So from my perspective it’s something to do with…

updateDescription.updatedFields.

and filtering on that object when there is nested objects (i.e. meta_data.status or industries or industries.0)
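If the cause is that `updatedFields` stores the whole dotted path as one literal key, one possible server-side workaround is to test the literal key names inside `$expr`. This is a sketch, not a confirmed fix: it uses the standard aggregation operators `$objectToArray`, `$map`, and `$in`, and the `$ifNull` guard (for events without `updatedFields`) is a defensive assumption.

```python
# Sketch: match only update events whose updatedFields has the LITERAL
# key "meta_data.status" (which dot notation in $match cannot reach).
pipeline = [
    {
        "$match": {
            "operationType": "update",
            "$expr": {
                "$in": [
                    "meta_data.status",
                    {
                        "$map": {
                            # list the literal key names of updatedFields
                            "input": {
                                "$objectToArray": {
                                    "$ifNull": ["$updateDescription.updatedFields", {}]
                                }
                            },
                            "as": "f",
                            "in": "$$f.k",
                        }
                    },
                ]
            },
        }
    }
]
```

This would then be passed to `watch(pipeline)` exactly as in the listening code above.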

I hope that gives you more data and more context, and allows you to rule out that theory!

THANK YOU SO MUCH!!! I’M AN IDIOT!!!

Try this:

pipeline = [
    {
        "$match": {
            "operationType": "update",
            "$or": [
                {"updateDescription.updatedFields.base_name": {"$exists": True}},
                {"updateDescription.updatedFields.meta_data.status": {"$exists": True}},
                {"updateDescription.updatedFields.industries": {"$exists": True}}
            ]
        }
    }
]

I think we needed the $or,

I’m a doofus, sorry about that! I forgot to throw $or into the mix; the importance of $or is that it’s an operator that matches updates to any of the fields. I didn’t catch it until you mentioned that.

Also, someone worth following for aggregations and pipelines is a guy named Adam Harrison. The dude is extremely knowledgeable on these things; I use a lot of his material to figure out aggregation and pipeline issues.

That said, I’m also going by MongoDB Aggregation Pipeline | MongoDB and MongoDB: The Definitive Guide (ISBN-13: 978-1491954461), which grossly needs to be updated for the upcoming 7.0 since it’s presently on 4.2…

I think the following may also be a good way to troubleshoot the change streams if the $or doesn’t fix this. Going by the documentation I’ve been reading, what we’re invoking shouldn’t be having issues unless there’s something benign I’m glossing over; I really don’t see how there could be any issues like the ones being described.

The weird thing is that the change stream is executing but not doing anything. If this persists even after the following example, it’s definitely support-ticket worthy.

const MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb+srv://<username>:<password>@<cluster-address>/test?retryWrites=true&w=majority';
const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });

client.connect(err => {
  const collection = client.db("test").collection("testColl");
  const changeStream = collection.watch();

  changeStream.on("change", function(change) {
    console.log("A document has been " + change.operationType + "d.");
    console.log("Updated fields: ", change.updateDescription.updatedFields);
  });
});

If it is something benign, then it definitely needs to go into the docs, because otherwise this is crazy.

But what I’m trying to do with the above is make something give an output of some kind, though I’m really edging toward the possibility that it may just be that I’m an idiot who completely forgot the $or.

Hey @Brock,

To be honest I don’t think you’re an idiot at all, thanks for helping with this.

But I’m still unsure of one thing. So bringing in the $or operator means that I pick up all updates OR updates to that specific field. What I’m ultimately after is updates to a specific field. I’d like my pipeline to pick up only updates to the meta_data.status field, and not all updates. If I add in the $or operator, it will pick up all updates regardless of which field is being updated.

Also, if it were just the $or, then we would see the same problem for the simple base_name as we see for meta_data.status. But this pipeline below worked perfectly well in picking up updates to base_name (and it didn’t have the $or operator):

pipeline = [
  {
    "$match": {
      "operationType": "update",
      "updateDescription.updatedFields.base_name": { "$exists": True }
    }
  }
]

So unless I’m missing something, I don’t think having the $or solves the issue of why it works for a simple field like base_name ("updateDescription.updatedFields.base_name") but doesn’t work for a nested field like meta_data.status ("updateDescription.updatedFields.meta_data.status").

Let me know if I’m missing something!

Thanks again,
Paul

@Paul_Chynoweth that logic is sound actually, I’m honestly starting to think there might be an output being generated, but it’s not being put into view.

I honestly think this is worth opening a support ticket for, to get the ability to see what’s going on under the hood. The Atlas GUI in fact will not show anything, but on my local MongoDB I’m getting changes coming back to me.

I’ve been screen-sharing with about a dozen DBAs who’ve been using MDB for the last 9 to 11 years, as a favor for a corporation I’m doing a quick consulting gig for, and they aren’t finding anything wrong with your query.

In fact, they are using almost the same pipelines for something else they have going on (their lead said I can disclose that), and it’s very similar to yours. The only difference is that everything with this company is on-prem, with the exception of the R&D labs they have for a few products to see how Atlas fares in costs and performance versus their on-prem setup.

Being a very large corp that handles over 300,000 concurrent players daily worldwide, and whose last on-prem outage was almost a year ago, I’d imagine they have a clue about what they’re doing when it comes to these things.

This is worth opening a support ticket on, asking why the output isn’t being generated or why the change stream isn’t firing. The TSE will be able to use internal tools to look at what’s actually happening under the hood and why it’s not showing up in Atlas.

Hey Paul, could you tell me what version of MongoDB is running on your Atlas? You state above 6.0.5.

On an M0 cluster, so it doesn’t cost you anything, could you throw some dummy data down on 5.x or 4.4 and tell me if you get an output?

That’s good thinking @Brock ,

I tried to create a new Cluster on version 5.

Frustratingly, after it’s created it defaults back to 6…

Is that expected behaviour?

And yeah definitely seems like a support ticket. Is that something I create on my end?

Hi @Paul_Chynoweth yes, you would make it yourself. And that’s actually interesting it’s not letting you specify a version other than 6.

That’s very interesting actually.

Hey @Paul_Chynoweth did you ever get to the bottom of this? I’m experiencing the same issue (v7.0.1). An unfiltered pipeline triggers the change stream fine, but trying to match a nested field doesn’t trigger.

{ $match: {
    'updateDescription.updatedFields.subscription.enabled': { $exists: true }
}}
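The symptom matches the earlier posts: `updatedFields` stores a changed path such as `subscription.enabled` or `industries.0` as one literal dotted key, which dot notation in `$match` cannot traverse. One possible server-side sketch in PyMongo form (assuming MongoDB 4.2+ for `$regexMatch`; the `$ifNull` guard and the field names here are taken from this thread, not from a confirmed fix) inspects the literal keys of `updatedFields`:

```python
# Sketch: match update events whose updatedFields contains a literal key
# equal to (or nested under) "subscription.enabled" or "industries",
# by testing the key names of updatedFields with aggregation expressions.
pipeline = [
    {
        "$match": {
            "operationType": "update",
            "$expr": {
                "$anyElementTrue": [
                    {
                        "$map": {
                            # $ifNull guards events that have no updatedFields
                            "input": {
                                "$objectToArray": {
                                    "$ifNull": ["$updateDescription.updatedFields", {}]
                                }
                            },
                            "as": "f",
                            "in": {
                                "$regexMatch": {
                                    "input": "$$f.k",
                                    "regex": "^(subscription\\.enabled|industries)(\\.|$)",
                                }
                            },
                        }
                    }
                ]
            },
        }
    }
]
```

The pipeline would then be passed to `collection.watch(pipeline)` as in the earlier examples.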