Overview
In this guide, you can learn how to use a change stream to monitor real-time changes to your data. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.
When using the MongoDB PHP Library, you can call the watch() method to return an instance of MongoDB\ChangeStream. Then, you can iterate through the MongoDB\ChangeStream instance to monitor data changes, such as updates, insertions, and deletions.
Tip
Atlas Stream Processing
As an alternative to change streams, you can use Atlas Stream Processing to process and transform streams of data. Unlike change streams, which register only database events, Atlas Stream Processing manages multiple data event types and provides extended data processing capabilities. To learn more about this feature, see Atlas Stream Processing in the MongoDB Atlas documentation.
Sample Data
The examples in this guide use the restaurants collection in the sample_restaurants database from the Atlas sample datasets. To access this collection from your PHP application, instantiate a MongoDB\Client that connects to an Atlas cluster and assign the following value to your $collection variable:
$collection = $client->sample_restaurants->restaurants;
Tip
To learn how to create a free MongoDB deployment and load the sample datasets, see the MongoDB Get Started guide.
Some examples use the toJSON() function to represent change events, which are BSON documents, as Extended JSON. To use this function, paste the following code into your application file:
function toJSON(object $document): string { return MongoDB\BSON\Document::fromPHP($document)->toRelaxedExtendedJSON(); }
Open a Change Stream
To open a change stream, call the watch() method. The instance on which you call the watch() method determines the scope of events that the change stream monitors. You can call the watch() method on instances of the following classes:
- MongoDB\Client: Monitor all changes in the MongoDB deployment
- MongoDB\Database: Monitor changes in all collections in the database
- MongoDB\Collection: Monitor changes in the collection
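As a rough sketch of the three scopes listed above, the following helper opens one change stream per scope. The function name openStreamsAtEachScope() and the array keys are hypothetical and not part of the library. The sketch assumes that $client is a connected MongoDB\Client and that the deployment is a replica set or sharded cluster.

```php
<?php

// Hypothetical helper (not part of the library): opens one change stream
// per scope. Assumes $client is connected to a replica set or sharded
// cluster, because change streams are not available on standalone servers.
function openStreamsAtEachScope(MongoDB\Client $client): array
{
    $database = $client->sample_restaurants;
    $collection = $database->restaurants;

    return [
        'deployment' => $client->watch(),     // all databases in the deployment
        'database' => $database->watch(),     // all collections in sample_restaurants
        'collection' => $collection->watch(), // only the restaurants collection
    ];
}
```

In most applications you would open only the narrowest scope you need, because a wider scope delivers more events to filter.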
The following example opens a change stream on the restaurants collection and outputs changes as they occur:
$changeStream = $collection->watch();
$changeStream->rewind();
while (true) {
    $changeStream->next();
    // Skip iterations in which no new event is available
    if (! $changeStream->valid()) {
        continue;
    }
    $event = $changeStream->current();
    echo toJSON($event), PHP_EOL;
    // Stop iterating when the change stream is invalidated
    if ($event['operationType'] === 'invalidate') {
        break;
    }
}
To begin watching for changes, run the preceding code. Then, in a separate shell, modify the restaurants collection. The following example updates a document that has a name field value of 'Blarney Castle':
$result = $collection->updateOne( ['name' => 'Blarney Castle'], ['$set' => ['cuisine' => 'Irish']], );
When you update the collection, the change stream application prints the change as it occurs. The printed change event resembles the following output:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : { "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
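The fields shown in the preceding output can be read from the event with array-style access, as the loop above does for operationType. The following sketch uses a hypothetical summarizeEvent() helper (not part of the library) to build a one-line summary. It assumes the event exposes the ns, operationType, and updateDescription fields shown in the sample output, and it uses a plain array as a stand-in for a real event.

```php
<?php

// Hypothetical helper (not part of the library): builds a one-line summary
// of a change event. Accepts a plain array or an object that supports
// array-style access and iteration.
function summarizeEvent($event): string
{
    $ns = $event['ns'];
    $summary = sprintf('%s on %s.%s', $event['operationType'], $ns['db'], $ns['coll']);

    // Update events carry an updateDescription that lists the modified fields.
    if (isset($event['updateDescription'])) {
        $names = [];
        foreach ($event['updateDescription']['updatedFields'] as $name => $value) {
            $names[] = $name;
        }
        $summary .= ' (updated: ' . implode(', ', $names) . ')';
    }

    return $summary;
}

// Stand-in event shaped like the sample output, written as a plain array.
$sample = [
    'operationType' => 'update',
    'ns' => ['db' => 'sample_restaurants', 'coll' => 'restaurants'],
    'updateDescription' => ['updatedFields' => ['cuisine' => 'Irish']],
];

echo summarizeEvent($sample), PHP_EOL;
// prints "update on sample_restaurants.restaurants (updated: cuisine)"
```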
Modify the Change Stream Output
To modify the change stream output, you can pass pipeline stages in an array as a parameter to the watch() method. You can include the following stages in the array:
- $addFields or $set: Adds new fields to documents
- $match: Filters the documents
- $project: Projects a subset of the document fields
- $replaceWith or $replaceRoot: Replaces the input document with the specified document
- $redact: Restricts the contents of the documents
- $unset: Removes fields from documents
The following example passes a pipeline that includes the $match stage to the watch() method. This instructs the watch() method to output events only when update operations occur:
$pipeline = [['$match' => ['operationType' => 'update']]];
$changeStream = $collection->watch($pipeline);
$changeStream->rewind();
while (true) {
    $changeStream->next();
    // Skip iterations in which no new event is available
    if (! $changeStream->valid()) {
        continue;
    }
    $event = $changeStream->current();
    echo toJSON($event), PHP_EOL;
    // Stop iterating when the change stream is invalidated
    if ($event['operationType'] === 'invalidate') {
        break;
    }
}
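If a single operation type is too narrow, the $match stage can use the standard $in query operator to accept several types. The following sketch uses a hypothetical matchOperationTypes() helper (not part of the library) that builds such a pipeline. The resulting array can be passed as the first argument to watch().

```php
<?php

// Hypothetical helper (not part of the library): builds a change stream
// pipeline that keeps only events whose operationType is listed in $types.
function matchOperationTypes(array $types): array
{
    return [
        ['$match' => ['operationType' => ['$in' => array_values($types)]]],
    ];
}

$pipeline = matchOperationTypes(['insert', 'update', 'delete']);

// Assuming $collection is a MongoDB\Collection, as in the examples above:
// $changeStream = $collection->watch($pipeline);
```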
Tip
Operations with Builders
You can use a builder pattern to create the change stream pipeline. To learn more, see the Operations with Builders guide.
Modify watch() Behavior
To modify the behavior of the watch() method, you can pass an options array as a parameter to watch(). The following table describes useful options you can set in the array:
| Option | Description |
|---|---|
| fullDocument | Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see the Include Pre-Images and Post-Images section of this guide. |
| fullDocumentBeforeChange | Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see the Include Pre-Images and Post-Images section of this guide. |
| | Instructs |
| startAtOperationTime | Instructs the change stream to provide only changes that occurred at or after the specified timestamp. |
| collation | Sets the collation to use for the change stream cursor. To learn more, see the Collation section of this page. |
For a full list of watch() options, see MongoDB\Collection::watch() in the API documentation.
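As a sketch of passing an options array, the following hypothetical watchSince() helper (not part of the library) opens a change stream that starts at a given point in time. It assumes the startAtOperationTime option listed in the MongoDB\Collection::watch() API documentation and builds the value with MongoDB\BSON\Timestamp, whose constructor takes the increment first and the Unix time in seconds second.

```php
<?php

// Hypothetical helper (not part of the library): opens a change stream that
// reports changes that occurred at or after $unixSeconds. The start time is
// assumed to fall within the range of operations the deployment still retains.
function watchSince(MongoDB\Collection $collection, int $unixSeconds): MongoDB\ChangeStream
{
    $options = [
        // Timestamp arguments are (increment, seconds since the Unix epoch).
        'startAtOperationTime' => new MongoDB\BSON\Timestamp(0, $unixSeconds),
    ];

    // The first argument is the pipeline; an empty array applies no stages.
    return $collection->watch([], $options);
}

// Example call, assuming $collection is set up as described in Sample Data:
// $changeStream = watchSince($collection, time() - 60);
```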
Include Pre-Images and Post-Images
Important
You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.
By default, when you perform an operation on a collection, the corresponding change event includes only the delta of the fields modified by that operation. To see the full document before or after a change, specify the fullDocumentBeforeChange or the fullDocument options in an array parameter to watch().
The pre-image is the full version of a document before a change. To include the pre-image in the change stream event, set the fullDocumentBeforeChange option to one of the following values:
- MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE: The change event includes a pre-image of the modified document for change events. If the pre-image is not available, this change event field has a null value.
- MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED: The change event includes a pre-image of the modified document for change events. If the pre-image is not available, the server raises an error.
The post-image is the full version of a document after a change. To include the post-image in the change stream event, set the fullDocument option to one of the following values:
- MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP: The change event includes a copy of the entire changed document from some time after the change.
- MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLE: The change event includes a post-image of the modified document for change events. If the post-image is not available, this change event field has a null value.
- MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIRED: The change event includes a post-image of the modified document for change events. If the post-image is not available, the server raises an error.
The following example calls the watch() method on a collection and includes the post-image of updated documents by setting the fullDocument option:
$options = ['fullDocument' => MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP];
$changeStream = $collection->watch([], $options);
$changeStream->rewind();
while (true) {
    $changeStream->next();
    // Skip iterations in which no new event is available
    if (! $changeStream->valid()) {
        continue;
    }
    $event = $changeStream->current();
    echo toJSON($event), PHP_EOL;
    // Stop iterating when the change stream is invalidated
    if ($event['operationType'] === 'invalidate') {
        break;
    }
}
With the change stream application running in a separate shell, updating a document in the restaurants collection by using the preceding update example prints a change event resembling the following output:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : { "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" : "202-24", "coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "cuisine" : "Irish", "grades" : [ ...], "name" : "Blarney Castle", "restaurant_id" : "40366356" }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
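The preceding example requests a post-image. A pre-image request might look like the following sketch. The helper names enableImagesCommand() and watchWithPreImages() are hypothetical and not part of the library. The sketch assumes that pre-image recording is turned on for the collection with the collMod command and its changeStreamPreAndPostImages option, as described in the MongoDB Server manual, and that the deployment runs MongoDB v6.0 or later.

```php
<?php

// Hypothetical helper (not part of the library): builds the collMod command
// that enables pre- and post-image recording on an existing collection.
function enableImagesCommand(string $collectionName): array
{
    return [
        'collMod' => $collectionName,
        'changeStreamPreAndPostImages' => ['enabled' => true],
    ];
}

// Hypothetical helper: enables image recording, then opens a change stream
// that includes the pre-image whenever the server has one available.
function watchWithPreImages(MongoDB\Database $database, string $collectionName): MongoDB\ChangeStream
{
    $database->command(enableImagesCommand($collectionName));

    $options = [
        'fullDocumentBeforeChange' => MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE,
    ];

    return $database->selectCollection($collectionName)->watch([], $options);
}

// Example call, assuming $client is a connected MongoDB\Client:
// $changeStream = watchWithPreImages($client->sample_restaurants, 'restaurants');
```

Events from such a stream carry the pre-image in the fullDocumentBeforeChange field, alongside the fields shown in the preceding output.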
Tip
To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.
Collation
To specify a collation for your operation, pass an $options array to the operation method and set its collation option to an array that configures the collation rules.
The following table describes the fields you can set to configure the collation:
| Field | Description |
|---|---|
| locale | (Required) Specifies the International Components for Unicode (ICU) locale. For a list of supported locales, see Collation Locales and Default Parameters in the MongoDB Server manual. |
| caseLevel | (Optional) Specifies whether to include case comparison. |
| caseFirst | (Optional) Specifies the sort order of case differences during tertiary level comparisons. |
| strength | (Optional) Specifies the level of comparison to perform, as defined in the ICU documentation. |
| numericOrdering | (Optional) Specifies whether the driver compares numeric strings as numbers. |
| alternate | (Optional) Specifies whether the library considers whitespace and punctuation as base characters for comparison purposes. |
| maxVariable | (Optional) Specifies which characters the library considers ignorable when the alternate field is set to "shifted". |
| backwards | (Optional) Specifies whether strings containing diacritics sort from the back of the string to the front. |
To learn more about collation and the possible values for each field, see the Collation entry in the MongoDB Server manual.
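As a sketch of the options array described in this section, the following hypothetical collationOptions() helper (not part of the library) wraps collation rules in an options array. The field names locale and strength come from the MongoDB Server collation document, and the values 'en_US' and 2 are only illustrative.

```php
<?php

// Hypothetical helper (not part of the library): wraps collation rules in an
// options array that can be passed to an operation method such as watch().
function collationOptions(string $locale, int $strength): array
{
    return [
        'collation' => [
            'locale' => $locale,     // required ICU locale
            'strength' => $strength, // level of comparison to perform
        ],
    ];
}

$options = collationOptions('en_US', 2);

// Assuming $collection is a MongoDB\Collection:
// $changeStream = $collection->watch([], $options);
```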
Additional Information
To learn more about change streams, see Change Streams in the MongoDB Server manual.
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: