Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

$source Etapa (Stream Processing)

$source

La etapa especifica una $source conexión en el Registro de conexiones para transmitir datos. Se admiten los siguientes tipos de conexión:

  • Apache Kafka broker

  • flujo de cambios de colección de MongoDB

  • Flujo de cambios de la base de datos MongoDB

  • Flujo de cambios del clúster de MongoDB

  • AWS Kinesis flujo de datos

  • Arreglo de documentos

Para operar con datos en transmisión de un broker de Apache Kafka, la etapa $source tiene la siguiente forma de prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"schemaRegistry": {
"connectionName": "<schema-registry-name>",
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Etiqueta que identifica la conexión en el Registro de conexiones, para ingerir datos de esta.

topic

string o arreglo de strings

Requerido

Nombre de uno o más temas de Apache Kafka de los que transmitir mensajes. Si desea transmitir mensajes de más de un tema, especifíquelos en un arreglo.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utilizas timeField, debes definirlo como uno de los siguientes:

  • un :expresión:$toDate que toma un campo de mensaje de origen como argumento.

  • un :expresión:$dateFromString que toma un campo de mensaje de origen como argumento.

Si no declaras un timeField, Atlas Stream Processing creará una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

partitionIdleTimeout

Documento

Opcional

Documento especificando la cantidad de tiempo que se permite que una partición esté inactiva antes de que se ignore en los cálculos de marcas de agua.

Este campo está desactivado por defecto. Para gestionar las particiones que no avanzan debido a la inactividad, establece un valor para este campo.

partitionIdleTimeout.size

entero

Opcional

Número que especifica la duración del tiempo de espera de inactividad de la partición.

partitionIdleTimeout.unit

string

Opcional

Unidad de tiempo para la duración del tiempo de espera por inactividad de la partición.

El valor de unit puede ser uno de los siguientes:

  • "ms" (milisegundo)

  • "second"

  • "minute"

  • "hour"

  • "day"

schemaRegistry

Documento

Opcional

Documento que habilita el uso de un Registro de esquemas para apoyar la lectura desde una fuente serializada en Avro.

Para habilitar esta funcionalidad, debe crear una conexión con el Registro de esquemas.

schemaRegistry.connectionName

string

Condicional

Nombre de la conexión del Registro de esquemas que se utilizará para la deserialización de Avro.

config

Documento

Opcional

Documento que contiene campos que sobrescriben diversos valores por defecto.

config.auto_offset_reset

string

Opcional

Especifica con qué evento del tema fuente Apache Kafka se iniciará la ingestión. auto_offset_reset toma los siguientes valores:

  • end, latest, o largest : para comenzar la ingesta desde el evento más reciente en el tema en el momento en que se inicializa la agregación.

  • earliest, beginning, o smallest : para comenzar la ingestión desde el evento más temprano en el tema.

Se establece por defecto en latest.

config.group_id

string

Opcional

ID del grupo de consumidores de Kafka para asociar con el procesador de flujo. Si se omite, Atlas Stream Processing asocia el espacio de trabajo de Stream Processing con una ID generada automáticamente en el siguiente formato:

asp-${streamProcessorId}-consumer

Atlas Stream Processing genera automáticamente un valor para este parámetro en todos los procesadores de flujos persistentes. Para los procesadores de flujo efímeros definidos con sp.process(), este parámetro solo se configura si lo define manualmente.

config.enable_auto_commit

booleano

Condicional

Indicador que determina la política de confirmaciones para los desfases de partición de los intermediarios de Kafka. Atlas Stream Processing admite dos políticas de confirmación:

  • Si configuras este parámetro en true, Atlas Stream Processing confirma los desplazamientos cada vez que la etapa $source pasa datos al siguiente operador.

  • Si estableces este parámetro en false, los procesadores de flujo confirman los desplazamientos de partición cuando Atlas Stream Processing realiza un punto de control.

Solo puedes establecer este parámetro si config.group_id está configurado.

Para un procesador de flujos efímero definido con sp.process(), este parámetro se asigna por defecto a false a menos que establezca group_id. De lo contrario, el valor por defecto es true.

Para obtener más información sobre los offsets cuando se utiliza Kafka como $source, consulte Fuentes de Kafka y offsets del grupo de consumidores.

config.keyFormat

string

Opcional

Tipo de dato usado para deserializar los datos de claves de Apache Kafka. Debe ser uno de los siguientes valores:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Se establece por defecto en binData.

config.keyFormatError

string

Opcional

Cómo gestionar los errores encontrados al deserializar los datos clave de Apache Kafka. Debe ser uno de los siguientes valores:

  • dlq, que guarda el documento en tu fila de letra muerta.

  • passThrough, que envía el documento a la siguiente etapa sin datos clave.

Nota

Atlas Stream Processing requiere que los documentos en el flujo de datos de origen sean válidos json o ejson. Atlas Stream Processing establece que los documentos que no cumplen este requisito se asignen a tu fila de letra muerta si has configurado una.

Un flujo de cambios de la colección de Atlas permite que las aplicaciones accedan a los cambios de datos en tiempo real en una sola colección. Para aprender a abrir un flujo de cambios en una colección, consulta Change Streams.

Al usar un flujo de cambios $source, configura el clúster de origen con una oplog window de al menos 24 horas.

Para leer el flujo de cambios, Atlas Stream Processing escanea la colección oplog. Como resultado, es posible que veas COLLSCAN advertencias en tus registros. Estas advertencias indican un comportamiento normal y no señalan un error.

Si configuras config.fullDocument o config.fullDocumentBeforeChange en required, activa changeStreamPreAndPostImages en cada colección antes de cualquier operación de guardado que desees capturar. Si una postimagen no está disponible para un evento porque no se habilitó la funcionalidad cuando ocurrió el guardado, o porque la postimagen ha caducado, el procesador de flujos falla. Para obtener instrucciones sobre cómo habilitar las preimágenes y las postimágenes, consulta Change Streams con pre/postimágenes de documentos.

Para operar sobre datos en transmisión de un flujo de cambios de una colección de Atlas, la etapa $source tiene la siguiente forma prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"initialSync": {
"enable": <boolean>,
"parallelism": <integer>
},
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}],
"maxAwaitTimeMS": <time-ms>,
}
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Condicional

Etiqueta que identifica la conexión en el Registro de conexiones, para ingerir datos de esta.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utilizas timeField, debes definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declaras un timeField, Atlas Stream Processing creará una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

db

string

Requerido

Nombre de la base de datos MongoDB alojada en la instancia de Atlas especificada por connectionName. El flujo de cambios de esta base de datos actúa como la fuente de datos de transmisión.

coll

string o arreglo de strings

Requerido

Nombre de una o más colecciones de MongoDB alojadas en la instancia de Atlas especificada por connectionName. El flujo de cambios de estas colecciones actúa como la fuente de datos de transmisión. Si omites este campo, tu procesador de flujos obtendrá los datos de un Cambio de flujo de la base de datos MongoDB.

initialSync

Documento

Opcional

Documento que contiene campos relacionados con la funcionalidad de initialSync.

Atlas Stream Processing initialSync permite ingestar documentos preexistentes en una colección Atlas como si se tratara de documentos de inserción changeEvent. Si habilitas initialSync, cuando inicies tu procesador de flujo, primero ingerirá y procesará todos los documentos existentes en la colección antes de proceder a ingerir y procesar nuevos documentos changeEvent entrantes. Una vez que el initialSync está completo, no se repite.

Si habilitas initialSync, no puedes utilizar las etapas $hoppingWindow, $sessionWindow o $tumblingWindow en tu pipeline.

IMPORTANTE: Solo puedes utilizar initialSync en colecciones donde el valor _id de los documentos entrantes sean valores ObjectId generados por defecto, u ordenados int/long. Todos los valores de _id deben ser del mismo tipo.

initialSync.enable

booleano

Condicional

Determina si se activa initialSync o no. Si declaras un campo initialSync, debes configurar este campo.

initialSync.parallelism

entero

Opcional

Determina el nivel de paralelismo con el que se procesará la operación initialSync. Si no especificas un valor, este es por defecto 1.

Cada procesador de flujo tiene un valor máximo de paralelismo acumulado determinado por su nivel. El paralelismo acumulativo de un procesador de flujo se calcula de la siguiente manera:

parallelism total - parallelized stages

Donde parallelism total es la suma de todos los valores parallelism mayores que 1 a través de las etapas $source, $lookup, y $merge, y parallelized stages es la cantidad de estas etapas con valores parallelism mayores que 1.

Por ejemplo, si tu etapa $source establece un valor parallelism de 4, tu etapa $lookup no define un valor parallelism (por lo tanto, se toma el valor por defecto 1), y tu etapa $merge define un valor parallelism de 2, entonces tiene dos parallelized stages y el paralelismo acumulativo de su procesador de flujo se calcula como (4 + 2) - 2.

Si un procesador de flujo supera el paralelismo acumulativo máximo para su nivel, Atlas Stream Processing genera un error e informa del nivel mínimo de procesador requerido para el nivel de paralelismo previsto. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para aprender más, consulte Procesamiento de flujos.

readPreference

Documento

Opcional

Preferencia de lectura para flujo de cambios y initialSync operaciones.

Se establece por defecto en primary.

readPreferenceTags

Documento

Opcional

Leer etiquetas de preferencia de lectura para el flujo de cambios y las operaciones initialSync.

config

Documento

Opcional

Documento que contiene campos que sobrescriben diversos valores por defecto.

config.startAfter

token

Condicional

El evento de cambio después del cual la fuente comienza a reporte. Esto toma la forma de un token de currículum.

Puedes utilizar solo uno de config.startAfter o config.StartAtOperationTime.

config.startAtOperationTime

timestamp | fecha

Condicional

El operation time después del cual la fuente debe comenzar a reportar.

Puedes utilizar solo uno de config.startAfter o config.StartAtOperationTime.

Acepta valores MongoDB Extended JSON $date u $timestamp.

config.fullDocument

string

Condicional

Configuración que controla si una fuente de flujo de cambios debe devolver un documento completo o sólo las modificaciones cuando se produce una actualización. Debe ser uno de los siguientes:

  • default : No devuelve un documento completo para las operaciones de update.

  • updateLookup : Devuelve únicamente los cambios al actualizar.

  • required Debe devolver un documento completo. Si no hay un documento completo disponible, no devuelve nada.

  • whenAvailable : Devuelve un documento completo cada vez que haya uno disponible, en caso contrario, devuelve los cambios.

Para utilizar este campo con una colección de flujo de cambios, debe habilitar la Preimágenes y Postimágenes de flujo de cambios en esa colección.

config.fullDocumentOnly

booleano

Condicional

Configuración que controla si una fuente de flujos de cambios devuelve todo el documento del evento de cambio, incluyendo todos los metadatos, o solo el contenido de fullDocument. Si se establece en true, la fuente devuelve únicamente el contenido de fullDocument.

Para utilizar este campo con una colección de flujo de cambios, debe habilitar la Preimágenes y Postimágenes de flujo de cambios en esa colección.

config.fullDocumentBeforeChange

string

Opcional

Especifica si una fuente de flujo de cambios debe incluir el documento completo en su estado original "antes de los cambios" en la salida. Debe ser uno de los siguientes:

  • off : Omite el campo fullDocumentBeforeChange.

  • required : Debe devolverse un documento completo en su estado previo a los cambios. Si no hay disponibilidad de un documento completo en su estado previo a los cambios, el procesador de transmisión falla.

  • whenAvailable : devuelve un documento completo en su estado previo a los cambios siempre que esté disponible; de lo contrario, omite el campo fullDocumentBeforeChange.

Si no se especifica un valor para fullDocumentBeforeChange, se usa por defecto off.

Para utilizar este campo con una colección de flujo de cambios, debe habilitar la Preimágenes y Postimágenes de flujo de cambios en esa colección.

config.pipeline

Documento

Opcional

Especifica un pipeline de agregación para filtrar la salida del flujo de cambios antes de pasarla para procesamiento adicional. Este pipeline debe respetar los parámetros que se describen en Modifica la salida del change stream.

IMPORTANTE: Cada evento de cambio incluye los campos wallTime y clusterTime. Las etapas de Atlas Stream Processing después de $source esperan recibir estos campos tal como fueron ingeridos por el procesador. Para garantizar el procesamiento adecuado de los datos de Change Stream, no modifiques estos campos en $source.config.pipeline.

config.maxAwaitTimeMS

entero

Opcional

Tiempo máximo, en milisegundos, para esperar a que se reporten nuevos cambios de datos al cursor del flujo de cambios antes de devolver un lote vacío.

Se establece por defecto en 1000.

Un flujo de cambios de base de datos de Atlas permite que las aplicaciones accedan a cambios de datos en tiempo real en una única base de datos. Para aprender a abrir un flujo de cambios en una base de datos, consulta Change Streams.

Al usar un flujo de cambios $source, configura el clúster de origen con una oplog window de al menos 24 horas.

Para leer el flujo de cambios, Atlas Stream Processing escanea la colección oplog. Como resultado, es posible que veas COLLSCAN advertencias en tus registros. Estas advertencias indican un comportamiento normal y no señalan un error.

Si configuras config.fullDocument o config.fullDocumentBeforeChange en required, activa changeStreamPreAndPostImages en cada colección antes de cualquier operación de guardado que desees capturar. Si una postimagen no está disponible para un evento porque no se habilitó la funcionalidad cuando ocurrió el guardado, o porque la postimagen ha caducado, el procesador de flujos falla. Para obtener instrucciones sobre cómo habilitar las preimágenes y las postimágenes, consulta Change Streams con pre/postimágenes de documentos.

Para operar sobre datos en transmisión de un flujo de cambios de la base de datos Atlas, la etapa $source tiene la siguiente forma de prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Condicional

Etiqueta que identifica la conexión en el Registro de conexiones, para ingerir datos de esta.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utilizas timeField, debes definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declaras un timeField, Atlas Stream Processing creará una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

db

string

Requerido

Nombre de la base de datos MongoDB alojada en la instancia de Atlas especificada por connectionName. El flujo de cambios de esta base de datos actúa como la fuente de datos de transmisión.

readPreference

Documento

Opcional

preferencia de lectura para operaciones de flujo de cambios.

Se establece por defecto en primary.

readPreferenceTags

Documento

Opcional

Lea las etiquetas de preferencia de lectura para las operaciones de flujo de cambios.

config

Documento

Opcional

Documento que contiene campos que sobrescriben diversos valores por defecto.

config.startAfter

token

Condicional

El evento de cambio después del cual la fuente comienza a reporte. Esto toma la forma de un token de currículum.

Puedes utilizar solo uno de config.startAfter o config.StartAtOperationTime.

config.startAtOperationTime

Marca de tiempo

Condicional

El operation time después del cual la fuente debe comenzar a reportar.

Puedes utilizar solo uno de config.startAfter o config.StartAtOperationTime.

Acepta valores MongoDB Extended JSON $date u $timestamp.

config.fullDocument

string

Condicional

Configuración que controla si una fuente de flujo de cambios debe devolver un documento completo o sólo las modificaciones cuando se produce una actualización. Debe ser uno de los siguientes:

  • updateLookup : Devuelve únicamente los cambios al actualizar.

  • required Debe devolver un documento completo. Si no hay un documento completo disponible, no devuelve nada.

  • whenAvailable : Devuelve un documento completo cada vez que haya uno disponible, en caso contrario, devuelve los cambios.

Si no especificas un valor para fullDocument, el valor por defecto es updateLookup.

Para usar este campo con un flujo de cambios de base de datos, debes habilitar imágenes anteriores y posteriores en cada colección de esa base de datos.

config.fullDocumentOnly

booleano

Condicional

Configuración que controla si una fuente de flujos de cambios devuelve todo el documento del evento de cambio, incluyendo todos los metadatos, o solo el contenido de fullDocument. Si se establece en true, la fuente devuelve únicamente el contenido de fullDocument.

Para usar este campo con un flujo de cambios de base de datos, debes habilitar imágenes anteriores y posteriores en cada colección de esa base de datos.

config.fullDocumentBeforeChange

string

Opcional

Especifica si una fuente de flujo de cambios debe incluir el documento completo en su estado original "antes de los cambios" en la salida. Debe ser uno de los siguientes:

  • off : Omite el campo fullDocumentBeforeChange.

  • required : Debe devolverse un documento completo en su estado previo a los cambios. Si no hay disponibilidad de un documento completo en su estado previo a los cambios, el procesador de transmisión falla.

  • whenAvailable : devuelve un documento completo en su estado previo a los cambios siempre que esté disponible; de lo contrario, omite el campo fullDocumentBeforeChange.

Si no se especifica un valor para fullDocumentBeforeChange, se usa por defecto off.

Para usar este campo con un flujo de cambios de base de datos, debes habilitar imágenes anteriores y posteriores en cada colección de esa base de datos.

config.pipeline

Documento

Opcional

Especifica un pipeline de agregación para filtrar la salida del flujo de cambios en el punto de origen. Esta pipeline debe ajustarse a los parámetros descritos en Modificar la salida del flujo de cambios.

IMPORTANTE: Cada evento de cambio incluye los campos wallTime y clusterTime. Las etapas de Atlas Stream Processing después de $source esperan recibir estos campos tal como fueron ingeridos por el procesador. Para garantizar el procesamiento adecuado de los datos de Change Stream, no modifiques estos campos en $source.config.pipeline.

config.maxAwaitTimeMS

entero

Opcional

Tiempo máximo, en milisegundos, para esperar a que se reporten nuevos cambios de datos al cursor del flujo de cambios antes de devolver un lote vacío.

Se establece por defecto en 1000.

Al usar un flujo de cambios $source, configura el clúster de origen con una oplog window de al menos 24 horas.

Para leer el flujo de cambios, Atlas Stream Processing escanea la colección oplog. Como resultado, es posible que veas COLLSCAN advertencias en tus registros. Estas advertencias indican un comportamiento normal y no señalan un error.

Si configuras config.fullDocument o config.fullDocumentBeforeChange en required, activa changeStreamPreAndPostImages en cada colección antes de cualquier operación de guardado que desees capturar. Si una postimagen no está disponible para un evento porque no se habilitó la funcionalidad cuando ocurrió el guardado, o porque la postimagen ha caducado, el procesador de flujos falla. Para obtener instrucciones sobre cómo habilitar las preimágenes y las postimágenes, consulta Change Streams con pre/postimágenes de documentos.

Para operar datos de transmisión de todo un flujo de cambio del clúster Atlas, la etapa $source tiene la siguiente forma prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Condicional

Etiqueta que identifica la conexión en el Registro de conexiones, para ingerir datos de esta.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utilizas timeField, debes definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declaras un timeField, Atlas Stream Processing creará una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

readPreference

Documento

Opcional

preferencia de lectura para operaciones de flujo de cambios.

Se establece por defecto en primary.

readPreferenceTags

Documento

Opcional

Lea las etiquetas de preferencia de lectura para las operaciones de flujo de cambios.

config

Documento

Opcional

Documento que contiene campos que sobrescriben diversos valores por defecto.

config.startAfter

token

Condicional

El evento de cambio después del cual la fuente comienza a reporte. Esto toma la forma de un token de currículum.

Puedes utilizar solo uno de config.startAfter o config.StartAtOperationTime.

config.startAtOperationTime

fecha | marca temporal

Condicional

El operation time después del cual la fuente debe comenzar a reportar.

Puedes utilizar solo uno de config.startAfter o config.StartAtOperationTime.

Acepta valores MongoDB Extended JSON $date u $timestamp.

config.fullDocument

string

Condicional

Configuración que controla si una fuente de flujo de cambios debe devolver un documento completo o sólo las modificaciones cuando se produce una actualización. Debe ser uno de los siguientes:

  • updateLookup : Devuelve únicamente los cambios al actualizar.

  • required Debe devolver un documento completo. Si no hay un documento completo disponible, no devuelve nada.

  • whenAvailable : Devuelve un documento completo cada vez que haya uno disponible, en caso contrario, devuelve los cambios.

Si no especificas un valor para fullDocument, el valor por defecto es updateLookup.

Para usar este campo con un flujo de cambios de base de datos, debes habilitar imágenes anteriores y posteriores en cada colección de esa base de datos.

config.fullDocumentOnly

booleano

Condicional

Configuración que controla si una fuente de flujos de cambios devuelve todo el documento del evento de cambio, incluyendo todos los metadatos, o solo el contenido de fullDocument. Si se establece en true, la fuente devuelve únicamente el contenido de fullDocument.

Para usar este campo con un flujo de cambios de base de datos, debes habilitar imágenes anteriores y posteriores en cada colección de esa base de datos.

config.fullDocumentBeforeChange

string

Opcional

Especifica si una fuente de flujo de cambios debe incluir el documento completo en su estado original "antes de los cambios" en la salida. Debe ser uno de los siguientes:

  • off : Omite el campo fullDocumentBeforeChange.

  • required : Debe devolverse un documento completo en su estado previo a los cambios. Si no hay disponibilidad de un documento completo en su estado previo a los cambios, el procesador de transmisión falla.

  • whenAvailable : devuelve un documento completo en su estado previo a los cambios siempre que esté disponible; de lo contrario, omite el campo fullDocumentBeforeChange.

Si no se especifica un valor para fullDocumentBeforeChange, se usa por defecto off.

Para usar este campo con un flujo de cambios de base de datos, debes habilitar imágenes anteriores y posteriores en cada colección de esa base de datos.

config.pipeline

Documento

Opcional

Especifica un pipeline de agregación para filtrar la salida del flujo de cambios en el punto de origen. Esta pipeline debe ajustarse a los parámetros descritos en Modificar la salida del flujo de cambios.

Ten en cuenta que el Atlas Stream Processing espera recibir los campos wallTime y clusterTime de cada Evento de Cambio ingerido. Para garantizar el procesamiento adecuado de los datos de Change Stream, no modifiques estos campos en $source.config.pipeline.

config.maxAwaitTimeMS

entero

Opcional

Tiempo máximo, en milisegundos, para esperar a que se reporten nuevos cambios de datos al cursor del flujo de cambios antes de devolver un lote vacío.

Se establece por defecto en 1000.

Atlas Stream Processing admite la creación de conexiones Private Link con flujos AWS Kinesis. Para obtener más información, consulte Agregar una conexión privada de Kinesis.

Para operar sobre datos de un AWS Kinesis data stream, la etapa $source tiene la siguiente forma de prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"stream": "<stream-name>",
"region": "<aws-region>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<field-name>",
"shardIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"consumerARN": "<aws-arn>",
"initialPosition": <initial-position>,
reshardDetectionIntervalSecs: <interval>
}
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Etiqueta que identifica la conexión en el Registro de Conexiones, de la cual se obtiene la información.

config.consumerARN

string

Requerido

ARN correspondiente a un consumidor de Kinesis. Su consumidor debe usar Fan-out mejorado.

stream

string

Requerido

Flujo de datos de AWS Kinesis desde el cual transmitir mensajes.

region

string

Condicional

AWS región en la que existe la transmisión especificada. Kinesis admite múltiples flujos de datos con el mismo nombre en diferentes regiones. Si usas el mismo nombre para flujos de datos en dos o más regiones dentro de la misma conexión, debes usar este campo para especificar qué combinación de nombre y región usar.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utilizas timeField, debes definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declaras un timeField, Atlas Stream Processing creará una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

shardIdleTimeout

Documento

Opcional

Documento que especifica el tiempo que se permite que una partición esté inactiva antes de que se ignore en los cálculos de marcas de agua.

Este campo está desactivado por defecto. Para gestionar las particiones que no avanzan debido a la inactividad, establece un valor para este campo.

shardIdleTimeout.size

Documento

Opcional

Número que especifica la duración del tiempo de espera de inactividad de la partición.

shardIdleTimeout.unit

Documento

Opcional

Unidad de tiempo para la duración del tiempo de espera inactivo de la partición.

El valor de unit puede ser uno de los siguientes:

  • "ms" (milisegundo)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

Documento

Opcional

Documento que contiene campos que sobrescriben diversos valores por defecto.

config.initialPosition

string

Opcional

Posición en el historial del flujo de datos de Kinesis desde la cual comenzar a ingerir mensajes. Debe ser uno de los siguientes:

  • "TRIM_HORIZON": Comienza a ingerir desde el mensaje más antiguo en la partición.

  • "LATEST": Comienza a ingerir desde el mensaje más reciente de la partición.

Se establece por defecto en "LATEST".

reshardDetectionIntervalSecs

entero

Opcional

Intervalo, en segundos, entre chequeos de la velocidad del flujo de datos a través de tu flujo de Kinesis para fines de rebalanceo.

Por defecto son 300 segundos.

Para operar sobre un arreglo de documentos, la etapa $source tiene el siguiente formato prototipo:

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"documents" : [{source-doc},...] | <expression>
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utilizas timeField, debes definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declaras un timeField, Atlas Stream Processing creará una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

documents

arreglo

Condicional

Arreglo de documentos para usar como fuente de datos en streaming. El valor de este campo puede ser un arreglo de objetos o una expresión que se evalúe como un arreglo de objetos. No utilice este campo cuando use el campo connectionName.

$source debe ser la primera etapa de cualquier canalización en la que aparezca. Solo puede usar una etapa por $source canalización.

Para etapas de Kafka $source, Atlas Stream Processing lee en paralelo desde múltiples particiones dentro del tema de origen. El límite de partición lo determina tu nivel de procesador. Para obtener detalles adicionales, revise la referencia de facturación de Stream Processing.

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:

  1. La etapa $source establece una conexión con el Apache Kafka broker que recopila estos informes en un tema denominado my_weatherdata, exponiendo cada registro a medida que se procesa a las siguientes etapas de agregación. Este etapa también reemplaza el nombre del campo de marca de tiempo que proyecta, estableciéndolo en ingestionTime.

  2. La etapa $match excluye los documentos que tienen un dewPoint.value menor o igual a 5.0 y pasa los documentos con dewPoint.value mayor que 5.0 a la siguiente etapa.

  3. La fase $merge escribe la salida en una colección Atlas denominada stream en la base de datos sample_weatherstream. Si no existe tal base de datos o colección, Atlas los creará.

[{
"$source": {
"connectionName": "sample_weatherdata",
"topic": "my_weatherdata"
}
},
{
"$match": { "dewPoint.value": { "$gt": 5 } }
},
{
"$merge": {
"into": {
"connectionName": "weatherStreamOutput",
"db": "sample_weatherstream",
"coll": "stream"
}
}
}]

Para ver los documentos de la colección sample_weatherstream.stream resultante, conéctate a un clúster de Atlas y ejecuta el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: {
code: 'N',
period: 99.9,
quantity: '9',
value: -30.4
},
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: {
condition: '9',
depth: 160,
period: 24,
quality: '2'
},
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: {
discrepancy: '4',
estimatedWaterDepth: 4
},
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.

La siguiente agregación ingiere datos de la fuente cluster0-collection, que se conecta a un clúster de Atlas cargado con el conjunto de datos de muestra. Para aprender cómo crear un espacio de trabajo de Stream Processing y agregar una conexión a un clúster de Atlas al registro de conexión, consulte Introducción a Atlas Stream Processing. Esta agregación ejecuta dos etapas para abrir un flujo de cambios y registrar los cambios en la colección data de la base de datos sample_weatherdata:

  1. La etapa se conecta a $source la cluster0-collection fuente y abre un flujo de cambios contra la data colección en la sample_weatherdata base de datos.

  2. La etapa $merge escribe los documentos de flujo de cambios filtrados en una colección de Atlas denominada data_changes en la base de datos sample_weatherdata. Si no existe tal colección, Atlas la crea.

[{
"$source": {
"connectionName": "cluster0-connection",
"db": "sample_weatherdata",
"coll": "data"
}
},
{
"$merge": {
"into": {
"connectionName": "cluster0-connection",
"db": "sample_weatherdata",
"coll": "data_changes"
}
}
}]

El siguiente comando mongosh borra un documento data:

db.getSiblingDB("sample_weatherdata").data.deleteOne(
{ _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

Después de que el documento data sea eliminado, el procesador de flujo registra el documento de evento del flujo de cambios en la colección sample_weatherdata.data_changes. Para ver los documentos en la colección resultante de sample_weatherdata.data_changes, usa mongosh para conectarte a tu clúster de Atlas y ejecutar el siguiente comando:

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
{
_id: {
_data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
},
clusterTime: Timestamp({ t: 1738790819, i: 1 }),
documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
ns: { db: 'sample_weatherdata', coll: 'data' },
operationType: 'delete',
wallTime: ISODate('2025-02-05T21:26:59.313Z')
}
]

La siguiente agregación utiliza un arreglo de documentos en línea como fuente de datos de transmisión, que contiene observaciones meteorológicas para tres ubicaciones. El arreglo utiliza el mismo esquema que el Conjunto de Datos de muestra de Meteorología. Esta agregación ejecuta tres etapas:

  1. La etapa define una matriz en $source línea documents de observaciones meteorológicas como la fuente de datos de transmisión y utiliza timeField para designar el timestamp campo de cada documento como la marca de tiempo autorizada.

  2. La etapa $match remite solo los documentos con dewPoint.value mayor que 5.0 a la siguiente etapa.

  3. La fase $merge escribe la salida en una colección Atlas denominada stream en la base de datos sample_weatherstream. Si no existe tal base de datos o colección, Atlas los creará.

[{
"$source": {
"documents": [
{
"location": "New York",
"timestamp": ISODate('2024-01-15T08:00:00Z'),
"temp": 23.5,
"dewPoint": { "value": 6.2 }
},
{
"location": "Los Angeles",
"timestamp": ISODate('2024-01-15T08:05:00Z'),
"temp": 18.2,
"dewPoint": { "value": 4.8 }
},
{
"location": "Chicago",
"timestamp": ISODate('2024-01-15T08:10:00Z'),
"temp": 26.8,
"dewPoint": { "value": 7.5 }
}
]
}
},
{
"$match": { "dewPoint.value": { "$gt": 5.0 } }
},
{
"$merge": {
"into": {
"connectionName": "weatherStreamOutput",
"db": "sample_weatherstream",
"coll": "stream"
}
}
}]

Para ver los documentos de la colección sample_weatherstream.stream resultante, conéctate a un clúster de Atlas y ejecuta el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
[
{
_id: ObjectId('67a6c3df14fcac13b1a21a01'),
dewPoint: { value: 6.2 },
location: 'New York',
temp: 23.5,
timestamp: ISODate('2024-01-15T08:00:00.000Z')
},
{
_id: ObjectId('67a6c3df14fcac13b1a21a03'),
dewPoint: { value: 7.5 },
location: 'Chicago',
temp: 26.8,
timestamp: ISODate('2024-01-15T08:10:00.000Z')
}
]