Thanks @Brock,
Starting from scratch, I have this code:
```python
import pymongo

# Connect to Mongo
client = pymongo.MongoClient(mongo_connection_uri_dev)
mongodb_db = client[mongo_database_name]
mongodb_collection = mongodb_db[mongo_collection_name]

# Open an unfiltered change stream on the test collection
change_stream = client.change_stream_db.change_stream_collection.watch()

# Iterate through the change stream and print each change
for change in change_stream:
    print(change)
```
Now, when I insert this document:
```python
client.change_stream_db.change_stream_collection.insert_one(
    {
        "_id": "pwdhlhfjok",
        "base_name": "zlskzvugui",
        "country": "vebeonoigb",
        "meta_data": {
            "cleaning_required": False,
            "status": "sdm_triggered",
            "status_domain_keyword_search_scraper": "finished",
            "status_domain_scraper": "finished",
            "status_domain_search": "finished",
        },
        "industries": ["industry2"],
    }
)
```
I receive the insert event in my stream, shown below:
```python
{
    "_id": {
        "_data": "826437B02F000000062B022C0100296E5A10048332797CF83843B4B13C30209360370E463C5F6964003C707764686C68666A6F6B000004"
    },
    "operationType": "insert",
    "clusterTime": Timestamp(1681371183, 6),
    "wallTime": datetime.datetime(2023, 4, 13, 7, 33, 3, 69000),
    "fullDocument": {
        "_id": "pwdhlhfjok",
        "base_name": "zlskzvugui",
        "country": "vebeonoigb",
        "meta_data": {
            "cleaning_required": False,
            "status": "sdm_triggered",
            "status_domain_keyword_search_scraper": "finished",
            "status_domain_scraper": "finished",
            "status_domain_search": "finished",
        },
        "industries": ["industry2"],
    },
    "ns": {"db": "change_stream_db", "coll": "change_stream_collection"},
    "documentKey": {"_id": "pwdhlhfjok"},
}
```
So that’s all good. Next, keeping the change stream unfiltered so it picks up all events (`client.change_stream_db.change_stream_collection.watch()`), I update the nested field `meta_data.status` and receive the event below:
```python
# Code to update the above document...
client.change_stream_db.change_stream_collection.update_one(
    {"_id": "pwdhlhfjok"}, {"$set": {"meta_data.status": "On Hold"}}
)

# Change Stream event fired...
{
    "_id": {
        "_data": "826437B1860000002E2B022C0100296E5A10048332797CF83843B4B13C30209360370E463C5F6964003C707764686C68666A6F6B000004"
    },
    "operationType": "update",
    "clusterTime": Timestamp(1681371526, 46),
    "wallTime": datetime.datetime(2023, 4, 13, 7, 38, 46, 786000),
    "ns": {"db": "change_stream_db", "coll": "change_stream_collection"},
    "documentKey": {"_id": "pwdhlhfjok"},
    "updateDescription": {
        "updatedFields": {"meta_data.status": "On Hold"},
        "removedFields": [],
        "truncatedArrays": [],
    },
}
```
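Note that the update event above reports the nested change as a single key, `"meta_data.status"`, at the top level of `updatedFields`, rather than as a nested `{"meta_data": {"status": ...}}` document. A small pure-Python sketch (no database connection) for pulling the changed field names out of such an event; `changed_top_level_fields` is a hypothetical helper, not part of PyMongo, and the trimmed event drops `clusterTime`/`wallTime` so it runs without `bson`.

```python
def changed_top_level_fields(event):
    """Return the set of top-level field names touched by an update event.

    Keys in updateDescription.updatedFields are field paths such as
    "meta_data.status", so the top-level name is the part before the first dot.
    Events without an updateDescription (e.g. inserts) yield an empty set.
    """
    description = event.get("updateDescription", {})
    paths = list(description.get("updatedFields", {})) + list(
        description.get("removedFields", [])
    )
    return {path.split(".", 1)[0] for path in paths}


# Trimmed copy of the update event above (clusterTime/wallTime omitted)
update_event = {
    "operationType": "update",
    "documentKey": {"_id": "pwdhlhfjok"},
    "updateDescription": {
        "updatedFields": {"meta_data.status": "On Hold"},
        "removedFields": [],
        "truncatedArrays": [],
    },
}

print(list(update_event["updateDescription"]["updatedFields"]))  # ['meta_data.status']
print(changed_top_level_fields(update_event))  # {'meta_data'}
```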
So I can confirm that with `client.change_stream_db.change_stream_collection.watch()` set up, I receive change stream events both when documents are inserted and when nested fields (e.g. `meta_data.status`) are updated.
But things change when I add my pipeline filter:
```python
import pymongo

# Connect to Mongo
client = pymongo.MongoClient(mongo_connection_uri_dev)
mongodb_db = client[mongo_database_name]
mongodb_collection = mongodb_db[mongo_collection_name]

# Set up the change stream
pipeline = [
    {
        '$match': {
            '$or': [
                {'updateDescription.updatedFields.base_name': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data.status': {'$exists': True}},
                {'updateDescription.updatedFields.meta_data': {'$exists': True}},
                {'updateDescription.updatedFields.industries': {'$exists': True}},
                {'updateDescription.updatedFields.country': {'$exists': True}},
                {'updateDescription.updatedFields.industries.0': {'$exists': True}},  # I have tried removing this too
                {'updateDescription.updatedFields.meta_data.status.0': {'$exists': True}},  # I have tried removing this too
            ]
        }
    }
]
change_stream = client.change_stream_db.change_stream_collection.watch(pipeline)

# Iterate through the change stream and print each change
for change in change_stream:
    print(change)
```
OK, so now I update the same field, `meta_data.status`, on the same document with the code below:
```python
client.change_stream_db.change_stream_collection.update_one(
    {"_id": "pwdhlhfjok"}, {"$set": {"meta_data.status": "Active"}}
)
```
And I receive no change event.
So I then test changing a simple field, `base_name`, which is a plain string, and I do receive a change stream event:
```python
# Update to base name
client.change_stream_db.change_stream_collection.update_one(
    {"_id": "pwdhlhfjok"}, {"$set": {"base_name": "Base Name 1"}}
)

# Event printed in Change Stream...
{
    "_id": {
        "_data": "826437B2FF000000112B022C0100296E5A10048332797CF83843B4B13C30209360370E463C5F6964003C707764686C68666A6F6B000004"
    },
    "operationType": "update",
    "clusterTime": Timestamp(1681371903, 17),
    "wallTime": datetime.datetime(2023, 4, 13, 7, 45, 3, 265000),
    "ns": {"db": "change_stream_db", "coll": "change_stream_collection"},
    "documentKey": {"_id": "pwdhlhfjok"},
    "updateDescription": {
        "updatedFields": {"base_name": "Base Name 1"},
        "removedFields": [],
        "truncatedArrays": [],
    },
}
```
So with the pipeline filter in place, updates to simple fields like `base_name` are picked up (I’ve also tested this on `country`, which is another plain string field). BUT when I update complex fields (`meta_data.status`, or `industries`, which is an array), the filter doesn’t pick them up. There must be something in the pipeline filter that doesn’t work for those nested fields. What’s strange is that without the pipeline filter, i.e. with just `change_stream = client.change_stream_db.change_stream_collection.watch()`, there is no problem picking up a change event when the `meta_data.status` field is updated.
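One likely explanation, stated here as an assumption to verify: the `$match` paths use dot notation, which descends into embedded documents one segment at a time, while the update event stores the nested change under the literal key `"meta_data.status"`. A simplified pure-Python model of that lookup shows why `base_name` lines up and `meta_data.status` does not. `resolve_dotted_path` is a hypothetical helper that ignores MongoDB's array-traversal rules; it is not how the server is implemented.

```python
def resolve_dotted_path(doc, path):
    """Walk `path` one dot-separated segment at a time, the way dot notation
    addresses embedded documents. Returns (found, value)."""
    current = doc
    for segment in path.split("."):
        if isinstance(current, dict) and segment in current:
            current = current[segment]
        else:
            return False, None
    return True, current


# Shapes of the two update events shown above (trimmed)
base_name_event = {"updateDescription": {"updatedFields": {"base_name": "Base Name 1"}}}
status_event = {"updateDescription": {"updatedFields": {"meta_data.status": "Active"}}}

# Simple field: the path segments match the keys, so {"$exists": True} can match
print(resolve_dotted_path(base_name_event, "updateDescription.updatedFields.base_name"))
# (True, 'Base Name 1')

# Nested field: the path looks for updatedFields -> meta_data -> status,
# but the event only has one key literally named "meta_data.status"
print(resolve_dotted_path(status_event, "updateDescription.updatedFields.meta_data.status"))
# (False, None)
```

Under this model, the `updateDescription.updatedFields.meta_data` clause in the pipeline also finds nothing, because there is no key named just `meta_data` in the event.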
TL;DR
- `change_stream = client.change_stream_db.change_stream_collection.watch()` picks up inserts and updates to regular fields
- `change_stream = client.change_stream_db.change_stream_collection.watch()` picks up updates to nested/complex fields (`meta_data.status`, `industries`)
- `change_stream = client.change_stream_db.change_stream_collection.watch(pipeline)` picks up updates to regular fields
- `change_stream = client.change_stream_db.change_stream_collection.watch(pipeline)` does not pick up updates to nested/complex fields (`meta_data.status`, `industries`)
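Since `updatedFields` records nested changes under dotted keys such as `"meta_data.status"` (as in the unfiltered update event earlier), one possible alternative filter, sketched under that assumption, avoids `$exists` on dotted paths: it turns `updatedFields` into an array of `{k, v}` pairs with `$objectToArray` and tests each key with `$regexMatch` (MongoDB 4.2+). This should also cover array element paths such as `industries.0`, if the server reports them that way. `WATCHED_FIELDS`, `FIELD_REGEX` and `would_match` are hypothetical names; `would_match` only mirrors the regex in plain Python so the key logic can be checked offline.

```python
import re

# Top-level fields to watch; a changed path counts if it is the field itself
# or starts with "<field>." (e.g. "meta_data.status", "industries.0").
WATCHED_FIELDS = ["base_name", "country", "industries", "meta_data"]
FIELD_REGEX = "^(" + "|".join(WATCHED_FIELDS) + r")(\.|$)"

pipeline = [
    {
        "$match": {
            "operationType": "update",
            "$expr": {
                "$gt": [
                    {
                        "$size": {
                            "$filter": {
                                # {"meta_data.status": ...} -> [{"k": "meta_data.status", "v": ...}];
                                # $ifNull guards events that carry no updateDescription
                                "input": {
                                    "$objectToArray": {
                                        "$ifNull": ["$updateDescription.updatedFields", {}]
                                    }
                                },
                                "cond": {"$regexMatch": {"input": "$$this.k", "regex": FIELD_REGEX}},
                            }
                        }
                    },
                    0,
                ]
            },
        }
    }
]


def would_match(updated_fields):
    """Pure-Python mirror of the key test in the $expr above."""
    return any(re.search(FIELD_REGEX, key) for key in updated_fields)


print(would_match({"meta_data.status": "Active"}))  # True
print(would_match({"base_name": "Base Name 1"}))  # True
print(would_match({"meta_data_extra": 1}))  # False
```

The pipeline would be passed the same way as before, via `client.change_stream_db.change_stream_collection.watch(pipeline)`. The `(\.|$)` suffix keeps a field such as a hypothetical `meta_data_extra` from matching the `meta_data` prefix.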
Let me know if that’s not clear! Happy to provide more updates.