Overview
En esta guía, puedes aprender a usar un flujo de cambios para supervisar los cambios en tiempo real de tus datos. Un flujo de cambios es una funcionalidad de MongoDB Server que permite que la aplicación se suscriba a cambios en los datos de una colección, base de datos o implementación.
Cuando uses la librería PHP de MongoDB, puedes llamar a la watch() método para devolver una instancia de MongoDB\ChangeStream. Luego, puedes iterar a través de la instancia de MongoDB\ChangeStream para supervisar los cambios de datos, como actualizaciones, inserciones y eliminaciones.
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puedes usar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de base de datos, Atlas Stream Processing administra múltiples tipos de eventos de datos y ofrece capacidades avanzadas de procesamiento de datos. Para aprender más información sobre esta funcionalidad, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.
Datos de muestra
Los ejemplos de esta guía utilizan la colección restaurants en la base de datos sample_restaurants de los conjuntos de datos de muestra de Atlas. Para acceder a esta colección desde tu aplicación PHP, instancia un MongoDB\Client que se conecte a un clúster Atlas y asigna el siguiente valor a tu variable $collection:
$collection = $client->sample_restaurants->restaurants;
Tip
Para saber cómo crear una implementación gratuita de MongoDB y cargar los conjuntos de datos de ejemplo, consulta la guía MongoDB Primeros Pasos.
Algunos ejemplos utilizan la función toJSON() para representar eventos de cambio, que son documentos BSON, como Extended JSON. Para usar esta función, pegue el siguiente código en su archivo de aplicación:
function toJSON(object $document): string { return MongoDB\BSON\Document::fromPHP($document)->toRelaxedExtendedJSON(); }
Abre un flujo de cambios
Para abrir un flujo de cambios, llame al método watch(). La instancia en la que llama al método watch() determina el alcance de los eventos que supervisa el flujo de cambios. Puede llamar al método watch() en instancias de las siguientes clases:
MongoDB\Client: Supervisa todos los cambios en la implementación de MongoDBMongoDB\Database:Monitorear los cambios en todas las colecciones de la base de datosMongoDB\CollectionSupervise los cambios en la colección
El siguiente ejemplo abre un flujo de cambios en la colección restaurants y genera los cambios a medida que ocurren:
$changeStream = $collection->watch(); $changeStream->rewind(); while (true) { $changeStream->next(); if ($changeStream->valid()) { continue; } $event = $changeStream->current(); echo toJSON($event), PHP_EOL; if ($changeStream->current()['operationType'] === 'invalidate') { break; } }
Para comenzar a observar cambios, ejecuta el código anterior. Luego, en una shell aparte, modifica la colección restaurants. El siguiente ejemplo actualiza un documento que tiene un valor de campo name de 'Blarney Castle':
$result = $collection->updateOne( ['name' => 'Blarney Castle'], ['$set' => ['cuisine' => 'Irish']], );
Cuando actualizas la colección, la aplicación del flujo de cambios imprime el cambio a medida que ocurre. El evento de cambio impreso se asemeja al siguiente resultado:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : { "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Modificar la salida del flujo de cambios
Para modificar la salida del flujo de cambios, puede pasar etapas de pipeline en un arreglo como parámetro al método watch(). Puedes incluir las siguientes etapas en el arreglo:
$addFieldso$set: Agrega nuevos campos a los documentos$matchFiltra los documentos$project: Proyecta un subconjunto de los campos del documento$replaceWitho$replaceRoot: Reemplaza el documento de entrada por el documento especificado$redact:Restringe el contenido de los documentos$unsetRemueve campos de documentos
El siguiente ejemplo pasa una canalización que incluye la etapa $match al método watch(). Esto indica al método watch() que genere eventos solo cuando se realizan operaciones de actualización:
$pipeline = [['$match' => ['operationType' => 'update']]]; $changeStream = $collection->watch($pipeline); $changeStream->rewind(); while (true) { $changeStream->next(); if ($changeStream->valid()) { continue; } $event = $changeStream->current(); echo toJSON($event), PHP_EOL; if ($changeStream->current()['operationType'] === 'invalidate') { break; } }
Tip
Operaciones con Constructores
Puedes utilizar un patrón Builder para crear el flujo de cambios del pipeline. Para obtener más información, consulta el Guía deoperaciones con constructores.
Modificar el comportamiento de watch()
Para modificar el comportamiento del método watch(), puede pasar una matriz de opciones como parámetro a watch(). La siguiente tabla describe opciones útiles que puede configurar en la matriz:
Opción | Descripción |
|---|---|
| Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see the Include Pre-Images and Post-Images section of this
guide. |
| Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Instructs watch() to start a new change stream after the
operation specified in the resume token. This field allows notifications to
resume after an invalidate event.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.This option is mutually exclusive with resumeAfter and startAtOperationTime. |
| Instructs the change stream to only provide changes that occurred at or after
the specified timestamp. This option is mutually exclusive with startAfter and resumeAfter. |
| Sets the collation to use for the change stream cursor. To learn more,
see the Collation section of this page. |
Para obtener una lista completa de las watch() opciones, consulta MongoDB\Collection::watch() en la documentación de la API.
Incluir imágenes preoperativas y postoperatorias
Importante
Puede habilitar pre imágenes y post imágenes en colecciones solo si su implementación utiliza MongoDB v6.0 o posterior.
De forma predeterminada, al realizar una operación en una colección, el evento de cambio correspondiente incluye solo el delta de los campos modificados por dicha operación. Para ver el documento completo antes o después de un cambio, especifique las opciones fullDocumentBeforeChange o fullDocument en un parámetro de matriz como watch().
La preimagen es la versión completa de un documento antes de un cambio. Para incluir la preimagen en el evento de flujo de cambios, se debe establecer la opción fullDocumentBeforeChange en uno de los siguientes valores:
MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLEEl evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, este campo de evento de cambio tiene un valornull.MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED: El evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el servidor genera un error.
La post-imagen es la versión completa de un documento después de un cambio. Para incluir la post-imagen en el evento del flujo de cambios, configure la opción fullDocument en uno de los siguientes valores:
MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP: El evento de cambio incluye una copia de todo el documento cambiado a partir de cierto tiempo después del cambio.MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLEEl evento de cambio incluye una imagen posterior del documentos modificado para eventos de cambio. Si la imagen posterior no está disponible, este campo de evento de cambio tiene un valor denull.MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIREDEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, el servidor genera un error.
El siguiente ejemplo llama al método watch() en una colección e incluye la publicación de imagen de los documentos actualizados estableciendo la opción fullDocument:
$options = ['fullDocument' => MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]; $changeStream = $collection->watch([], $options); $changeStream->rewind(); while (true) { $changeStream->next(); if ($changeStream->valid()) { continue; } $event = $changeStream->current(); echo toJSON($event), PHP_EOL; if ($changeStream->current()['operationType'] === 'invalidate') { break; } }
Con la aplicación de flujo de cambios ejecutándose en un shell separado, al actualizar un documento en la colección restaurants usando el ejemplo de actualización anterior, se imprime un evento de cambio que se asemeja a la siguiente salida:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : { "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" : "202-24", "coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "cuisine" : "Irish", "grades" : [ ...], "name" : "Blarney Castle", "restaurant_id" : "40366356" }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Tip
Para obtener más información sobre pre-imágenes y post-imágenes, consulta Change Streams con preimágenes y postimágenes de documentos en el manual de MongoDB Server.
Intercalación
Para especificar una intercalación para tu operación, pasa un parámetro arreglo $options que establezca la opción collation en el método de la operación. Asigna la opción collation a un arreglo que configure las reglas de intercalación.
La siguiente tabla describe los campos que se pueden configurar para la intercalación:
Campo | Descripción |
|---|---|
| (Required) Specifies the International Components for Unicode (ICU) locale. For a
list of supported locales, see Collation Locales and Default Parameters
in the MongoDB Server manual. Data Type: string |
| (Optional) Specifies whether to include case comparison. When set to true, the comparison behavior depends on the value of
the strength field:- If strength is 1, the PHP library compares basecharacters and case. - If strength is 2, the PHP library compares basecharacters, diacritics, other secondary differences, and case. - If strength is any other value, this field is ignored.When set to false, the PHP library doesn't include case comparison at
strength level 1 or 2.Data Type: boolDefault: false |
| (Optional) Specifies the sort order of case differences during tertiary
level comparisons. Data Type: stringDefault: "off" |
| (Optional) Specifies the level of comparison to perform, as defined in the
ICU documentation. Data Type: intDefault: 3 |
| (Optional) Specifies whether the driver compares numeric strings as numbers. If set to true, the PHP library compares numeric strings as numbers.
For example, when comparing the strings "10" and "2", the library uses the
strings' numeric values and treats "10" as greater than "2".If set to false, the PHP library compares numeric strings
as strings. For example, when comparing the strings "10" and "2", the library
compares one character at a time and treats "10" as less than "2".For more information, see Collation Restrictions
in the MongoDB Server manual. Data Type: boolDefault: false |
| (Optional) Specifies whether the library considers whitespace and punctuation as base
characters for comparison purposes. Data Type: stringDefault: "non-ignorable" |
| (Optional) Specifies which characters the library considers ignorable when
the alternate field is set to "shifted".Data Type: stringDefault: "punct" |
| (Optional) Specifies whether strings containing diacritics sort from the back of the string
to the front. Data Type: boolDefault: false |
Para obtener más información sobre la intercalación y los posibles valores de cada campo, consulta la entrada Collation en el manual del servidor de MongoDB.
Información Adicional
Para saber más sobre las change streams, consulta Change Streams en el manual de MongoDB Server.
Documentación de la API
Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API: