Overview
En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en sus datos en tiempo real. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una colección, base de datos o implementación.
Al utilizar la biblioteca PHP de MongoDB, puede llamar a la watch() Método para devolver una instancia de MongoDB\ChangeStream. Luego, puede iterar a través de la instancia MongoDB\ChangeStream para monitorear cambios en los datos, como actualizaciones, inserciones y eliminaciones.
Tip
Atlas Stream Processing
Como alternativa a los flujos de cambios, puede utilizar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.
Datos de muestra
Los ejemplos de esta guía utilizan la restaurants colección sample_restaurants de la base de datos de los conjuntos de datos de ejemplo de Atlas. Para acceder a esta colección desde su aplicación PHP, cree una MongoDB\Client instancia de que se conecte a un clúster de Atlas y asigne el siguiente valor a su $collection variable:
$collection = $client->sample_restaurants->restaurants;
Tip
Para aprender cómo crear una implementación gratuita de MongoDB y cargar los conjuntos de datos de muestra, consulte la guía de introducción a MongoDB.
Algunos ejemplos utilizan la función toJSON() para representar eventos de cambio, que son documentos BSON, como JSON extendido. Para usar esta función, pegue el siguiente código en el archivo de su aplicación:
function toJSON(object $document): string { return MongoDB\BSON\Document::fromPHP($document)->toRelaxedExtendedJSON(); }
Abre un flujo de cambios
Para abrir un flujo de cambios, llame al método watch(). La instancia en la que llama al método watch() determina el alcance de los eventos que supervisa el flujo de cambios. Puede llamar al método watch() en instancias de las siguientes clases:
MongoDB\Client:Supervisar todos los cambios en la implementación de MongoDBMongoDB\Database:Monitorear los cambios en todas las colecciones de la base de datosMongoDB\Collection:Monitorear cambios en la colección
El siguiente ejemplo abre un flujo de cambios en la colección restaurants y genera los cambios a medida que ocurren:
$changeStream = $collection->watch(); $changeStream->rewind(); while (true) { $changeStream->next(); if ($changeStream->valid()) { continue; } $event = $changeStream->current(); echo toJSON($event), PHP_EOL; if ($changeStream->current()['operationType'] === 'invalidate') { break; } }
Para empezar a observar los cambios, ejecute el código anterior. Luego, en un shell independiente, modifique la colección restaurants. El siguiente ejemplo actualiza un documento cuyo valor de campo name es 'Blarney Castle':
$result = $collection->updateOne( ['name' => 'Blarney Castle'], ['$set' => ['cuisine' => 'Irish']], );
Al actualizar la colección, la aplicación de flujo de cambios imprime el cambio a medida que se produce. El evento de cambio impreso se parece al siguiente:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : { "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Modificar la salida del flujo de cambios
Para modificar la salida del flujo de cambios, puede pasar etapas de pipeline en un arreglo como parámetro al método watch(). Puedes incluir las siguientes etapas en el arreglo:
$addFieldso$set: Agrega nuevos campos a los documentos$match:Filtra los documentos$project: Proyecta un subconjunto de los campos del documento$replaceWitho$replaceRoot: reemplaza el documento de entrada con el documento especificado$redact:Restringe el contenido de los documentos$unset:Elimina campos de los documentos
El siguiente ejemplo pasa una canalización que incluye la etapa $match al método watch(). Esto indica al método watch() que genere eventos solo cuando se realizan operaciones de actualización:
$pipeline = [['$match' => ['operationType' => 'update']]]; $changeStream = $collection->watch($pipeline); $changeStream->rewind(); while (true) { $changeStream->next(); if ($changeStream->valid()) { continue; } $event = $changeStream->current(); echo toJSON($event), PHP_EOL; if ($changeStream->current()['operationType'] === 'invalidate') { break; } }
Tip
Operaciones con Constructores
Puedes usar un patrón de constructor para crear la canalización del flujo de cambios. Para obtener más información, consulta Guía deoperaciones con constructores.
Modificar watch() el comportamiento
Para modificar el comportamiento del método watch(), puede pasar una matriz de opciones como parámetro a watch(). La siguiente tabla describe opciones útiles que puede configurar en la matriz:
Opción | Descripción |
|---|---|
| Specifies whether to show the full document after the change, rather
than showing only the changes made to the document. To learn more about
this option, see the Include Pre-Images and Post-Images section of this
guide. |
| Specifies whether to show the full document as it was before the change, rather
than showing only the changes made to the document. To learn more about
this option, see Include Pre-Images and Post-Images. |
| Instructs watch() to start a new change stream after the
operation specified in the resume token. This field allows notifications to
resume after an invalidate event.Each change stream event document includes a resume token as the _id
field. Pass the entire _id field of the change event document that
represents the operation you want to resume after.This option is mutually exclusive with resumeAfter and startAtOperationTime. |
| Instructs the change stream to only provide changes that occurred at or after
the specified timestamp. This option is mutually exclusive with startAfter and resumeAfter. |
| Sets the collation to use for the change stream cursor. To learn more,
see the Collation section of this page. |
Para obtener una lista completa de las watch() opciones, consulte MongoDB\Collection::watch() en la documentación de la API.
Incluir imágenes previas y posteriores
Importante
Puede habilitar imágenes previas y posteriores en colecciones solo si su implementación usa MongoDB v6.0 o posterior.
De forma predeterminada, al realizar una operación en una colección, el evento de cambio correspondiente incluye solo el delta de los campos modificados por dicha operación. Para ver el documento completo antes o después de un cambio, especifique las opciones fullDocumentBeforeChange o fullDocument en un parámetro de matriz como watch().
La preimagen es la versión completa de un documento antes de un cambio. Para incluir la preimagen en el evento de flujo de cambios, configure la fullDocumentBeforeChange opción con uno de los siguientes valores:
MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLEEl evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, este campo de evento de cambio tiene un valornull.MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED: El evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el servidor genera un error.
La imagen posterior es la versión completa de un documento después de un cambio. Para incluirla en el evento de flujo de cambios, configure la fullDocument opción con uno de los siguientes valores:
MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP:El evento de cambio incluye una copia de todo el documento modificado desde algún tiempo después del cambio.MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLEEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, este campo de evento de cambio tiene un valornull.MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIREDEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, el servidor genera un error.
El siguiente ejemplo llama al método watch() en una colección e incluye la imagen posterior de los documentos actualizados configurando la opción fullDocument:
$options = ['fullDocument' => MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP]; $changeStream = $collection->watch([], $options); $changeStream->rewind(); while (true) { $changeStream->next(); if ($changeStream->valid()) { continue; } $event = $changeStream->current(); echo toJSON($event), PHP_EOL; if ($changeStream->current()['operationType'] === 'invalidate') { break; } }
Con la aplicación de flujo de cambios ejecutándose en un shell separado, actualizar un documento en la restaurants colección mediante el ejemplo de actualización anterior imprime un evento de cambio similar a la siguiente salida:
{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : { "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" : "202-24", "coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "cuisine" : "Irish", "grades" : [ ...], "name" : "Blarney Castle", "restaurant_id" : "40366356" }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ], "truncatedArrays" : [ ] } }
Tip
Para obtener más información sobre imágenes previas y posteriores, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual de MongoDB Server.
Intercalación
Para especificar una intercalación para su operación, pase un parámetro de matriz $options que establezca la opción collation en el método de la operación. Asigne la opción collation a una matriz que configure las reglas de intercalación.
La siguiente tabla describe los campos que se pueden configurar para la intercalación:
Campo | Descripción |
|---|---|
| (Required) Specifies the International Components for Unicode (ICU) locale. For a
list of supported locales, see Collation Locales and Default Parameters
in the MongoDB Server manual. Data Type: string |
| (Optional) Specifies whether to include case comparison. When set to true, the comparison behavior depends on the value of
the strength field:- If strength is 1, the PHP library compares basecharacters and case. - If strength is 2, the PHP library compares basecharacters, diacritics, other secondary differences, and case. - If strength is any other value, this field is ignored.When set to false, the PHP library doesn't include case comparison at
strength level 1 or 2.Data Type: boolDefault: false |
| (Optional) Specifies the sort order of case differences during tertiary
level comparisons. Data Type: stringDefault: "off" |
| (Optional) Specifies the level of comparison to perform, as defined in the
ICU documentation. Data Type: intDefault: 3 |
| (Optional) Specifies whether the driver compares numeric strings as numbers. If set to true, the PHP library compares numeric strings as numbers.
For example, when comparing the strings "10" and "2", the library uses the
strings' numeric values and treats "10" as greater than "2".If set to false, the PHP library compares numeric strings
as strings. For example, when comparing the strings "10" and "2", the library
compares one character at a time and treats "10" as less than "2".For more information, see Collation Restrictions
in the MongoDB Server manual. Data Type: boolDefault: false |
| (Optional) Specifies whether the library considers whitespace and punctuation as base
characters for comparison purposes. Data Type: stringDefault: "non-ignorable" |
| (Optional) Specifies which characters the library considers ignorable when
the alternate field is set to "shifted".Data Type: stringDefault: "punct" |
| (Optional) Specifies whether strings containing diacritics sort from the back of the string
to the front. Data Type: boolDefault: false |
Para obtener más información sobre la intercalación y los posibles valores para cada campo, consulte la entrada Intercalación en el manual de MongoDB Server.
Información Adicional
Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB Server.
Documentación de la API
Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API: