Docs Menu
Docs Home
/ /
/ / /

$emit Stage de agregación (Stream Processing)

La etapa $emit especifica una conexión en el Registro de conexión al que se envían mensajes. Se admiten los siguientes tipos de conexión:

  • Apache Kafka corredor

  • Colección de series temporales deMongoDB

  • Flujo de datos de AWS Kinesis

  • AWS CuboS3

$emit debe ser la última etapa de cualquier canalización en la que aparezca. Solo puede utilizar una etapa $emit por canalización.

Para escribir datos procesados ​​en un agente Apache Kafka, utilice la $emit etapa de canalización con el siguiente formato de prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic": "<target-topic>" | <expression>,
"schemaRegistry": {
"connectionName": "<schema-registry-name>",
"valueSchema": {
type: "<schema-type>",
schema: <schema-name>,
options: {
subjectNameStrategy: "<topic-name-strategy>",
autoRegisterSchemas: true
}
}
},
"config": {
"acks": <number-of-acknowledgements>,
"compression_type": "<compression-type>",
"dateFormat": "default" | "ISO8601",
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<serialization-type>",
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"tombstoneWhen": <expression>
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de conexión, de la conexión desde la cual se ingerirán datos.

topic

cadena o expresión

Requerido

Nombre del tema de Apache Kafka al que se emitirán mensajes.

schemaRegistry

Documento

Opcional

Documento que permite el uso de un registro de esquema para soportar la escritura en una fuente serializada por Avro.

Para habilitar esta función, debe crear una conexión de Registro de esquema.

schemaRegistry.connectionName

string

Condicional

Nombre de la conexión del Registro de esquema que se utilizará para la deserialización de Avro.

schemaRegistry.valueSchema

documento | expresión

Condicional

Documento que define las propiedades de su esquema de serialización, o una expresión que evalúa tal.

schemaRegistry.valueSchema.type

string

Condicional

El tipo de serialización para el que se utiliza el Registro de Esquemas. Atlas Stream Processing actualmente admite la serialización "avro" mediante conexiones del Registro de Esquemas.

schemaRegistry.valueSchema.schema

Documento

Condicional

Documento que define su Declaración de Esquema.

schemaRegistry.valueSchema.options

Documento

Opcional

Documento que define parámetros de configuración opcionales para su conexión de registro de esquema.

schemaRegistry.valueSchema.options.autoRegisterSchemas

booleano

Opcional

Activa o desactiva el registro automático de esquemas al procesar documentos con esquemas no reconocidos. Si se establece en "false", los documentos con esquemas no reconocidos se envían a la cola de mensajes fallidos.

Se establece por defecto en true.

schemaRegistry.valueSchema.options.subjectNameStrategy

string

Condicional

Método para determinar el nombre del sujeto de los esquemas registrados automáticamente. Debe ser uno de los siguientes:

  • "TopicNameStrategy": Utiliza el nombre {topic} de Kafka como el nombre del tema.

  • "RecordNameStrategy":Utiliza el nombre del registro Avro como nombre del sujeto.

  • "TopicRecordNameStrategy":Utiliza una combinación del nombre de Kafka {topic} y el nombre del registro de Avro como nombre del sujeto.

El valor predeterminado es "TopicNameStrategy". Solo se puede configurar este parámetro si schemaRegistry.valueSchema.options.autoRegisterSchemas está configurado en true.

config

Documento

Opcional

Documento que contiene campos que anulan varios valores predeterminados.

config.acks

Int

Opcional

Número de reconocimientos necesarios del clúster Apache Kafka para una $emit operación exitosa.

El valor predeterminado es all. Atlas Stream Processing admite los siguientes valores:

  • -1

  • 0

  • 1

  • all

config.compression_type

string

Opcional

Tipo de compresión para todos los datos generados por el productor. El valor predeterminado es ninguno (es decir, sin compresión). Los valores válidos son:

  • none

  • gzip

  • snappy

  • lz4

  • zstd

La compresión se utiliza para lotes completos de datos, por lo que la eficacia del procesamiento en lotes afecta la relación de compresión; un mayor procesamiento en lotes da como resultado una mejor compresión.

config.dateFormat

string

Opcional

Formato de fecha para el valor de fecha. Los valores válidos son:

  • default - utilizar el valor predeterminado outputFormat.

  • ISO8601 - para convertir fechas en cadenas en el formato ISO8601, que incluye precisión de milisegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por ejemplo:

Considere la siguiente entrada:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

Si $emit.config.dateFormat se establece en default, la salida será similar a la siguiente:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Si $emit.config.dateFormat se establece en ISO8601, la salida será similar a la siguiente:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.headers

expresión

Opcional

Encabezados que se añadirán al mensaje de salida. La expresión debe evaluarse como un objeto o una matriz.

Si la expresión se evalúa como un objeto, Atlas Stream Processing construye un encabezado a partir de cada par clave-valor en ese objeto, donde la clave es el nombre del encabezado y el valor es el valor del encabezado.

Si la expresión se evalúa como una matriz, debe adoptar la forma de una matriz de objetos de pares clave-valor. Por ejemplo:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing construye un encabezado a partir de cada objeto de la matriz, donde la clave es el nombre del encabezado y el valor es el valor del encabezado. Atlas Stream Processing admite valores de encabezado de los siguientes tipos:

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

objeto | cadena | expresión

Opcional

Expresión que evalúa una clave de mensaje de Apache Kafka.

Si especifica config.key, debe especificar config.keyFormat.

config.keyFormat

string

Condicional

Tipo de dato utilizado para serializar los datos de clave de Apache Kafka. Debe ser uno de los siguientes valores:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

El valor predeterminado binDataes. Si config.key especifica, debe especificar. Si el config.keyFormat config.key de un documento no se serializa correctamente al tipo de datos especificado, Atlas Stream Processing lo envía a la cola de mensajes fallidos.

config.outputFormat

string

Opcional

Formato JSON para enviar mensajes a Apache Kafka. Debe ser uno de los siguientes valores:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

Se establece por defecto en "relaxedJson".

Para obtener más información,consulte JSON básico.

config.tombstoneWhen

expresión

Opcional

Expresión que determina cuándo emitir null a Kafka. La expresión debe evaluarse como un booleano true o false. Cuando la expresión evalúa como true para un documento dado, Atlas Stream Processing emite como null en su lugar al receptor de Kafka. Si la expresión evalúa como falsa, Atlas Stream Processing emite el documento tal como existe al llegar a la etapa $emit.

Si la expresión no puede evaluarse como un valor booleano o no se puede evaluar, Atlas Stream Processing escribe el documento en el DLQ.

Esta configuración permite habilitar la compactación de temas si se proporcionan los valores $emit.config.key y $emit.config.keyFormat. Si no se proporcionan estos valores, Atlas Stream Processing seguirá emitiendo null cuando esta expresión se evalúe como true, pero estos valores no activarán la compactación de temas de Kafka.

Para escribir datos procesados ​​en una colección de series de tiempo de Atlas, utilice la etapa de canalización $emit con el siguiente formato de prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"db": "<target-db>" | <expression>,
"coll": "<target-coll>" | <expression>,
"timeseries": {
<options>
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de conexión, de la conexión desde la cual se ingerirán datos.

db

cadena | expresión

Requerido

Nombre o expresión que se resuelve en la base de datos Atlas que contiene la colección de series de tiempo de destino.

coll

cadena | expresión

Requerido

Nombre o expresión que se resuelve en la colección de series de tiempo Atlas donde se escribirá.

timeseries

Documento

Requerido

Documento que define los campos de series de tiempo para la colección.

Nota

El tamaño máximo de los documentos dentro de una colección de series temporales es de 4 MB. Para obtener más información, consulte Limitaciones de las colecciones de series temporales.

Para escribir datos procesados ​​en AWS Kinesis, utilice la $emit etapa de canalización con el siguiente formato de prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"stream": "<stream-name>",
"region": "<aws-region>",
"partitionKey": "<key>" | <field> | <expression>
"config": {
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"dateFormat": "default" | "ISO8601",
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de conexión, de la conexión desde la cual se ingerirán datos.

stream

string

Requerido

Nombre del flujo de datos de Kinesis al que conectarse.

region

string

Opcional

Región en la que opera el flujo de datos de Kinesis. AWS admite múltiples flujos con el mismo nombre, cada uno en una región distinta. Este parámetro permite a Atlas Stream Processing distinguir entre tales flujos.

config

Documento

Opcional

Documento que contiene campos que anulan varios valores predeterminados.

config.outputFormat

string

Opcional

Formato JSON para enviar mensajes a Kinesis. Debe ser uno de los siguientes valores:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

Se establece por defecto en "relaxedJson".

Para obtener más información,consulte JSON básico.

config.dateFormat

string

Opcional

Formato de fecha para el valor de fecha. Los valores válidos son:

  • default - utilizar el valor predeterminado outputFormat.

  • ISO8601 - para convertir fechas en cadenas en el formato ISO8601, que incluye precisión de milisegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por ejemplo:

Considere la siguiente entrada:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

Si $emit.config.dateFormat se establece en default, la salida será similar a la siguiente:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Si $emit.config.dateFormat se establece en ISO8601, la salida será similar a la siguiente:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

Para escribir datos procesados ​​en una conexión de sumidero3 de bucket de AWS S, utilice la $emit etapa de canalización con el siguiente formato de prototipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"bucket": "<target-bucket>" | <expression>,
"region": "<target-region>",
"path": "<key-prefix>" | <expression>,
"config": {
"writeOptions": {
"count": <doc-count>,
"bytes": <threshold>,
"interval": {
"size": <unit-count>,
"unit": "<time-denomination>"
}
},
"delimiter": "<delimiter>",
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"dateFormat": "default" | "ISO8601",
"compression": "gzip" | "snappy",
"compressionLevel": <level>
}
}
}

La etapa $emit procesa un documento con los siguientes campos:

Campo
Tipo
Necesidad
Descripción

connectionName

string

Requerido

Nombre, tal como aparece en el Registro de conexión, de la conexión donde escribir datos.

bucket

string

Requerido

Nombre del depósito S en el que se escribirán los datos.3

region

string

Opcional

Nombre de la región de AWS donde reside el bucket de destino. Si aloja su espacio de trabajo de procesamiento de flujos en una región de AWS, este parámetro se establece de forma predeterminada en esa región. De lo contrario, se establece de forma predeterminada en la región de AWS más cercana a la región de host de su espacio de trabajo de procesamiento de flujos.

path

cadena | expresión

Requerido

Prefijo de la clave de los objetos escritos en el contenedor S.Debe ser una3 cadena de prefijo literal o una expresión que evalúe una cadena.

config

Documento

Opcional

Documento que contiene parámetros adicionales que anulan varios valores predeterminados.

config.writeOptions

Documento

Opcional

Documento que contiene parámetros adicionales que rigen el comportamiento de escritura. Estos parámetros activan el comportamiento de escritura según el umbral que se alcance primero.

Por ejemplo, si los documentos ingeridos alcanzan el config.writeOptions.count umbral sin alcanzar el config.writeOptions.interval umbral, el procesador de flujo aún emite estos documentos a S3 de acuerdo con el config.writeOptions.count umbral.

config.writeOptions.count

entero

Opcional

Cantidad de documentos que se agruparán en cada archivo escrito en S3.

config.writeOptions.bytes

entero

Opcional

Especifica la cantidad mínima de bytes que se deben acumular antes de que un archivo se escriba en S. El número de bytes 3se determina por el tamaño de los documentos BSON ingeridos por la canalización, no por el tamaño del archivo de salida final.

config.writeOptions.interval

Documento

Opcional

Especifica un temporizador para la escritura masiva de documentos como una combinación de size y unit.

El valor predeterminado es 1 minutos. No se puede establecer de size a 0 para ningún unit. El intervalo máximo es de 7 días.

config.writeOptions.interval.size

entero

Condicional

La cantidad de unidades especificadas por writeOptions.interval.unit después de las cuales el procesador de flujo escribe documentos en masa en 3S.

El valor predeterminado es 1. No se puede establecer un valor size de 0. Si define writeOptions.interval, también debe definir este parámetro.

config.writeOptions.interval.unit

string

Condicional

El tiempo que se utiliza para contabilizar el temporizador de escritura masiva. Este parámetro admite los siguientes valores:

  • ms

  • second

  • minute

  • hour

  • day

El valor predeterminado es minute. Si define writeOptions.interval, también debe definir este parámetro.

config.delimiter

string

Opcional

Delimitador entre cada entrada en el archivo emitido.

Se establece por defecto en \n.

config.outputFormat

string

Opcional

Especifica el formato de salida del JSON escrito en S.3 Debe ser uno de los siguientes valores:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

El valor predeterminado es "relaxedJson".

Para obtener más información,consulte JSON básico.

config.dateFormat

string

Opcional

Formato de fecha para el valor de fecha. Los valores válidos son:

  • default - utilizar el valor predeterminado outputFormat.

  • ISO8601 - para convertir fechas en cadenas en el formato ISO8601, que incluye precisión de milisegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por ejemplo, si agrega el siguiente registro a la canalización:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

Entonces, si $emit.config.dateFormat se establece en default, la salida se parece a la siguiente:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Si $emit.config.dateFormat se establece en ISO8601, la salida será similar a la siguiente:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.compression

string

Opcional

Nombre del algoritmo de compresión a utilizar. Debe ser uno de los siguientes valores:

  • "gzip"

  • "rápido"

config.compressionLevel

string

Condicional

Nivel de compresión que se aplicará al mensaje emitido. Admite valores 1-9 inclusive; valores más altos indican mayor compresión.

Se establece por defecto en 6.

Este parámetro es obligatorio y está limitado a gzip. Si se configura de config.compression a snappy, la configuración de este parámetro no tendrá ningún efecto.

Para facilitar la ingesta de mensajes, la etapa $emit permite escribir datos procesados ​​en receptores en formato JSON básico, lo que simplifica los formatos JSON extendido (canonicalJson) y extendido relajado (relaxedJson). JSON básico no utiliza los envoltorios JSON extendido de MongoDB y, por lo tanto, no conserva todos los tipos de BSON.

Puede especificar el formato JSON básico configurando el campo config.outputFormat en "basicJson" en su etapa $emit.

La siguiente tabla proporciona ejemplos de estas simplificaciones para todos los campos afectados.

Tipo de campo
relajadoJson
basicJson

Binario

{ "binary": { "$binary": { "base64": "gf1UcxdHTJ2HQ/EGQrO7mQ==", "subType": "00" }}}

{ "binary": "gf1UcxdHTJ2HQ/EGQrO7mQ=="}

fecha

{ "date": { "$date": "2024-10-24T18:07:29.636Z"}}

{ "date": 1729625275856}

Decimal

{ "decimal": { "$numberDecimal": "9.9" }}

{ "decimal": "9.9" }

Marca de tiempo

{ "timestamp": { "$timestamp": { "t": 1729793249, "i": 1 }}}

{ "timestamp": 1729793249000}

ObjectId

{ "_id": { "$oid": "671a8ce1497407eff0e17cba" }}

{ "_id": "6717fcbba18c8a8f74b6d977" }

Infinito negativo

{ "negInf": { "$numberDouble": "-Infinity" }}

{ "negInf": "-Infinity" }

Infinito positivo

{ "posInf": { "$numberDouble": "Infinity" }}

{ "posInf": "Infinity" }

Expresiones regulares

{ "regex": { "$regularExpression": { "pattern": "ab+c", "options": "i" }}}

{ "regex": { "pattern": "ab+c", "options": "i" }}

UUID

{ "uuid": { "$binary": { "base64": "Kat+fHk6RkuAmotUmsU7gA==", "subType": "04" }}}

{ "uuid": "420b7ade-811a-4698-aa64-c8347c719cf1"}

Solo se puede escribir en una única colección de series temporales de Atlas por procesador de flujo. Si se especifica una colección inexistente, Atlas la crea con los campos de series temporales especificados. Debe especificar una base de datos existente.

Atlas Stream Processing no admite la escritura de documentos BSON más grandes que 125 MB mediante la etapa $emit en un bucket de AWS S.3

Puede usar una expresión dinámica como valor de los topic db coll campos, y para que su procesador de flujo pueda escribir en diferentes destinos mensaje por mensaje. La expresión debe evaluarse como una cadena.

Ejemplo

Tiene un flujo de eventos de transacción que genera mensajes del siguiente formato:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para ordenar cada uno de estos en un tema distinto de Apache Kafka, puedes escribir la siguiente $emit etapa:

{
"$emit": {
"connectionName": "kafka1",
"topic": "$customerStatus"
}
}

Esta $emit etapa:

  • Escribe el mensaje Very Important Industries en un tema llamado VIP.

  • Escribe el mensaje N. E. Buddy en un tema llamado employee.

  • Escribe el mensaje Khan Traktor en un tema llamado contractor.

Para obtener más información sobre expresiones dinámicas, consulte operadores de expresión.

Si especifica un tema que aún no existe, Apache Kafka crea automáticamente el tema cuando recibe el primer mensaje dirigido a él.

Si especifica un tema con una expresión dinámica, pero Atlas Stream Processing no puede evaluar la expresión para un mensaje dado, Atlas Stream Processing envía ese mensaje a la cola de mensajes no entregados, si está configurada, y procesa los mensajes subsiguientes. Si no hay una cola de mensajes no entregados configurada, Atlas Stream Processing omite el mensaje por completo y procesa los mensajes subsiguientes.

Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación consta de tres etapas:

  1. La etapa establece una $source conexión con el agente de Apache Kafka que recopila estos informes en un tema my_weatherdata llamado, exponiendo cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndolo ingestionTime en.

  2. La etapa excluye los documentos que tienen $match un airTemperature.value mayor o igual a 30.0 y pasa los documentos con un airTemperature.value menor que 30.0 a la siguiente etapa.

  3. La etapa enriquece la transmisión con $addFields metadatos.

  4. La etapa $emit escribe la salida en un tema llamado stream a través de la conexión del agente de Kafka weatherStreamOutput.

{
"$source": {
"connectionName": "sample_weatherdata",
"topic": "my_weatherdata",
"tsFieldName": "ingestionTime"
}
},
{
"$match": {
"airTemperature.value": {
"$lt": 30
}
}
},
{
"$addFields": {
"processorMetadata": {
"$meta": "stream"
}
}
},
{
"$emit": {
"connectionName": "weatherStreamOutput",
"topic": "stream"
}
}

Los documentos del tema stream tienen el siguiente formato:

{
"st": "x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8, 116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1", "AG1", "UG1", "SA1", "MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight": {
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime": {
"$date": "2024-09-26T17:34:41.843Z"
}
}

Nota

El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.

Volver

$setStreamMeta

En esta página