Docs Menu
Docs Home
/ /
/ / /

$source Etapa (procesamiento de flujo)

$source

El $source La etapa especifica una conexión en el Registro de conexión desde el que se transmiten datos. Se admiten los siguientes tipos de conexión:

  • Apache Kafka corredor

  • Flujo de cambios de la colección de MongoDB

  • Flujo de cambios de la base de datos MongoDB

  • Flujo de cambios del clúster de MongoDB

  • Flujo de datos de AWS Kinesis

  • Matriz de documentos

Para operar con datos de transmisión desde un agente Apache Kafka, la $source etapa tiene la siguiente forma de prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"schemaRegistry": {
"connectionName": "<schema-registry-name>",
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Etiqueta que identifica la conexión en el Registro de Conexión, desde donde se ingerirán los datos.

topic

cadena o matriz de cadenas

Requerido

Nombre de uno o más temas de Apache Kafka desde los que se transmitirán los mensajes. Si desea transmitir mensajes desde más de un tema, especifíquelos en una matriz.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utiliza timeField, debe definirlo como uno de los siguientes:

  • una :expresión:$toDate que toma un campo de mensaje de origen como argumento.

  • una :expresión:$dateFromString que toma un campo de mensaje de origen como argumento.

Si no declara un timeField, Atlas Stream Processing crea una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

partitionIdleTimeout

Documento

Opcional

Documento que especifica la cantidad de tiempo que una partición puede estar inactiva antes de que se ignore en los cálculos de marca de agua.

Este campo está deshabilitado de forma predeterminada. Para gestionar las particiones que no avanzan por inactividad, configure un valor para este campo.

partitionIdleTimeout.size

entero

Opcional

Número que especifica la duración del tiempo de espera inactivo de la partición.

partitionIdleTimeout.unit

string

Opcional

Unidad de tiempo para la duración del tiempo de espera inactivo de la partición.

El valor de unit puede ser uno de los siguientes:

  • "ms" (milisegundo)

  • "second"

  • "minute"

  • "hour"

  • "day"

schemaRegistry

Documento

Opcional

Documento que permite el uso de un registro de esquema para soportar la lectura desde una fuente serializada por Avro.

Para habilitar esta función, debe crear una conexión de Registro de esquema.

schemaRegistry.connectionName

string

Condicional

Nombre de la conexión del Registro de esquema que se utilizará para la deserialización de Avro.

config

Documento

Opcional

Documento que contiene campos que anulan varios valores predeterminados.

config.auto_offset_reset

string

Opcional

Especifica con qué evento en el tema de origen de Apache Kafka comenzar la ingestión. auto_offset_reset toma los siguientes valores:

  • end, latest o largest : para comenzar la ingestión desde el último evento en el tema en el momento en que se inicializa la agregación.

  • earliest, beginning o smallest : para comenzar la ingestión desde el primer evento en el tema.

Se establece por defecto en latest.

config.group_id

string

Opcional

ID del grupo de consumidores de Kafka que se asociará con el procesador de flujo. Si se omite, Atlas Stream Processing asocia el espacio de trabajo de procesamiento de flujo con un ID generado automáticamente con el siguiente formato:

asp-${streamProcessorId}-consumer

Atlas Stream Processing genera automáticamente un valor para este parámetro para todos los procesadores de flujo persistentes. Para los procesadores de flujo efímeros definidos con sp.process(), este parámetro solo se establece si se define manualmente.

config.enable_auto_commit

booleano

Condicional

Indicador que determina la política de confirmación para los desplazamientos de partición del agente de Kafka. Atlas Stream Processing admite dos políticas de confirmación:

  • Si establece este parámetro en true, Atlas Stream Processing confirma las compensaciones cada vez que la etapa $source pasa datos al siguiente operador.

  • Si establece este parámetro en false, los procesadores de flujo confirman los desplazamientos de partición cuando Atlas Stream Processing toma un punto de control.

Puedes configurar este parámetro solo si está configurado config.group_id.

Para un procesador de flujo efímero definido con sp.process(), este parámetro tiene como false valor group_id predeterminado, a menos que se configure. De lo contrario, tiene como valor true predeterminado.

config.keyFormat

string

Opcional

Tipo de dato utilizado para deserializar los datos de clave de Apache Kafka. Debe ser uno de los siguientes valores:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Se establece por defecto en binData.

config.keyFormatError

string

Opcional

Cómo gestionar los errores que se producen al deserializar datos de claves de Apache Kafka. Debe ser uno de los siguientes valores:

  • dlq, que escribe el documento en su cola de mensajes no entregados.

  • passThrough, que envía el documento a la siguiente etapa sin datos clave.

Nota

El procesamiento de flujos de Atlas requiere que los documentos del flujo de datos de origen sean válidos json ejsono. El procesamiento de flujos de Atlas envía los documentos que no cumplen este requisito a la cola de mensajes fallidos, si la ha configurado.

Un flujo de cambios de una colección Atlas permite a las aplicaciones acceder a los cambios de datos en tiempo real en una sola colección. Para saber cómo abrir un flujo de cambios en una colección, consulte Flujos de cambios.

Al utilizar un flujo $source de cambios, recomendamos configurar el clúster de origen con una ventana de registro de 24 operaciones de al menos horas.

Para operar con datos de transmisión desde un flujo de cambios de una colección Atlas, la etapa $source tiene la siguiente forma de prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"initialSync": {
"enable": <boolean>,
"parallelism": <integer>
},
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
]
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}],
"maxAwaitTimeMS": <time-ms>,
}
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Condicional

Etiqueta que identifica la conexión en el Registro de Conexión, desde donde se ingerirán los datos.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utiliza timeField, debe definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declara un timeField, Atlas Stream Processing crea una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

db

string

Requerido

Nombre de una base de datos MongoDB alojada en la instancia de Atlas especificada por connectionName. El flujo de cambios de esta base de datos actúa como fuente de datos de transmisión.

coll

cadena o matriz de cadenas

Requerido

Nombre de una o más colecciones de MongoDB alojadas en la instancia de Atlas especificada connectionName por. El flujo de cambios de estas colecciones actúa como fuente de datos de streaming. Si omite este campo, su procesador de flujos se basará en un flujo de cambios de la base de datos de MongoDB.

initialSync

Documento

Opcional

Documento que contiene campos pertenecientes a la funcionalidad initialSync.

El Procesamiento de Flujo de Atlas initialSync permite ingerir documentos preexistentes en una colección de Atlas como si fueran documentos de inserción changeEvent. Si habilita initialSync, al iniciar el procesador de flujo, este primero ingerirá y procesará todos los documentos existentes en la colección antes de proceder a ingerir y procesar los nuevos documentos entrantes changeEvent. Una vez completado el initialSync, no se repite.

Si habilita initialSync, no podrá utilizar las etapas $hoppingWindow, $sessionWindow o $tumblingWindow en su canalización.

IMPORTANTE: Solo se puede usar initialSync en colecciones donde el _id valor de los documentos entrantes sean valores ObjectId generados por defecto o intlong valores / ordenados. Todos los _id valores deben ser del mismo tipo.

initialSync.enable

booleano

Condicional

Determina si se habilita o no initialSync. Si declara un campo initialSync, debe configurarlo.

initialSync.parallelism

entero

Opcional

Determina el nivel de paralelismo con el que se procesará la operación initialSync. Si no se especifica un valor, el valor predeterminado es 1.

Cada procesador de flujo tiene un valor máximo de paralelismo acumulado, determinado por su nivel. El paralelismo acumulado de un procesador de flujo se calcula de la siguiente manera:

parallelism total - parallelized stages

Donde parallelism total es la suma de todos los parallelism valores mayores que 1 en las etapas,$source $lookup y $merge, y parallelized stages es el número de estas etapas con parallelism valores mayores 1 que.

Por ejemplo, si su etapa $source establece un valor parallelism de 4, su etapa $lookup no establece ningún valor parallelism (por lo tanto, el valor predeterminado es 1) y su etapa $merge establece un valor parallelism de 2, entonces tiene dos parallelized stages y el paralelismo acumulativo de su procesador de flujo se calcula como (4 + 2) - 2.

Si un procesador de flujo supera el paralelismo acumulado máximo para su nivel, Atlas Stream Processing genera un error y le informa del nivel mínimo de procesador necesario para el nivel de paralelismo deseado. Debe escalar el procesador a un nivel superior o reducir los valores de paralelismo de sus etapas para resolver el error. Para obtener más información, consulte Procesamiento de flujo.

readPreference

Documento

Opcional

Preferencia de lectura para initialSync operaciones.

Se establece por defecto en primary.

readPreferenceTags

Documento

Opcional

Leer etiquetas de preferencia para initialSync operaciones.

config

Documento

Opcional

Documento que contiene campos que anulan varios valores predeterminados.

config.startAfter

token

Condicional

El evento de cambio tras el cual la fuente comienza a informar. Este evento se presenta como un token de reanudación.

Solo puedes utilizar uno de config.startAfter o config.StartAtOperationTime.

config.startAtOperationTime

fecha y hora

Condicional

El tiempo de operación después del cual la fuente debe comenzar a informar.

Solo puedes utilizar uno de config.startAfter o config.StartAtOperationTime.

Acepta valores JSON extendidos de MongoDB $date $timestamp o.

config.fullDocument

string

Condicional

Configuración que controla si una fuente de flujo de cambios debe devolver un documento completo o sólo las modificaciones cuando se produce una actualización. Debe ser uno de los siguientes:

  • default :No devuelve un documento completo para update operaciones.

  • updateLookup :Devuelve sólo los cambios en la actualización.

  • required Debe devolver el documento completo. Si no está disponible, no se devuelve nada.

  • whenAvailable :Devuelve un documento completo siempre que haya uno disponible, de lo contrario, devuelve los cambios.

Para utilizar este campo con un flujo de cambio de colección, debe habilitar las imágenes previas y posteriores del flujo de cambio en esa colección.

config.fullDocumentOnly

booleano

Condicional

Configuración que controla si una fuente de flujo de cambios devuelve el documento completo del evento de cambio, incluidos todos los metadatos, o solo el contenido de fullDocument. Si se establece en true, la fuente devuelve solo el contenido de fullDocument.

Para utilizar este campo con un flujo de cambio de colección, debe habilitar las imágenes previas y posteriores del flujo de cambio en esa colección.

config.fullDocumentBeforeChange

string

Opcional

Especifica si una fuente de flujo de cambios debe incluir el documento completo en su estado original "antes de los cambios" en la salida. Debe ser uno de los siguientes:

  • off : Omite el campo fullDocumentBeforeChange.

  • required : Debe devolver un documento completo en su estado anterior a los cambios. Si no está disponible, el procesador de flujo falla.

  • whenAvailable :Devuelve un documento completo en su estado anterior a los cambios siempre que haya uno disponible, de lo contrario omite el campo fullDocumentBeforeChange.

Si no especifica un valor para fullDocumentBeforeChange, el valor predeterminado será off.

Para utilizar este campo con un flujo de cambio de colección, debe habilitar las imágenes previas y posteriores del flujo de cambio en esa colección.

config.pipeline

Documento

Opcional

Especifica un pipeline de agregación para filtrar la salida del flujo de cambios antes de pasarla para procesamiento adicional. Este pipeline debe respetar los parámetros que se describen en Modifica la salida del change stream.

IMPORTANTE: Cada evento de cambio incluye wallTime clusterTime los campos y.$source Las etapas de procesamiento de flujo de Atlas posteriores a esperan recibir estos campos a medida que el procesador los ingiere. Para garantizar el correcto procesamiento de los datos del flujo de cambio, no modifique estos campos $source.config.pipeline en.

config.maxAwaitTimeMS

entero

Opcional

Tiempo máximo, en milisegundos, que se debe esperar a que los nuevos cambios de datos se informen al cursor del flujo de cambios antes de devolver un lote vacío.

Se establece por defecto en 1000.

Un flujo de cambios de una base de datos Atlas permite a las aplicaciones acceder a los cambios de datos en tiempo real en una única base de datos. Para saber cómo abrir un flujo de cambios en una base de datos, consulte Flujos de cambios.

Al utilizar un flujo $source de cambios, recomendamos configurar el clúster de origen con una ventana de registro de 24 operaciones de al menos horas.

Para operar con datos de transmisión desde un flujo de cambios de una base de datos Atlas, la etapa $source tiene la siguiente forma de prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Condicional

Etiqueta que identifica la conexión en el Registro de Conexión, desde donde se ingerirán los datos.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utiliza timeField, debe definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declara un timeField, Atlas Stream Processing crea una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

db

string

Requerido

Nombre de una base de datos MongoDB alojada en la instancia de Atlas especificada por connectionName. El flujo de cambios de esta base de datos actúa como fuente de datos de transmisión.

config

Documento

Opcional

Documento que contiene campos que anulan varios valores predeterminados.

config.startAfter

token

Condicional

El evento de cambio tras el cual la fuente comienza a informar. Este evento se presenta como un token de reanudación.

Solo puedes utilizar uno de config.startAfter o config.StartAtOperationTime.

config.startAtOperationTime

fecha y hora

Condicional

El tiempo de operación después del cual la fuente debe comenzar a informar.

Solo puedes utilizar uno de config.startAfter o config.StartAtOperationTime.

Acepta valores JSON extendidos de MongoDB $date $timestamp o.

config.fullDocument

string

Condicional

Configuración que controla si una fuente de flujo de cambios debe devolver un documento completo o sólo las modificaciones cuando se produce una actualización. Debe ser uno de los siguientes:

  • updateLookup :Devuelve sólo los cambios en la actualización.

  • required Debe devolver el documento completo. Si no está disponible, no se devuelve nada.

  • whenAvailable :Devuelve un documento completo siempre que haya uno disponible, de lo contrario, devuelve los cambios.

Si no especifica un valor para fullDocument, el valor predeterminado será updateLookup.

Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos.

config.fullDocumentOnly

booleano

Condicional

Configuración que controla si una fuente de flujo de cambios devuelve el documento completo del evento de cambio, incluidos todos los metadatos, o solo el contenido de fullDocument. Si se establece en true, la fuente devuelve solo el contenido de fullDocument.

Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos.

config.fullDocumentBeforeChange

string

Opcional

Especifica si una fuente de flujo de cambios debe incluir el documento completo en su estado original "antes de los cambios" en la salida. Debe ser uno de los siguientes:

  • off : Omite el campo fullDocumentBeforeChange.

  • required : Debe devolver un documento completo en su estado anterior a los cambios. Si no está disponible, el procesador de flujo falla.

  • whenAvailable :Devuelve un documento completo en su estado anterior a los cambios siempre que haya uno disponible, de lo contrario omite el campo fullDocumentBeforeChange.

Si no especifica un valor para fullDocumentBeforeChange, el valor predeterminado será off.

Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos.

config.pipeline

Documento

Opcional

Especifica una canalización de agregación para filtrar la salida del flujo de cambios en el punto de origen. Esta canalización debe cumplir los parámetros descritos en Modificar la salida del flujo de cambios.

IMPORTANTE: Cada evento de cambio incluye wallTime clusterTime los campos y.$source Las etapas de procesamiento de flujo de Atlas posteriores a esperan recibir estos campos a medida que el procesador los ingiere. Para garantizar el correcto procesamiento de los datos del flujo de cambio, no modifique estos campos $source.config.pipeline en.

config.maxAwaitTimeMS

entero

Opcional

Tiempo máximo, en milisegundos, que se debe esperar a que los nuevos cambios de datos se informen al cursor del flujo de cambios antes de devolver un lote vacío.

Se establece por defecto en 1000.

Al utilizar un flujo $source de cambios, recomendamos configurar el clúster de origen con una ventana de registro de 24 operaciones de al menos horas.

Para operar con datos de transmisión de un flujo de cambios de un clúster Atlas completo, la etapa $source tiene la siguiente forma de prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Condicional

Etiqueta que identifica la conexión en el Registro de Conexión, desde donde se ingerirán los datos.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utiliza timeField, debe definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declara un timeField, Atlas Stream Processing crea una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

config

Documento

Opcional

Documento que contiene campos que anulan varios valores predeterminados.

config.startAfter

token

Condicional

El evento de cambio tras el cual la fuente comienza a informar. Este evento se presenta como un token de reanudación.

Solo puedes utilizar uno de config.startAfter o config.StartAtOperationTime.

config.startAtOperationTime

fecha | marca de tiempo

Condicional

El tiempo de operación después del cual la fuente debe comenzar a informar.

Solo puedes utilizar uno de config.startAfter o config.StartAtOperationTime.

Acepta valores JSON extendidos de MongoDB $date $timestamp o.

config.fullDocument

string

Condicional

Configuración que controla si una fuente de flujo de cambios debe devolver un documento completo o sólo las modificaciones cuando se produce una actualización. Debe ser uno de los siguientes:

  • updateLookup :Devuelve sólo los cambios en la actualización.

  • required Debe devolver el documento completo. Si no está disponible, no se devuelve nada.

  • whenAvailable :Devuelve un documento completo siempre que haya uno disponible, de lo contrario, devuelve los cambios.

Si no especifica un valor para fullDocument, el valor predeterminado será updateLookup.

Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos.

config.fullDocumentOnly

booleano

Condicional

Configuración que controla si una fuente de flujo de cambios devuelve el documento completo del evento de cambio, incluidos todos los metadatos, o solo el contenido de fullDocument. Si se establece en true, la fuente devuelve solo el contenido de fullDocument.

Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos.

config.fullDocumentBeforeChange

string

Opcional

Especifica si una fuente de flujo de cambios debe incluir el documento completo en su estado original "antes de los cambios" en la salida. Debe ser uno de los siguientes:

  • off : Omite el campo fullDocumentBeforeChange.

  • required : Debe devolver un documento completo en su estado anterior a los cambios. Si no está disponible, el procesador de flujo falla.

  • whenAvailable :Devuelve un documento completo en su estado anterior a los cambios siempre que haya uno disponible, de lo contrario omite el campo fullDocumentBeforeChange.

Si no especifica un valor para fullDocumentBeforeChange, el valor predeterminado será off.

Para utilizar este campo con un flujo de cambios de base de datos, debe habilitar las imágenes previas y posteriores al flujo de cambios en cada colección de esa base de datos.

config.pipeline

Documento

Opcional

Especifica una canalización de agregación para filtrar la salida del flujo de cambios en el punto de origen. Esta canalización debe cumplir los parámetros descritos en Modificar la salida del flujo de cambios.

Tenga en cuenta que el Procesamiento de Flujo de Atlas espera recibir los campos wallTime y clusterTime de cada Evento de Cambio ingresado. Para garantizar el correcto procesamiento de los datos del Flujo de Cambio, no modifique estos campos en $source.config.pipeline.

config.maxAwaitTimeMS

entero

Opcional

Tiempo máximo, en milisegundos, que se debe esperar a que los nuevos cambios de datos se informen al cursor del flujo de cambios antes de devolver un lote vacío.

Se establece por defecto en 1000.

Para operar con datos de un flujo de datos de AWS Kinesis, la $source etapa tiene el siguiente formato de prototipo:

{
"$source": {
"connectionName": "<registered-connection>",
"stream": "<stream-name>",
"region": "<aws-region>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<field-name>",
"shardIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"consumerARN": "<aws-arn>",
"initialPosition": <initial-position>,
reshardDetectionIntervalSecs: <interval>
}
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Etiqueta que identifica la conexión en el Registro de Conexiones, desde donde se ingerirán los datos.

stream

string

Requerido

Flujo de datos de AWS Kinesis desde el cual transmitir mensajes.

region

string

Condicional

Región deAWS donde se encuentra el flujo especificado. Kinesis admite varios flujos de datos con el mismo nombre en diferentes regiones. Si usa el mismo nombre para flujos de datos en dos o más regiones dentro de la misma conexión, debe usar este campo para especificar la combinación de nombre y región que se usará.

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utiliza timeField, debe definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declara un timeField, Atlas Stream Processing crea una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

shardIdleTimeout

Documento

Opcional

Documento que especifica la cantidad de tiempo que un fragmento puede permanecer inactivo antes de que se ignore en los cálculos de marca de agua.

Este campo está deshabilitado de forma predeterminada. Para gestionar los fragmentos que no avanzan debido a la inactividad, configure un valor para este campo.

shardIdleTimeout.size

Documento

Opcional

Número que especifica la duración del tiempo de espera inactivo del fragmento.

shardIdleTimeout.unit

Documento

Opcional

Unidad de tiempo que mide la duración del tiempo de inactividad del fragmento.

El valor de unit puede ser uno de los siguientes:

  • "ms" (milisegundo)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

Documento

Opcional

Documento que contiene campos que anulan varios valores predeterminados.

config.consumerARN

string

Opcional

ARN correspondiente a un consumidor de Kinesis. Si especifica este campo, su consumidor utilizará la función de abanico mejorado; de lo contrario, Kinesis utilizará un consumidor estándar.

config.initialPosition

string

Opcional

Posición en el historial del flujo de datos de Kinesis desde la que se iniciará la ingesta de mensajes. Debe ser una de las siguientes:

  • "TRIM_HORIZON":Comience a ingerir desde el mensaje más antiguo en el fragmento.

  • "LATEST":Comience a ingerir desde el mensaje más nuevo en el fragmento.

El valor predeterminado es "ÚLTIMO".

reshardDetectionIntervalSecs

entero

Opcional

Intervalo, en segundos, entre comprobaciones de la velocidad del flujo de datos a través de su transmisión de Kinesis con fines de refragmentación.

Para operar sobre un arreglo de documentos, la etapa $source tiene el siguiente formato prototipo:

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"documents" : [{source-doc},...] | <expression>
}
}

La etapa $source procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

timeField

Documento

Opcional

Documento que define una marca de tiempo autorizada para los mensajes entrantes.

Si utiliza timeField, debe definirlo como uno de los siguientes:

  • una expresión $toDate que toma un campo de mensaje de origen como argumento

  • una expresión $dateFromString que toma un campo de mensaje de origen como argumento.

Si no declara un timeField, Atlas Stream Processing crea una marca de tiempo a partir de la marca de tiempo del mensaje proporcionada por la fuente.

documents

arreglo

Condicional

Matriz de documentos que se usará como fuente de datos de streaming. El valor de este campo puede ser una matriz de objetos o una expresión que evalúe una matriz de objetos. No utilice este campo si usa el campo connectionName.

$source debe ser la primera etapa de cualquier canalización en la que aparezca. Solo puede usar una etapa por $source canalización.

En las $source etapas de Kafka, Atlas Stream Processing lee en paralelo desde varias particiones dentro del tema de origen. El límite de particiones lo determina el nivel de procesador. Para obtener más información, consulte la referencia de facturación de Stream Processing.

Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación consta de tres etapas:

  1. La etapa establece una $source conexión con el agente de Apache Kafka que recopila estos informes en un tema my_weatherdata llamado, exponiendo cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndolo ingestionTime en.

  2. La etapa excluye los documentos que tienen $match un dewPoint.value menor o igual a 5.0 y pasa los documentos con dewPoint.value mayor que 5.0 a la siguiente etapa.

  3. La etapa escribe la salida en una colección de Atlas $merge llamada stream en la sample_weatherstream base de datos. Si no existe dicha base de datos o colección, Atlas las crea.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para ver los documentos en la colección sample_weatherstream.stream resultante, conéctese a su clúster Atlas y ejecute el siguiente comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Nota

El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.

La siguiente agregación ingiere datos de la cluster0-collection fuente, que se conecta a un clúster de Atlas cargado con el conjunto de datos de muestra. Para aprender a crear un espacio de trabajo de procesamiento de flujos y agregar una conexión a un clúster de Atlas al registro de conexiones, consulte Introducción al procesamiento de flujos de Atlas. Esta agregación ejecuta dos etapas para abrir un flujo de cambios y registrar los cambios en la data colección en la sample_weatherdata base de datos:

  1. La etapa se conecta a $source la cluster0-collection fuente y abre un flujo de cambio contra la data colección en la sample_weatherdata base de datos.

  2. La etapa $merge escribe los documentos de flujo de cambios filtrados en una colección de Atlas denominada data_changes en la base de datos sample_weatherdata. Si no existe tal colección, Atlas la crea.

{
$source: {
connectionName: "cluster0-connection",
db : "sample_weatherdata",
coll : "data"
},
$merge: {
into: {
connectionName: "cluster0-connection",
db: "sample_weatherdata",
coll: "data_changes"
}
}
}

El siguiente comando elimina mongosh un data documento:

db.getSiblingDB("sample_weatherdata").data.deleteOne(
{ _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

Tras eliminar el data documento, el procesador de flujo escribe el documento de evento de flujo sample_weatherdata.data_changes de cambios en la colección. Para ver los documentos de la sample_weatherdata.data_changes colección resultante, utilice para conectarse a su clúster de Atlas y ejecute el siguiente mongosh comando:

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
{
_id: {
_data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
},
clusterTime: Timestamp({ t: 1738790819, i: 1 }),
documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
ns: { db: 'sample_weatherdata', coll: 'data' },
operationType: 'delete',
wallTime: ISODate('2025-02-05T21:26:59.313Z')
}
]

Volver

Etapas de agregación

En esta página