Docs Menu
Docs Home
/ /

Monitorear datos con flujos de cambios

En esta guía, aprenderá a usar un flujo de cambios para supervisar los cambios en sus datos en tiempo real. Un flujo de cambios es una función de MongoDB Server que permite que su aplicación se suscriba a los cambios en los datos de una colección, base de datos o implementación.

Al utilizar la biblioteca PHP de MongoDB, puede llamar a la watch() Método para devolver una instancia de MongoDB\ChangeStream. Luego, puede iterar a través de la instancia MongoDB\ChangeStream para monitorear cambios en los datos, como actualizaciones, inserciones y eliminaciones.

Tip

Atlas Stream Processing

Como alternativa a los flujos de cambios, puede utilizar Atlas Stream Processing para procesar y transformar flujos de datos. A diferencia de los flujos de cambios, que solo registran eventos de la base de datos, Atlas Stream Processing gestiona múltiples tipos de eventos de datos y ofrece capacidades de procesamiento de datos ampliadas. Para obtener más información sobre esta función, consulte Procesamiento de flujo de Atlas en la documentación de MongoDB Atlas.

Los ejemplos de esta guía utilizan la restaurants colección sample_restaurants de la base de datos de los conjuntos de datos de ejemplo de Atlas. Para acceder a esta colección desde su aplicación PHP, cree una MongoDB\Client instancia de que se conecte a un clúster de Atlas y asigne el siguiente valor a su $collection variable:

$collection = $client->sample_restaurants->restaurants;

Tip

Para aprender cómo crear una implementación gratuita de MongoDB y cargar los conjuntos de datos de muestra, consulte la guía de introducción a MongoDB.

Algunos ejemplos utilizan la función toJSON() para representar eventos de cambio, que son documentos BSON, como JSON extendido. Para usar esta función, pegue el siguiente código en el archivo de su aplicación:

function toJSON(object $document): string
{
return MongoDB\BSON\Document::fromPHP($document)->toRelaxedExtendedJSON();
}

Para abrir un flujo de cambios, llame al método watch(). La instancia en la que llama al método watch() determina el alcance de los eventos que supervisa el flujo de cambios. Puede llamar al método watch() en instancias de las siguientes clases:

  • MongoDB\Client:Supervisar todos los cambios en la implementación de MongoDB

  • MongoDB\Database:Monitorear los cambios en todas las colecciones de la base de datos

  • MongoDB\Collection:Monitorear cambios en la colección

El siguiente ejemplo abre un flujo de cambios en la colección restaurants y genera los cambios a medida que ocurren:

$changeStream = $collection->watch();
$changeStream->rewind();
while (true) {
$changeStream->next();
if ($changeStream->valid()) {
continue;
}
$event = $changeStream->current();
echo toJSON($event), PHP_EOL;
if ($changeStream->current()['operationType'] === 'invalidate') {
break;
}
}

Para empezar a observar los cambios, ejecute el código anterior. Luego, en un shell independiente, modifique la colección restaurants. El siguiente ejemplo actualiza un documento cuyo valor de campo name es 'Blarney Castle':

$result = $collection->updateOne(
['name' => 'Blarney Castle'],
['$set' => ['cuisine' => 'Irish']],
);

Al actualizar la colección, la aplicación de flujo de cambios imprime el cambio a medida que se produce. El evento de cambio impreso se parece al siguiente:

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
{ "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." }, "ns" : { "db" :
"sample_restaurants", "coll" : "restaurants" }, "documentKey" : { "_id" :
{ "$oid" : "..." } }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [ ], "truncatedArrays" : [ ] } }

Para modificar la salida del flujo de cambios, puede pasar etapas de pipeline en un arreglo como parámetro al método watch(). Puedes incluir las siguientes etapas en el arreglo:

  • $addFields o $set: Agrega nuevos campos a los documentos

  • $match:Filtra los documentos

  • $project: Proyecta un subconjunto de los campos del documento

  • $replaceWith o $replaceRoot: reemplaza el documento de entrada con el documento especificado

  • $redact:Restringe el contenido de los documentos

  • $unset:Elimina campos de los documentos

El siguiente ejemplo pasa una canalización que incluye la etapa $match al método watch(). Esto indica al método watch() que genere eventos solo cuando se realizan operaciones de actualización:

$pipeline = [['$match' => ['operationType' => 'update']]];
$changeStream = $collection->watch($pipeline);
$changeStream->rewind();
while (true) {
$changeStream->next();
if ($changeStream->valid()) {
continue;
}
$event = $changeStream->current();
echo toJSON($event), PHP_EOL;
if ($changeStream->current()['operationType'] === 'invalidate') {
break;
}
}

Tip

Operaciones con Constructores

Puedes usar un patrón de constructor para crear la canalización del flujo de cambios. Para obtener más información, consulta Guía deoperaciones con constructores.

Para modificar el comportamiento del método watch(), puede pasar una matriz de opciones como parámetro a watch(). La siguiente tabla describe opciones útiles que puede configurar en la matriz:

Opción
Descripción

fullDocument

Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see the Include Pre-Images and Post-Images section of this guide.

fullDocumentBeforeChange

Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images.

startAfter

Instructs watch() to start a new change stream after the operation specified in the resume token. This field allows notifications to resume after an invalidate event.
Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after.
This option is mutually exclusive with resumeAfter and startAtOperationTime.

startAtOperationTime

Instructs the change stream to only provide changes that occurred at or after the specified timestamp.
This option is mutually exclusive with startAfter and resumeAfter.

collation

Sets the collation to use for the change stream cursor. To learn more, see the Collation section of this page.

Para obtener una lista completa de las watch() opciones, consulte MongoDB\Collection::watch() en la documentación de la API.

Importante

Puede habilitar imágenes previas y posteriores en colecciones solo si su implementación usa MongoDB v6.0 o posterior.

De forma predeterminada, al realizar una operación en una colección, el evento de cambio correspondiente incluye solo el delta de los campos modificados por dicha operación. Para ver el documento completo antes o después de un cambio, especifique las opciones fullDocumentBeforeChange o fullDocument en un parámetro de matriz como watch().

La preimagen es la versión completa de un documento antes de un cambio. Para incluir la preimagen en el evento de flujo de cambios, configure la fullDocumentBeforeChange opción con uno de los siguientes valores:

  • MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLEEl evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, este campo de evento de cambio tiene un valor null.

  • MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED: El evento de cambio incluye una preimagen del documento modificado para eventos de cambio. Si la preimagen no está disponible, el servidor genera un error.

La imagen posterior es la versión completa de un documento después de un cambio. Para incluirla en el evento de flujo de cambios, configure la fullDocument opción con uno de los siguientes valores:

  • MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP:El evento de cambio incluye una copia de todo el documento modificado desde algún tiempo después del cambio.

  • MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLEEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, este campo de evento de cambio tiene un valor null.

  • MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIREDEl evento de cambio incluye una imagen posterior del documento modificado. Si la imagen posterior no está disponible, el servidor genera un error.

El siguiente ejemplo llama al método watch() en una colección e incluye la imagen posterior de los documentos actualizados configurando la opción fullDocument:

$options = ['fullDocument' => MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP];
$changeStream = $collection->watch([], $options);
$changeStream->rewind();
while (true) {
$changeStream->next();
if ($changeStream->valid()) {
continue;
}
$event = $changeStream->current();
echo toJSON($event), PHP_EOL;
if ($changeStream->current()['operationType'] === 'invalidate') {
break;
}
}

Con la aplicación de flujo de cambios ejecutándose en un shell separado, actualizar un documento en la restaurants colección mediante el ejemplo de actualización anterior imprime un evento de cambio similar a la siguiente salida:

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" :
{ "$timestamp" : { ... } }, "wallTime" : { "$date" : "..." },
"fullDocument" : { "_id" : { "$oid" : "..." }, "address" : { "building" :
"202-24", "coord" : [ -73.925044200000002093, 40.559546199999999772 ], "street"
: "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens",
"cuisine" : "Irish", "grades" : [ ...], "name" : "Blarney Castle", "restaurant_id" :
"40366356" }, "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : { "$oid" : "..." } }, "updateDescription" :
{ "updatedFields" : { "cuisine" : "Irish" }, "removedFields" : [ ],
"truncatedArrays" : [ ] } }

Tip

Para obtener más información sobre imágenes previas y posteriores, consulte Flujos de cambio con imágenes previas y posteriores de documentos en el manual de MongoDB Server.

Para especificar una intercalación para su operación, pase un parámetro de matriz $options que establezca la opción collation en el método de la operación. Asigne la opción collation a una matriz que configure las reglas de intercalación.

La siguiente tabla describe los campos que se pueden configurar para la intercalación:

Campo
Descripción

locale

(Required) Specifies the International Components for Unicode (ICU) locale. For a list of supported locales, see Collation Locales and Default Parameters in the MongoDB Server manual.

Data Type: string

caseLevel

(Optional) Specifies whether to include case comparison.

When set to true, the comparison behavior depends on the value of the strength field:

- If strength is 1, the PHP library compares base
characters and case.

- If strength is 2, the PHP library compares base
characters, diacritics, other secondary differences, and case.

- If strength is any other value, this field is ignored.

When set to false, the PHP library doesn't include case comparison at strength level 1 or 2.

Data Type: bool
Default: false

caseFirst

(Optional) Specifies the sort order of case differences during tertiary level comparisons.

Data Type: string
Default: "off"

strength


Data Type: int
Default: 3

numericOrdering

(Optional) Specifies whether the driver compares numeric strings as numbers.

If set to true, the PHP library compares numeric strings as numbers. For example, when comparing the strings "10" and "2", the library uses the strings' numeric values and treats "10" as greater than "2".

If set to false, the PHP library compares numeric strings as strings. For example, when comparing the strings "10" and "2", the library compares one character at a time and treats "10" as less than "2".

For more information, see Collation Restrictions in the MongoDB Server manual.

Data Type: bool
Default: false

alternate

(Optional) Specifies whether the library considers whitespace and punctuation as base characters for comparison purposes.

Data Type: string
Default: "non-ignorable"

maxVariable

(Optional) Specifies which characters the library considers ignorable when the alternate field is set to "shifted".

Data Type: string
Default: "punct"

backwards

(Optional) Specifies whether strings containing diacritics sort from the back of the string to the front.

Data Type: bool
Default: false

Para obtener más información sobre la intercalación y los posibles valores para cada campo, consulte la entrada Intercalación en el manual de MongoDB Server.

Para obtener más información sobre los flujos de cambio, consulte Flujos de cambio en el manual de MongoDB Server.

Para aprender más sobre cualquiera de los métodos o tipos analizados en esta guía, consulta la siguiente documentación de API:

Volver

Monitoring

En esta página