Definición
La $source La etapa especifica una conexión en el
Registro de Conexión para transmitir datos de. Los siguientes tipos de conexión son compatibles:
Apache Kafka corredor
flujo de cambios de colección de MongoDB
Flujo de cambios de la base de datos MongoDB
Flujo de cambios del clúster de MongoDB
AWS Kinesis flujo de datos
Matriz de documentos
Sintaxis
Apache Kafka Broker
Para operar con datos de transmisión desde un agente Apache Kafka, la $source etapa tiene la siguiente forma de prototipo:
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "schemaRegistry": { "connectionName": "<schema-registry-name>", }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
La etapa $source procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción | |
|---|---|---|---|---|
| string | Requerido | Etiqueta que identifica la conexión en el Registro de Conexión, desde donde se ingerirán los datos. | |
| string o arreglo de strings | Requerido | Nombre de uno o más temas de Apache Kafka de los que transmitir mensajes. Si desea transmitir mensajes de más de un tema, especifíquelos en un arreglo. | |
| Documento | Opcional | Documento que define una marca de tiempo autorizada para los mensajes entrantes. Si utilizas
Si no declaras un | |
| Documento | Opcional | Documento especificando la cantidad de tiempo que se permite que una partición esté inactiva antes de que se ignore en los cálculos de marcas de agua. Este campo está desactivado por defecto. Para gestionar las particiones que no avanzan debido a la inactividad, establece un valor para este campo. | |
| entero | Opcional | Número que especifica la duración del tiempo de espera inactivo de la partición. | |
| string | Opcional | Unidad de tiempo para la duración del tiempo de espera por inactividad de la partición. El valor de
| |
| Documento | Opcional | Documento que habilita el uso de un Registro de esquemas para apoyar la lectura desde una fuente serializada en Avro. Para habilitar esta función, debe crear una conexión de Registro de esquema. | |
| string | Condicional | Nombre de la conexión del Registro de esquemas que se utilizará para la deserialización de Avro. | |
| Documento | Opcional | Documento que contiene campos que sobrescriben diversos valores por defecto. | |
| string | Opcional | Especifica con qué evento en el tema de origen de Apache Kafka comenzar la ingestión.
Se establece por defecto en | |
| string | Opcional | ID del grupo de consumidores de Kafka que se asociará con el procesador de flujo. Si se omite, Atlas Stream Processing asocia el espacio de trabajo de procesamiento de flujo con un ID generado automáticamente con el siguiente formato: Atlas Stream Processing genera automáticamente un valor para este parámetro en todos los procesadores de flujos persistentes. Para los procesadores de flujo efímeros definidos con sp.process(), este parámetro solo se configura si lo define manualmente. | |
| booleano | Condicional | Indicador que determina la política de confirmaciones para los desfases de partición de los intermediarios de Kafka. Atlas Stream Processing admite dos políticas de confirmación:
Solo puedes establecer este parámetro si Para un procesador de flujos efímero definido con sp.process(), este parámetro se asigna por defecto a Para obtener más información sobre los desplazamientos al usar Kafka | |
| string | Opcional | Tipo de dato utilizado para deserializar los datos de clave de Apache Kafka. Debe ser uno de los siguientes valores:
Se establece por defecto en | |
| string | Opcional | Cómo gestionar los errores encontrados al deserializar los datos clave de Apache Kafka. Debe ser uno de los siguientes valores:
|
Nota
Atlas Stream Processing requiere que los documentos en el flujo de datos de origen sean válidos json o ejson. Atlas Stream Processing establece que los documentos que no cumplen este requisito se asignen a tu fila de letra muerta si has configurado una.
MongoDB colección Change Stream
Un flujo de cambios de la colección de Atlas permite que las aplicaciones accedan a los cambios de datos en tiempo real en una sola colección. Para aprender a abrir un flujo de cambios en una colección, consulta Change Streams.
Al utilizar un flujo $source de cambios, recomendamos configurar el clúster de origen con una ventana de registro de 24 operaciones de al menos horas.
Para operar con datos de transmisión desde un flujo de cambios de una colección Atlas, la etapa $source tiene la siguiente forma de prototipo:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "coll" : ["<source-coll>",...], "initialSync": { "enable": <boolean>, "parallelism": <integer> }, "readPreference": "<read-preference>", "readPreferenceTags": [ {"<key>": "<value>"}, . . . ] "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }], "maxAwaitTimeMS": <time-ms>, } } }
La etapa $source procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Condicional | Etiqueta que identifica la conexión en el Registro de Conexión, desde donde se ingerirán los datos. |
| Documento | Opcional | Documento que define una marca de tiempo autorizada para los mensajes entrantes. Si utilizas
Si no declaras un |
| string | Requerido | Nombre de la base de datos MongoDB alojada en la instancia de Atlas especificada por |
| string o arreglo de strings | Requerido | Nombre de una o más colecciones de MongoDB alojadas en la instancia de Atlas especificada por |
| Documento | Opcional | Documento que contiene campos relacionados con la funcionalidad de Atlas Stream Processing Si habilitas IMPORTANTE: Solo puedes utilizar |
| booleano | Condicional | Determina si se activa |
| entero | Opcional | Determina el nivel de paralelismo con el que se procesará la operación Cada procesador de flujo tiene un valor máximo de paralelismo acumulado determinado por su nivel. El paralelismo acumulativo de un procesador de flujo se calcula de la siguiente manera:
Donde Por ejemplo, si tu etapa Si un procesador de flujo supera el paralelismo acumulado máximo para su nivel, Atlas Stream Processing genera un error y le informa del nivel mínimo de procesador necesario para el nivel de paralelismo deseado. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para obtener más información, consulte Procesamiento de flujo. |
| Documento | Opcional | Preferencia de lectura para Se establece por defecto en |
| Documento | Opcional | Lea las etiquetas de preferencia de lectura para |
| Documento | Opcional | Documento que contiene campos que sobrescriben diversos valores por defecto. |
| token | Condicional | El evento de cambio después del cual la fuente comienza a reporte. Esto toma la forma de un token de currículum. Puedes utilizar solo uno de |
| timestamp | fecha | Condicional | El operation time después del cual la fuente debe comenzar a reportar. Puedes utilizar solo uno de Acepta valores MongoDB Extended JSON |
| string | Condicional | Configuración que controla si una fuente de flujo de cambios debe devolver un documento completo o sólo las modificaciones cuando se produce una actualización. Debe ser uno de los siguientes:
Para utilizar este campo con una colección de flujo de cambios, debe habilitar la Preimágenes y Postimágenes de flujo de cambios en esa colección. |
| booleano | Condicional | Configuración que controla si una fuente de flujo de cambios devuelve el documento completo del evento de cambio, incluidos todos los metadatos, o solo el contenido de Para utilizar este campo con una colección de flujo de cambios, debe habilitar la Preimágenes y Postimágenes de flujo de cambios en esa colección. |
| string | Opcional | Especifica si una fuente de flujo de cambios debe incluir el documento completo en su estado original "antes de los cambios" en la salida. Debe ser uno de los siguientes:
Si no se especifica un valor para Para utilizar este campo con una colección de flujo de cambios, debe habilitar la Preimágenes y Postimágenes de flujo de cambios en esa colección. |
| Documento | Opcional | Especifica un pipeline de agregación para filtrar la salida del flujo de cambios antes de pasarla para procesamiento adicional. Este pipeline debe respetar los parámetros que se describen en Modifica la salida del change stream. IMPORTANTE: Cada evento de cambio incluye los campos |
| entero | Opcional | Tiempo máximo, en milisegundos, para esperar a que se reporten nuevos cambios de datos al cursor del flujo de cambios antes de devolver un lote vacío. Se establece por defecto en |
Flujo de cambios del Base de datos MongoDB
Un flujo de cambios de base de datos de Atlas permite que las aplicaciones accedan a cambios de datos en tiempo real en una única base de datos. Para aprender a abrir un flujo de cambios en una base de datos, consulta Change Streams.
Al utilizar un flujo $source de cambios, recomendamos configurar el clúster de origen con una ventana de registro de 24 operaciones de al menos horas.
Para operar sobre datos en transmisión de un flujo de cambios de la base de datos Atlas, la etapa $source tiene la siguiente forma de prototipo:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
La etapa $source procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Condicional | Etiqueta que identifica la conexión en el Registro de Conexión, desde donde se ingerirán los datos. |
| Documento | Opcional | Documento que define una marca de tiempo autorizada para los mensajes entrantes. Si utilizas
Si no declaras un |
| string | Requerido | Nombre de la base de datos MongoDB alojada en la instancia de Atlas especificada por |
| Documento | Opcional | Documento que contiene campos que sobrescriben diversos valores por defecto. |
| token | Condicional | El evento de cambio después del cual la fuente comienza a reporte. Esto toma la forma de un token de currículum. Puedes utilizar solo uno de |
| fecha y hora | Condicional | El operation time después del cual la fuente debe comenzar a reportar. Puedes utilizar solo uno de Acepta valores MongoDB Extended JSON |
| string | Condicional | Configuración que controla si una fuente de flujo de cambios debe devolver un documento completo o sólo las modificaciones cuando se produce una actualización. Debe ser uno de los siguientes:
Si no especifica un valor para fullDocument, el valor predeterminado será Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos. |
| booleano | Condicional | Configuración que controla si una fuente de flujo de cambios devuelve el documento completo del evento de cambio, incluidos todos los metadatos, o solo el contenido de Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos. |
| string | Opcional | Especifica si una fuente de flujo de cambios debe incluir el documento completo en su estado original "antes de los cambios" en la salida. Debe ser uno de los siguientes:
Si no se especifica un valor para Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos. |
| Documento | Opcional | Especifica una canalización de agregación para filtrar la salida del flujo de cambios en el punto de origen. Esta canalización debe cumplir los parámetros descritos en Modificar la salida del flujo de cambios. IMPORTANTE: Cada evento de cambio incluye los campos |
| entero | Opcional | Tiempo máximo, en milisegundos, para esperar a que se reporten nuevos cambios de datos al cursor del flujo de cambios antes de devolver un lote vacío. Se establece por defecto en |
Fuente de flujo de cambios en clúster de MongoDB
Al utilizar un flujo $source de cambios, recomendamos configurar el clúster de origen con una ventana de registro de 24 operaciones de al menos horas.
Para operar con datos de transmisión de un flujo de cambios de un clúster Atlas completo, la etapa $source tiene la siguiente forma de prototipo:
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
La etapa $source procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Condicional | Etiqueta que identifica la conexión en el Registro de Conexión, desde donde se ingerirán los datos. |
| Documento | Opcional | Documento que define una marca de tiempo autorizada para los mensajes entrantes. Si utilizas
Si no declaras un |
| Documento | Opcional | Documento que contiene campos que sobrescriben diversos valores por defecto. |
| token | Condicional | El evento de cambio después del cual la fuente comienza a reporte. Esto toma la forma de un token de currículum. Puedes utilizar solo uno de |
| fecha | marca temporal | Condicional | El operation time después del cual la fuente debe comenzar a reportar. Puedes utilizar solo uno de Acepta valores MongoDB Extended JSON |
| string | Condicional | Configuración que controla si una fuente de flujo de cambios debe devolver un documento completo o sólo las modificaciones cuando se produce una actualización. Debe ser uno de los siguientes:
Si no especifica un valor para fullDocument, el valor predeterminado será Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos. |
| booleano | Condicional | Configuración que controla si una fuente de flujo de cambios devuelve el documento completo del evento de cambio, incluidos todos los metadatos, o solo el contenido de Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos. |
| string | Opcional | Especifica si una fuente de flujo de cambios debe incluir el documento completo en su estado original "antes de los cambios" en la salida. Debe ser uno de los siguientes:
Si no se especifica un valor para Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos. |
| Documento | Opcional | Especifica una canalización de agregación para filtrar la salida del flujo de cambios en el punto de origen. Esta canalización debe cumplir los parámetros descritos en Modificar la salida del flujo de cambios. Ten en cuenta que el Atlas Stream Processing espera recibir los campos |
| entero | Opcional | Tiempo máximo, en milisegundos, para esperar a que se reporten nuevos cambios de datos al cursor del flujo de cambios antes de devolver un lote vacío. Se establece por defecto en |
AWS Kinesis Data Stream
Atlas Stream Processing admite la creación de conexiones Private Link con flujos AWS Kinesis. Para obtener más información, consulte Agregar una conexión privada de Kinesis.
Para operar con datos de un flujo de datos de AWS Kinesis, la etapa $source tiene la siguiente forma de prototipo:
{ "$source": { "connectionName": "<registered-connection>", "stream": "<stream-name>", "region": "<aws-region>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<field-name>", "shardIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "consumerARN": "<aws-arn>", "initialPosition": <initial-position>, reshardDetectionIntervalSecs: <interval> } } }
La etapa $source procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Requerido | Etiqueta que identifica la conexión en el Registro de Conexiones, de la cual se obtiene la información. |
| string | Requerido | ARN correspondiente a un consumidor de Kinesis. Su consumidor debe usar Fan-out mejorado. |
| string | Requerido | AWS Kinesis flujo de datos desde el cual transmitir mensajes. |
| string | Condicional | Región deAWS donde se encuentra el flujo especificado. Kinesis admite varios flujos de datos con el mismo nombre en diferentes regiones. Si usa el mismo nombre para flujos de datos en dos o más regiones dentro de la misma conexión, debe usar este campo para especificar la combinación de nombre y región que se usará. |
| Documento | Opcional | Documento que define una marca de tiempo autorizada para los mensajes entrantes. Si utilizas
Si no declaras un |
| Documento | Opcional | Documento que especifica la cantidad de tiempo que un fragmento puede permanecer inactivo antes de que se ignore en los cálculos de marca de agua. Este campo está deshabilitado de forma predeterminada. Para gestionar los fragmentos que no avanzan debido a la inactividad, configure un valor para este campo. |
| Documento | Opcional | Número que especifica la duración del tiempo de espera de inactividad de la partición. |
| Documento | Opcional | Unidad de tiempo para la duración del tiempo de espera inactivo de la partición. El valor de
|
| Documento | Opcional | Documento que contiene campos que sobrescriben diversos valores por defecto. |
| string | Opcional | Posición en el historial del flujo de datos de Kinesis desde la que se iniciará la ingesta de mensajes. Debe ser una de las siguientes:
Se establece por defecto en |
| entero | Opcional | Intervalo, en segundos, entre chequeos de la velocidad del flujo de datos a través de tu flujo de Kinesis para fines de rebalanceo. El valor predeterminado es |
arreglo de documentos
Para operar sobre un arreglo de documentos, la etapa $source tiene el siguiente formato prototipo:
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "documents" : [{source-doc},...] | <expression> } }
La etapa $source procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| Documento | Opcional | Documento que define una marca de tiempo autorizada para los mensajes entrantes. Si utilizas
Si no declaras un |
| arreglo | Condicional | Arreglo de documentos para usar como fuente de datos en streaming. El valor de este campo puede ser un arreglo de objetos o una expresión que se evalúe como un arreglo de objetos. No utilice este campo cuando use el campo |
Comportamiento
$source debe ser la primera etapa de cualquier pipeline en el que aparezca. Solo puedes usar una $source etapa por pipeline.
Para etapas de Kafka $source, Atlas Stream Processing lee en paralelo desde múltiples particiones dentro del tema de origen. El límite de partición lo determina tu nivel de procesador. Para obtener detalles adicionales, revise la referencia de facturación de Stream Processing.
Ejemplos
Ejemplo de Kafka
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:
La etapa establece una
$sourceconexión con el agente de Apache Kafka que recopila estos informes en un temamy_weatherdatallamado, exponiendo cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndoloingestionTimeen.La etapa
$matchexcluye los documentos que tienen undewPoint.valuemenor o igual a5.0y pasa los documentos condewPoint.valuemayor que5.0a la siguiente etapa.La etapa escribe la salida en una colección de Atlas
$mergellamadastreamen lasample_weatherstreambase de datos. Si no existe dicha base de datos o colección, Atlas las crea.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.
Ejemplo de flujo de cambios
La siguiente agregación ingiere datos de la cluster0-collection fuente, que se conecta a un clúster de Atlas cargado con el conjunto de datos de muestra. Para aprender a crear un espacio de trabajo de procesamiento de flujos y agregar una conexión a un clúster de Atlas al registro de conexiones, consulte Introducción al procesamiento de flujos de Atlas. Esta agregación ejecuta dos etapas para abrir un flujo de cambios y registrar los cambios en la data colección en la sample_weatherdata base de datos:
La etapa
$sourcese conecta a la fuentecluster0-collectiony abre un flujo de cambios en la coleccióndatade la base de datossample_weatherdata.La etapa
$mergeescribe los documentos de flujo de cambios filtrados en una colección de Atlas denominadadata_changesen la base de datossample_weatherdata. Si no existe tal colección, Atlas la crea.
{ $source: { connectionName: "cluster0-connection", db : "sample_weatherdata", coll : "data" }, $merge: { into: { connectionName: "cluster0-connection", db: "sample_weatherdata", coll: "data_changes" } } }
El siguiente comando mongosh borra un documento data:
db.getSiblingDB("sample_weatherdata").data.deleteOne( { _id: ObjectId("5553a99ae4b02cf715120e4b") } )
Tras eliminar el data documento, el procesador de flujo escribe el documento de evento de flujo sample_weatherdata.data_changes de cambios en la colección. Para ver los documentos de la sample_weatherdata.data_changes colección resultante, utilice para conectarse a su clúster de Atlas y ejecute el siguiente mongosh comando:
db.getSiblingDB("sample_weatherdata").data_changes.find()
[ { _id: { _data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004' }, clusterTime: Timestamp({ t: 1738790819, i: 1 }), documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') }, ns: { db: 'sample_weatherdata', coll: 'data' }, operationType: 'delete', wallTime: ISODate('2025-02-05T21:26:59.313Z') } ]