Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

$emit Stage de agregación (Stream Processing)

La etapa $emit especifica una conexión en el Registro de Conexiones a la que emitir mensajes. Se admiten los siguientes tipos de conexión:

$emit debe ser la última etapa de cualquier pipeline en la que aparezca. Solo se puede utilizar una etapa $emit por pipeline.

Para guardar datos procesados en un broker de Apache Kafka, utiliza la etapa de pipeline $emit con el siguiente formulario prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic": "<target-topic>" | <expression>,
"schemaRegistry": {
"connectionName": "<schema-registry-name>",
"valueSchema": {
type: "<schema-type>",
schema: <schema-name>,
options: {
subjectNameStrategy: "<topic-name-strategy>",
autoRegisterSchemas: true
}
}
},
"config": {
"acks": <number-of-acknowledgements>,
"compression_type": "<compression-type>",
"dateFormat": "default" | "ISO8601",
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<serialization-type>",
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"tombstoneWhen": <expression>
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de Conexiones, de la conexión desde la cual se ingieren datos.

topic

string o expresión

Requerido

Nombre del tema de Apache Kafka al que emitir los mensajes.

schemaRegistry

Documento

Opcional

Documento que permite el uso de un Registro de Esquemas para apoyar la escritura en un formato serializado Avro fuente.

Para habilitar esta funcionalidad, debe crear una conexión con el Registro de esquemas.

schemaRegistry.connectionName

string

Condicional

Nombre de la conexión del Registro de esquemas que se utilizará para la deserialización de Avro.

schemaRegistry.valueSchema

documento | expresión

Condicional

Documento que define las propiedades de su esquema de serialización, o una expresión que se evalúe como tal.

schemaRegistry.valueSchema.type

string

Condicional

El tipo de serialización para el que se utilizará el registro de esquema. Atlas Stream Processing actualmente admite la serialización "avro" a través de conexiones Schema Registry.

schemaRegistry.valueSchema.schema

Documento

Condicional

Documento que define tu Declaración de Esquema.

schemaRegistry.valueSchema.options

Documento

Opcional

Documento que define los parámetros de configuración opcionales para la conexión de tu registro de esquema.

schemaRegistry.valueSchema.options.autoRegisterSchemas

booleano

Opcional

Alternar para determinar si se deben registrar automáticamente los esquemas al procesar documentos con esquemas no reconocidos. Si se establece en falso, los documentos con esquemas no reconocidos son enviados a la fila de letra muerta.

Se establece por defecto en true.

schemaRegistry.valueSchema.options.subjectNameStrategy

string

Condicional

Método para determinar el nombre del sujeto de los esquemas registrados automáticamente. Debe ser uno de los siguientes:

  • "TopicNameStrategy": Utiliza el nombre {topic} de Kafka como el nombre del tema.

  • "RecordNameStrategy"Utiliza el nombre del registro Avro como el nombre del sujeto.

  • "TopicRecordNameStrategy"Usa un compuesto del nombre de Kafka {topic} y el nombre de registro Avro como el nombre del sujeto.

Por defecto es "TopicNameStrategy". Este parámetro solo se puede configurar si schemaRegistry.valueSchema.options.autoRegisterSchemas está establecido en true.

config

Documento

Opcional

Documento que contiene campos que sobrescriben diversos valores por defecto.

config.acks

Int

Opcional

Número de reconocimientos requeridos del clúster Apache Kafka para una operación $emit exitosa.

El valor por defecto es all. Atlas Stream Processing admite los siguientes valores:

  • -1

  • 0

  • 1

  • all

config.compression_type

string

Opcional

Tipo de compresión para todos los datos generados por el productor. La configuración por defecto es ninguna (es decir, sin compresión). Los valores válidos son:

  • none

  • gzip

  • snappy

  • lz4

  • zstd

La compresión se utiliza para lotes completos de datos, por lo que la eficacia del agrupamiento impacta en la relación de compresión; más agrupamiento resulta en una mejor compresión.

config.dateFormat

string

Opcional

Formato de fecha para el valor de fecha. Los valores válidos son:

  • default - para utilizar por defecto el outputFormat.

  • ISO8601 - para convertir fechas a cadenas en el formato ISO8601, que incluye precisión de milisegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por ejemplo:

Considera la siguiente entrada.

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

Si $emit.config.dateFormat está configurado en default, la salida será similar a la siguiente:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Si $emit.config.dateFormat está configurado en ISO8601, la salida será similar a la siguiente:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.headers

expresión

Opcional

Encabezados para añadir al mensaje de salida. La expresión debe evaluarse ya sea a un objeto o un arreglo.

Si la expresión se evalúa como un objeto, Atlas Stream Processing construye un encabezado a partir de cada par clave-valor de ese objeto, donde la clave es el nombre del encabezado y el valor es el valor del encabezado.

Si la expresión se evalúa como un arreglo, debe tener la forma de un arreglo de objetos de pares clave-valor. Por ejemplo:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing construye un encabezado a partir de cada objeto del arreglo, donde la clave es el nombre del encabezado y el valor es el valor del encabezado. Atlas Stream Processing admite los valores de encabezado de los siguientes tipos:

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

objeto | string | expresión

Opcional

Expresión que evalúa a una Apache Kafka clave de mensaje.

Si especificas config.key, también debes especificar config.keyFormat.

config.keyFormat

string

Condicional

Tipo de datos utilizado para serializar datos clave de Apache Kafka. Debe ser uno de los siguientes valores:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Por defecto será binData. Si especificas config.key, debes especificar config.keyFormat. Si el config.key de un documento no se serializa correctamente al tipo de dato especificado, Atlas Stream Processing lo envía a su fila de letra muerta.

config.outputFormat

string

Opcional

Formato JSON a usar al emitir mensajes a Apache Kafka. Debe ser uno de los siguientes valores:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

Se establece por defecto en "relaxedJson".

Para obtener más información, consulta JSON básico.

config.tombstoneWhen

expresión

Opcional

Expresión que determina cuándo emitir null a Kafka. La expresión debe evaluarse como booleano true o false. Cuando la expresión se evalúa como true para un documento determinado, Atlas Stream Processing emite un null en su lugar a tu sumidero de Kafka. Cuando la expresión es falsa, Atlas Stream Processing emite el documento tal como existe en el momento en que alcanza la etapa $emit.

Si la expresión no puede evaluarse como un valor booleano, o no puede evaluarse, Atlas Stream Processing escribe el documento en la DLQ.

Esta configuración se puede utilizar para habilitar la compactación de temas si se proporcionan valores de $emit.config.key y $emit.config.keyFormat. Si no proporcionas estos valores, Atlas Stream Processing aún emite null cuando esta expresión se evalúa en true, pero estos no activan la compactación de temas de Kafka.

Para guardar datos procesados en una colección de series de tiempo de Atlas, utiliza la etapa de pipeline $emit con el siguiente formulario prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"db": "<target-db>" | <expression>,
"coll": "<target-coll>" | <expression>,
"parallelism": <number>,
"partitionBy": <expression>,
"timeseries": {
<options>
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de Conexiones, de la conexión desde la cual se ingieren datos.

db

string | expresión

Requerido

Nombre de, o expresión que resuelve a, la base de datos de Atlas que contiene la colección de series de tiempo objetivo.

coll

string | expresión

Requerido

Nombre de o expresión que se resuelve en la colección de series de tiempo Atlas a la que guardar.

parallelism

entero

Opcional

Número de hilos de escritura interna que utiliza el operador de almacenamiento en serie temporal para escribir datos en la colección de series de tiempo. Esto mejora el rendimiento para cargas de trabajo de alto volumen al distribuir las operaciones de escritura entre múltiples hilos.

Por defecto 1. El valor máximo es 16.

partitionBy

expresión

Opcional

Expresión que determina cómo se dividen los documentos entre los hilos de guardar cuando parallelism es mayor que 1. El hash de la expresión evaluada determina qué hilo procesa cada documento.

Si parallelism es mayor que 1 pero partitionBy no está especificado, se utiliza una estrategia de particionamiento no especificada.

timeseries

Documento

Requerido

Documento que define los campos de serie de tiempo de la colección.

Nota

El tamaño máximo para documentos dentro de una colección de series de tiempo es 4 MB. Para obtener más información, consulta Limitaciones de la colección de series de tiempo.

Importante

Requisitos de expresión dinámica

Al utilizar expresiones para los campos db o coll, debe especificar la opción timeField en la configuración timeseries. Esto garantiza que Atlas Stream Processing pueda crear correctamente colecciones de series temporales cuando la colección de destino no existe.

Por ejemplo:

{
"$emit": {
"connectionName": "atlas1",
"db": "$targetDatabase",
"coll": "$targetCollection",
"timeseries": {
"timeField": "timestamp"
}
}
}

Atlas Stream Processing admite la creación de conexiones Private Link con flujos AWS Kinesis. Para obtener más información, consulte Agregar una conexión privada de Kinesis.

Para guardar datos procesados en AWS Kinesis, use la etapa de pipeline $emit con el siguiente formulario prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"stream": "<stream-name>",
"region": "<aws-region>",
"partitionKey": "<key>" | <field> | <expression>
"config": {
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"dateFormat": "default" | "ISO8601",
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de Conexiones, de la conexión desde la cual se ingieren datos.

stream

string

Requerido

Nombre del flujo de datos de Kinesis al que conectarse.

region

string

Opcional

Región en la que opera el Kinesis Data Stream. AWS admite múltiples flujos con el mismo nombre, cada uno en una región distinta. Este parámetro permite que Atlas Stream Processing diferencie entre tales flujos.

config

Documento

Opcional

Documento que contiene campos que sobrescriben diversos valores por defecto.

config.outputFormat

string

Opcional

Formato JSON a utilizar al emitir mensajes a Kinesis. Debe ser uno de los siguientes valores:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

Se establece por defecto en "relaxedJson".

Para obtener más información, consulta JSON básico.

config.dateFormat

string

Opcional

Formato de fecha para el valor de fecha. Los valores válidos son:

  • default - para utilizar por defecto el outputFormat.

  • ISO8601 - para convertir fechas a cadenas en el formato ISO8601, que incluye precisión de milisegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por ejemplo:

Considera la siguiente entrada.

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

Si $emit.config.dateFormat está configurado en default, la salida será similar a la siguiente:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Si $emit.config.dateFormat está configurado en ISO8601, la salida será similar a la siguiente:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

Para guardar datos procesados en una conexión de sumidero de buckets de AWS S3, utilizar la etapa del pipeline $emit con el siguiente formato de prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"bucket": "<target-bucket>" | <expression>,
"region": "<target-region>",
"path": "<key-prefix>" | <expression>,
"parallelism": <number>,
"partitionBy": <expression>,
"config": {
"writeOptions": {
"count": <doc-count>,
"bytes": <threshold>,
"interval": {
"size": <unit-count>,
"unit": "<time-denomination>"
}
},
"delimiter": "<delimiter>",
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"dateFormat": "default" | "ISO8601",
"compression": "gzip" | "snappy",
"compressionLevel": <level>
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de Conexiones, de la conexión a la que se va a guardar datos.

bucket

string

Requerido

Nombre del bucket S3 al que se guardarán los datos.

region

string

Opcional

Nombre de la región AWS en la que se encuentra el bucket de destino. Si hospedas tu espacio de trabajo de procesamiento de streams en una región AWS, este parámetro será predeterminado a esa región. De lo contrario, por defecto se utilizará la región AWS más cercana a la región host de tu espacio de trabajo de procesamiento de streams.

path

string | expresión

Requerido

Prefijo de la clave de los objetos escritos en el bucket S3. Debe ser una cadena literal prefijo o una expresión que evalúe a una string.

parallelism

entero

Opcional

Número de hilos internos de guardar que el operador sink S3 utiliza para guardar datos a S3. Esto mejora el rendimiento para cargas de trabajo de alto volumen al distribuir las operaciones de escritura entre múltiples hilos.

Por defecto 1. El valor máximo es 16.

partitionBy

expresión

Opcional

Expresión que determina cómo se dividen los documentos entre los hilos de guardar cuando parallelism es mayor que 1. El hash de la expresión evaluada determina qué hilo procesa cada documento.

Si parallelism es mayor que 1 pero partitionBy no está especificado, se utiliza una estrategia de particionamiento no especificada.

config

Documento

Opcional

Documento que contiene parámetros adicionales que anulan varios valores por defecto.

config.writeOptions

Documento

Opcional

Documento que contiene parámetros adicionales que rigen el comportamiento de guardar. Estos parámetros desencadenan un comportamiento de escritura según el umbral que se cumpla primero.

Por ejemplo, si los documentos ingeridos alcanzan el umbral de config.writeOptions.count sin alcanzar el umbral de config.writeOptions.interval, el procesador de flujo aún emite estos documentos a S3 de acuerdo con el umbral de config.writeOptions.count.

config.writeOptions.count

entero

Opcional

Número de documentos a agrupar en cada archivo escrito en S3.

config.writeOptions.bytes

entero

Opcional

Especifica el número mínimo de bytes que se deben acumular antes de que se escriba un archivo en S3. El recuento de bytes está determinado por el tamaño de los documentos BSON ingeridos por el pipeline, no por el tamaño del archivo de salida final.

El valor predeterminado es 32 MB.

config.writeOptions.interval

Documento

Opcional

Especifica un temporizador para la escritura masiva de documentos como una combinación de size y unit.

Por defecto 1 minuto. No puedes establecer el size a 0 para ningún unit. El intervalo máximo es de 7 días.

config.writeOptions.interval.size

entero

Condicional

El número de unidades especificadas por writeOptions.interval.unit tras el cual el procesador de streams escribe documentos masivamente en S3.

Por defecto será 1. No puedes establecer un size de 0. Si define writeOptions.interval, también debe definir este parámetro.

config.writeOptions.interval.unit

string

Condicional

La denominación de tiempo en la que contabilizar el temporizador de guardado masivo. Este parámetro admite los siguientes valores:

  • ms

  • second

  • minute

  • hour

  • day

Por defecto será minute. Si se define writeOptions.interval, también se debe definir este parámetro.

config.delimiter

string

Opcional

Delimitador entre cada entrada en el archivo emitido.

Se establece por defecto en \n.

config.outputFormat

string

Opcional

Especifica el formato de salida del JSON escrito en S3. Debe ser uno de los siguientes valores:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

El valor por defecto es "relaxedJson".

Para obtener más información, consulta JSON básico y JSON extendido.

config.dateFormat

string

Opcional

Formato de fecha para el valor de fecha. Los valores válidos son:

  • default - para utilizar por defecto el outputFormat.

  • ISO8601 - para convertir fechas a cadenas en el formato ISO8601, que incluye precisión de milisegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por ejemplo, si agregas el siguiente registro a la pipeline:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

entonces si $emit.config.dateFormat está configurado en default, la salida se parece a lo siguiente:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Si $emit.config.dateFormat está configurado en ISO8601, la salida será similar a la siguiente:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.compression

string

Opcional

Nombre del algoritmo de compresión a utilizar. Debe ser uno de los siguientes valores:

  • "gzip"

  • "snappy"

config.compressionLevel

string

Condicional

Nivel de compresión que se aplicará al mensaje generado. Admite valores 1-9 inclusive; los valores más altos significan más compresión.

Se establece por defecto en 6.

Este parámetro es obligatorio y está limitado a gzip. Si se establece config.compression en snappy, configurar este parámetro no tiene efecto.

Para guardar datos procesados en Azure Blob almacenamiento, use la $emit pipeline con el siguiente formulario de prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"container": "<storage-container>",
"path": "<prefix-key>" | <expression>,
"config": {
"dateFormat": "default" | "ISO8601",
"compression": "none" | "gzip" | "snappy",
"compressionLevel": [1-9],
"outputFormat": "relaxedJson"| "canonicalJson" | "basicJson",
"delimiter": "<delimiter>",
"writeOptions": {
"count": <int>,
"interval": {
"size": <int>,
"unit": "ms" | "second" | "minute" | "hour" | "day"
},
"bytes": <int>
},
"parallelism": <int>,
"partitionBy": "<key>" | expression
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de Conexiones, de la conexión en la que se guardarán datos.

container

string

Requerido

Contenedor de almacenamiento en el que guardar blobs.

path

string | expresión

Requerido

Prefijo de la clave de los objetos escritos en Azure Blob Storage. Debe ser un string literal o una expresión que se evalúe como un string.

config

Documento

Opcional

Documento que contiene parámetros adicionales que anulan varios valores por defecto.

config.dateFormat

string

Opcional

Formato de fecha para el valor de fecha. Los valores válidos son:

  • default - para utilizar por defecto el outputFormat.

  • ISO8601 - para convertir fechas a cadenas en el formato ISO8601, que incluye precisión de milisegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por ejemplo, si agregas el siguiente registro a la pipeline:

{ "flightTime" : ISODate("2025-01-10T20:17:38.387Z") }

entonces si $emit.config.dateFormat está configurado en default, la salida se parece a lo siguiente:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Si $emit.config.dateFormat está configurado en ISO8601, la salida será similar a la siguiente:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.compression

string

Opcional

Nombre del algoritmo de compresión a utilizar. Debe ser uno de los siguientes valores:

  • "none"

  • "gzip"

  • "snappy"

Si no especifica un valor para este campo, Atlas Stream Processing escribe los datos sin compresión.

config.compressionLevel

string

Condicional

Nivel de compresión que se aplicará al mensaje generado. Admite valores 1-9 inclusive; los valores más altos significan más compresión.

Se establece por defecto en 6.

Este parámetro es obligatorio y está limitado a gzip. Si se establece config.compression en snappy, configurar este parámetro no tiene efecto.

config.outputFormat

string

Opcional

Especifica el formato de salida del JSON escrito en Azure Blob Storage. Debe ser uno de los siguientes valores:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

Se establece por defecto en "relaxedJson".

Para obtener más información, consulta JSON básico y JSON extendido.

config.delimiter

string

Opcional

Delimitador entre cada entrada en el archivo emitido.

Se establece por defecto en \n.

config.writeOptions

Documento

Opcional

Documento que contiene parámetros adicionales que rigen el comportamiento de guardar. Estos parámetros desencadenan un comportamiento de escritura según el umbral que se cumpla primero.

Por ejemplo, si los documentos incorporados alcanzan el umbral de config.writeOptions.count sin alcanzar el umbral de config.writeOptions.interval, el procesador de flujo sigue emitiendo estos documentos a Azure Blob Storage de acuerdo con el umbral de config.writeOptions.count.

config.writeOptions.count

entero

Opcional

Número de documentos a agrupar en cada archivo escrito en Azure Blob Storage.

config.writeOptions.bytes

entero

Opcional

Especifica el número mínimo de bytes que se deben acumular antes de guardar un archivo en Azure Blob Storage. La cantidad de bytes se determina por el tamaño de los documentos BSON procesados en el pipeline, no por el tamaño del archivo de salida final.

El valor predeterminado es 32 MB.

config.writeOptions.interval

Documento

Opcional

Especifica un temporizador para la escritura masiva de documentos como una combinación de size y unit.

Por defecto 1 minuto. No puedes establecer el size a 0 para ningún unit. El intervalo máximo es de 7 días.

config.writeOptions.interval.size

entero

Condicional

El número de unidades especificado por writeOptions.interval.unit, después del cual el procesador de flujo escribe documentos de forma masiva en Azure Blob almacenamiento.

Por defecto será 1. No puedes establecer un size de 0. Si define writeOptions.interval, también debe definir este parámetro.

config.writeOptions.interval.unit

string

Condicional

La denominación de tiempo en la que contabilizar el temporizador de guardado masivo. Este parámetro admite los siguientes valores:

  • ms

  • second

  • minute

  • hour

  • day

Por defecto será minute. Si se define writeOptions.interval, también se debe definir este parámetro.

parallelism

entero

Opcional

Cantidad de subprocesos de escritura internos que Atlas Stream Processing utiliza para escribir datos en Azure Blob Storage. Esto mejora el rendimiento para cargas de trabajo de alto volumen al distribuir las operaciones de escritura en múltiples hilos.

Por defecto 1. El valor máximo es 16.

partitionBy

string | expresión

Opcional

String o expresión que determina cómo Atlas Stream Processing particiona los documentos entre los hilos de guardado cuando parallelism es mayor que 1. El hash de la expresión evaluada determina cuál hilo procesa cada documento.

Si parallelism es mayor que 1, pero partitionBy no está especificado, Atlas Stream Processing utiliza una estrategia de round-robin para asignar documentos a los threads.

Para guardar datos procesados en Google Cloud Pub/Sub, utiliza la etapa de pipeline $emit con el siguiente formulario prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic": "<topic>" | <expression>,
"projectId": "<project-id>",
"region": "<pubsub-region>",
"orderingKey": "<key>" | <expression>,
"attributes": {
"<key1>": "<value1>" | <expression>,
. . .
"<keyN>": "<valueN>" | <expression>
},
"config": {
"dateFormat": "default" | "ISO8601",
"outputFormat": "relaxedJson"| "canonicalJson" | "basicJson",
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de Conexiones, de la conexión en la que se guardarán datos.

projectId

string

Requerido

ID del proyecto al que se debe publicar.

topic

string | expresión

Requerido

Nombre del tema dentro del proyecto dado en el cual publicar mensajes. Debe ser un string o una expresión que evalúe a un string.

region

string

Requerido

El endpoint de ubicación al que enviar solicitudes. Para consultar las regiones disponibles, revisa la documentación de Pub/Sub.

orderingKey

string | expresión

Opcional

Clave en los datos publicados para ordenar la publicación del mensaje. Si estableces un orderingKey, Atlas Stream Processing garantiza que el destino del suscriptor reciba todos los mensajes publicados que contengan esa clave en el orden en que la fuente los publica.

Utiliza un valor de string para este campo a fin de establecer un orderingKey estático. Utiliza una expresión que devuelva una string para un orderingKey dinámico. Una expresión que da como resultado específicamente una ruta de campo establece el orderingKey en el valor de un campo en un documento.

attributes

Documento

Opcional

Documento que define los atributos que se deben agregar a los mensajes. Los atributos adoptan la forma de pares clave-valor donde:

  • Cada clave es una string o una expresión de ruta de campo que determina la clave que Atlas Stream Processing añade a los mensajes.

  • Cada valor es un string o una expresión de ruta de campo que se evalúa como un string.

config

Documento

Opcional

Documento que contiene parámetros adicionales que anulan varios valores por defecto.

config.dateFormat

string

Opcional

Formato de fecha para el valor de fecha. Los valores válidos son:

  • default - para utilizar por defecto el outputFormat.

  • ISO8601 - para convertir fechas a cadenas en el formato ISO8601, que incluye precisión de milisegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por ejemplo, si agregas el siguiente registro a la pipeline:

{ "flightTime" : ISODate("2025-01-10T20:17:38.387Z") }

entonces si $emit.config.dateFormat está configurado en default, la salida se parece a lo siguiente:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Si $emit.config.dateFormat está configurado en ISO8601, la salida será similar a la siguiente:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.outputFormat

string

Opcional

Especifica el formato de salida del JSON escrito en Google Cloud Pub/Sub. Debe ser uno de los siguientes valores:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

Se establece por defecto en "relaxedJson".

Para obtener más información, consulta JSON básico y JSON extendido.

Para facilitar la ingesta de mensajes, la etapa $emit admite la escritura de datos procesados en destinos en el formato JSON básico, que simplifica los formatos JSON extendido (canonicalJson) y JSON extendido relajado (relaxedJson). JSON básico no utiliza los contenedores JSON extendido de MongoDB y, por tanto, no conserva todos los tipos de BSON.

Se puede especificar el formato JSON básico configurando el campo config.outputFormat en "basicJson" en la etapa $emit.

La siguiente tabla proporciona ejemplos de estas simplificaciones para todos los campos afectados.

tipo de campo
relaxedJson
basicJson

Binario

{ "binary": { "$binary": { "base64": "gf1UcxdHTJ2HQ/EGQrO7mQ==", "subType": "00" }}}

{ "binary": "gf1UcxdHTJ2HQ/EGQrO7mQ=="}

fecha

{ "date": { "$date": "2024-10-24T18:07:29.636Z"}}

{ "date": 1729625275856}

Decimal

{ "decimal": { "$numberDecimal": "9.9" }}

{ "decimal": "9.9" }

Marca de tiempo

{ "timestamp": { "$timestamp": { "t": 1729793249, "i": 1 }}}

{ "timestamp": 1729793249000}

ObjectId

{ "_id": { "$oid": "671a8ce1497407eff0e17cba" }}

{ "_id": "6717fcbba18c8a8f74b6d977" }

Infinito negativo

{ "negInf": { "$numberDouble": "-Infinity" }}

{ "negInf": "-Infinity" }

Infinito positivo

{ "posInf": { "$numberDouble": "Infinity" }}

{ "posInf": "Infinity" }

Expresiones regulares

{ "regex": { "$regularExpression": { "pattern": "ab+c", "options": "i" }}}

{ "regex": { "pattern": "ab+c", "options": "i" }}

UUID

{ "uuid": { "$binary": { "base64": "Kat+fHk6RkuAmotUmsU7gA==", "subType": "04" }}}

{ "uuid": "420b7ade-811a-4698-aa64-c8347c719cf1"}

Solo puedes guardar en una única colección de series de tiempo de Atlas por procesador de flujos. Si especificas una colección que no existe, Atlas crea la colección con los campos de serie de tiempo que especificaste. Debes especificar una base de datos existente.

Atlas Stream Processing no admite la escritura de documentos BSON de más de 125 MB usando la etapa $emit en un AWS S3 bucket.

La variante Azure Blob Storage $emit también admite la escritura en2 contenedores Azure Data Lake Storage v.

La variante $emit de GCP Pub/Sub admite la definición de valores a través de expresiones de rutas de campo tanto para el campo orderingKey como para los pares clave-valor en el campo attributes. Si hay un error en la expresión o si la expresión no se resuelve en una string, Atlas Stream Processing envía el mensaje afectado a la fila de letra muerta.

Puedes usar una expresión dinámica como el valor de los campos topic, db y coll para permitir que el procesador de flujo guarde en diferentes destinos en función de cada mensaje. La expresión debe evaluarse como una string.

Ejemplo

Tienes un flujo de eventos de transacciones que genera mensajes de la siguiente forma:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para organizar cada uno de estos en un tema distinto de Apache Kafka, puedes guardar la siguiente etapa $emit:

{
"$emit": {
"connectionName": "kafka1",
"topic": "$customerStatus"
}
}

Esta $emit etapa:

  • Guarda el mensaje Very Important Industries en un tema llamado VIP.

  • Guarda el mensaje N. E. Buddy en un tema llamado employee.

  • Guarda el mensaje Khan Traktor en un tema llamado contractor.

Para obtener más información sobre las expresiones dinámicas, consulta operadores de expresiones.

Si se especifica un tema que no existe aún, Apache Kafka crea automáticamente el tema cuando recibe el primer mensaje que lo dirige.

Si se especifica un tema con una expresión dinámica, pero Atlas Stream Processing no puede evaluar la expresión para un mensaje dado, Atlas Stream Processing envía ese mensaje a la fila de letra muerta si está configurada y procesa los siguientes mensajes. Si no se configura una fila de letra muerta, entonces Atlas Stream Processing omite el mensaje por completo y procesa los mensajes siguientes.

Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:

  1. La etapa $source establece una conexión con el Apache Kafka broker que recopila estos informes en un tema denominado my_weatherdata, exponiendo cada registro a medida que se procesa a las siguientes etapas de agregación. Este etapa también reemplaza el nombre del campo de marca de tiempo que proyecta, estableciéndolo en ingestionTime.

  2. La etapa $match excluye documentos que tienen un airTemperature.value de mayor o igual a 30.0 y pasa los documentos con un airTemperature.value menor que 30.0 a la siguiente etapa.

  3. La etapa $addFields enriquece el flujo con metadatos.

  4. La etapa $emit escribe la salida en un tema denominado stream a través de la conexión del broker de Kafka weatherStreamOutput.

{
"$source": {
"connectionName": "sample_weatherdata",
"topic": "my_weatherdata",
"tsFieldName": "ingestionTime"
}
},
{
"$match": {
"airTemperature.value": {
"$lt": 30
}
}
},
{
"$addFields": {
"processorMetadata": {
"$meta": "stream"
}
}
},
{
"$emit": {
"connectionName": "weatherStreamOutput",
"topic": "stream"
}
}

Los documentos del tema stream toman la siguiente forma:

{
"st": "x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8, 116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1", "AG1", "UG1", "SA1", "MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight": {
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime": {
"$date": "2024-09-26T17:34:41.843Z"
}
}

Nota

Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.