Definición
La etapa $emit especifica una conexión en la
Registro de conexión al que se envían mensajes. Se admiten los siguientes tipos de conexión:
Apache Kafka corredor
MongoDB colección de series de tiempo
AWS Kinesis flujo de datos
AWS CuboS3
Ubicación
$emit debe ser la última etapa de cualquier canalización en la que aparezca. Solo puede utilizar una etapa $emit por canalización.
Sintaxis
Apache Kafka Broker
Para escribir datos procesados en un agente Apache Kafka, utilice la $emit etapa de canalización con el siguiente formato de prototipo:
{ "$emit": { "connectionName": "<registered-connection>", "topic": "<target-topic>" | <expression>, "schemaRegistry": { "connectionName": "<schema-registry-name>", "valueSchema": { type: "<schema-type>", schema: <schema-name>, options: { subjectNameStrategy: "<topic-name-strategy>", autoRegisterSchemas: true } } }, "config": { "acks": <number-of-acknowledgements>, "compression_type": "<compression-type>", "dateFormat": "default" | "ISO8601", "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<serialization-type>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "tombstoneWhen": <expression> } } }
La etapa $emit procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción | |||||
|---|---|---|---|---|---|---|---|---|
| string | Requerido | Nombre, tal como aparece en el Registro de Conexiones, de la conexión desde la cual se ingieren datos. | |||||
| cadena o expresión | Requerido | Nombre del tema de Apache Kafka al que se emitirán mensajes. | |||||
| Documento | Opcional | Documento que permite el uso de un registro de esquema para soportar la escritura en una fuente serializada por Avro. Para habilitar esta función, debe crear una conexión de Registro de esquema. | |||||
| string | Condicional | Nombre de la conexión del Registro de esquemas que se utilizará para la deserialización de Avro. | |||||
| documento | expresión | Condicional | Documento que define las propiedades de su esquema de serialización, o una expresión que evalúa tal. | |||||
| string | Condicional | El tipo de serialización para el que se utiliza el Registro de Esquemas. Atlas Stream Processing actualmente admite la serialización | |||||
| Documento | Condicional | Documento que define tu Declaración de Esquema. | |||||
| Documento | Opcional | Documento que define los parámetros de configuración opcionales para la conexión de tu registro de esquema. | |||||
| booleano | Opcional | Alternar para determinar si se deben registrar automáticamente los esquemas al procesar documentos con esquemas no reconocidos. Si se establece en falso, los documentos con esquemas no reconocidos son enviados a la fila de letra muerta. Se establece por defecto en | |||||
| string | Condicional | Método para determinar el nombre del sujeto de los esquemas registrados automáticamente. Debe ser uno de los siguientes:
Por defecto es | |||||
| Documento | Opcional | Documento que contiene campos que sobrescriben diversos valores por defecto. | |||||
| Int | Opcional | Número de reconocimientos necesarios del clúster Apache Kafka para una El valor por defecto es
| |||||
| string | Opcional | Tipo de compresión para todos los datos generados por el productor. El valor predeterminado es ninguno (es decir, sin compresión). Los valores válidos son:
La compresión se utiliza para lotes completos de datos, por lo que la eficacia del procesamiento en lotes afecta la relación de compresión; un mayor procesamiento en lotes da como resultado una mejor compresión. | |||||
| string | Opcional | Formato de fecha para el valor de fecha. Los valores válidos son:
Por ejemplo: Considere la siguiente entrada: Si Si | |||||
| expresión | Opcional | Encabezados para añadir al mensaje de salida. La expresión debe evaluarse ya sea a un objeto o un arreglo. Si la expresión se evalúa como un objeto, Atlas Stream Processing construye un encabezado a partir de cada par clave-valor de ese objeto, donde la clave es el nombre del encabezado y el valor es el valor del encabezado. Si la expresión se evalúa como una matriz, debe adoptar la forma de una matriz de objetos de pares clave-valor. Por ejemplo: Atlas Stream Processing construye un encabezado a partir de cada objeto del arreglo, donde la clave es el nombre del encabezado y el valor es el valor del encabezado. Atlas Stream Processing admite los valores de encabezado de los siguientes tipos:
| |||||
| objeto | string | expresión | Opcional | Expresión que evalúa a una Apache Kafka clave de mensaje. Si especificas | |||||
| string | Condicional | Tipo de datos utilizado para serializar datos clave de Apache Kafka. Debe ser uno de los siguientes valores:
Por defecto será | |||||
| string | Opcional | Formato JSON a usar al emitir mensajes a Apache Kafka. Debe ser uno de los siguientes valores:
Se establece por defecto en Para obtener más información,consulte JSON básico. | |||||
| expresión | Opcional | Expresión que determina cuándo emitir Si la expresión no puede evaluarse como un valor booleano, o no puede evaluarse, Atlas Stream Processing escribe el documento en la DLQ. Esta configuración se puede utilizar para habilitar la compactación de temas si se proporcionan valores de |
Atlas colección de series de tiempo
Para guardar datos procesados en una colección de series de tiempo de Atlas, utiliza la etapa de pipeline $emit con el siguiente formulario prototipo:
{ "$emit": { "connectionName": "<registered-connection>", "db": "<target-db>" | <expression>, "coll": "<target-coll>" | <expression>, "timeseries": { <options> } } }
La etapa $emit procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Requerido | Nombre, tal como aparece en el Registro de Conexiones, de la conexión desde la cual se ingieren datos. |
| string | expresión | Requerido | Nombre de, o expresión que resuelve a, la base de datos de Atlas que contiene la colección de series de tiempo objetivo. |
| string | expresión | Requerido | Nombre de o expresión que se resuelve en la colección de series de tiempo Atlas a la que guardar. |
| Documento | Requerido | Documento que define los campos de serie de tiempo de la colección. |
Nota
El tamaño máximo para documentos dentro de una colección de series de tiempo es 4 MB. Para obtener más información, consulta Limitaciones de la colección de series de tiempo.
AWS Kinesis
Atlas Stream Processing admite la creación de conexiones Private Link con flujos AWS Kinesis. Para obtener más información, consulte Agregar una conexión privada de Kinesis.
Para escribir datos procesados en AWS Kinesis, utilice la $emit etapa de canalización con el siguiente formato de prototipo:
{ "$emit": { "connectionName": "<registered-connection>", "stream": "<stream-name>", "region": "<aws-region>", "partitionKey": "<key>" | <field> | <expression> "config": { "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", } } }
La etapa $emit procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción | |||
|---|---|---|---|---|---|---|
| string | Requerido | Nombre, tal como aparece en el Registro de Conexiones, de la conexión desde la cual se ingieren datos. | |||
| string | Requerido | Nombre del flujo de datos de Kinesis al que conectarse. | |||
| string | Opcional | Región en la que opera el flujo de datos de Kinesis. AWS admite múltiples flujos con el mismo nombre, cada uno en una región distinta. Este parámetro permite a Atlas Stream Processing distinguir entre tales flujos. | |||
| Documento | Opcional | Documento que contiene campos que sobrescriben diversos valores por defecto. | |||
| string | Opcional | Formato JSON a utilizar al emitir mensajes a Kinesis. Debe ser uno de los siguientes valores:
Se establece por defecto en Para obtener más información,consulte JSON básico. | |||
| string | Opcional | Formato de fecha para el valor de fecha. Los valores válidos son:
Por ejemplo: Considere la siguiente entrada: Si Si |
AWS S3
Para guardar datos procesados en una conexión de sumidero de buckets AWS S3, utiliza la etapa de pipeline $emit con el siguiente prototipo:
{ "$emit": { "connectionName": "<registered-connection>", "bucket": "<target-bucket>" | <expression>, "region": "<target-region>", "path": "<key-prefix>" | <expression>, "config": { "writeOptions": { "count": <doc-count>, "bytes": <threshold>, "interval": { "size": <unit-count>, "unit": "<time-denomination>" } }, "delimiter": "<delimiter>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", "compression": "gzip" | "snappy", "compressionLevel": <level> } } }
La etapa $emit procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción | |||
|---|---|---|---|---|---|---|
| string | Requerido | Nombre, tal como aparece en el Registro de Conexiones, de la conexión a la que se va a guardar datos. | |||
| string | Requerido | Nombre del depósito S en el que se escribirán los datos.3 | |||
| string | Opcional | Nombre de la región de AWS en la que se encuentra el bucket de destino. Si aloja su espacio de trabajo de Stream Processing en una región de AWS, este parámetro se establece de forma predeterminada en esa región. De lo contrario, se selecciona por defecto la región AWS más cercana a la región anfitriona de tu espacio de trabajo de Stream Processing. | |||
| string | expresión | Requerido | Prefijo de la clave de objetos escritos en el depósito S3. Debe ser una cadena literal prefijo o una expresión que evalúe en una string. | |||
| Documento | Opcional | Documento que contiene parámetros adicionales que anulan varios valores por defecto. | |||
| Documento | Opcional | Documento que contiene parámetros adicionales que rigen el comportamiento de guardar. Estos parámetros desencadenan un comportamiento de escritura según el umbral que se cumpla primero. Por ejemplo, si los documentos ingeridos alcanzan el | |||
| entero | Opcional | Cantidad de documentos que se agruparán en cada archivo escrito en S3. | |||
| entero | Opcional | Especifica el número mínimo de bytes que deben acumularse antes de que un archivo se escriba en S3. El conteo de bytes se determina por el tamaño de los documentos BSON ingeridos por la pipeline, no por el tamaño del archivo de salida final. | |||
| Documento | Opcional | Especifica un temporizador para la escritura masiva de documentos como una combinación de El valor predeterminado es 1 minutos. No se puede establecer de | |||
| entero | Condicional | La cantidad de unidades especificadas por Por defecto será | |||
| string | Condicional | La denominación de tiempo en la que contabilizar el temporizador de guardado masivo. Este parámetro admite los siguientes valores:
Por defecto será | |||
| string | Opcional | Delimitador entre cada entrada en el archivo emitido. Se establece por defecto en | |||
| string | Opcional | Especifica el formato de salida del JSON escrito en S3. Debe ser uno de los siguientes valores:
El valor por defecto es " Para obtener más información,consulte JSON básico. | |||
| string | Opcional | Formato de fecha para el valor de fecha. Los valores válidos son:
Por ejemplo, si agregas el siguiente registro a la pipeline: Entonces, si Si | |||
| string | Opcional | Nombre del algoritmo de compresión a utilizar. Debe ser uno de los siguientes valores:
| |||
| string | Condicional | Nivel de compresión que se aplicará al mensaje emitido. Admite valores Se establece por defecto en Este parámetro es obligatorio y está limitado a |
Basic JSON
Para facilitar la ingesta de mensajes, la etapa $emit permite escribir datos procesados en receptores en formato JSON básico, lo que simplifica los formatos JSON extendido (canonicalJson) y extendido relajado (relaxedJson). JSON básico no utiliza los envoltorios JSON extendido de MongoDB y, por lo tanto, no conserva todos los tipos de BSON.
Puede especificar el formato JSON básico configurando el campo config.outputFormat en "basicJson" en su etapa $emit.
La siguiente tabla proporciona ejemplos de estas simplificaciones para todos los campos afectados.
tipo de campo | relaxedJson | basicJson |
|---|---|---|
Binario |
|
|
fecha |
|
|
Decimal |
|
|
Marca de tiempo |
|
|
ObjectId |
|
|
Infinito negativo |
|
|
Infinito positivo |
|
|
Expresiones regulares |
|
|
UUID |
|
|
Comportamiento
Solo puedes guardar en una única colección de series de tiempo de Atlas por procesador de flujos. Si especificas una colección que no existe, Atlas crea la colección con los campos de serie de tiempo que especificaste. Debes especificar una base de datos existente.
Atlas Stream Processing no admite la escritura de documentos BSON mayores a 125
MB mediante la etapa $emit en un bucket AWS S3.
Puede usar una expresión dinámica como valor de los topic db coll campos, y para que su procesador de flujo pueda escribir en diferentes destinos mensaje por mensaje. La expresión debe evaluarse como una cadena.
Ejemplo
Tienes un flujo de eventos de transacciones que genera mensajes de la siguiente forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para organizar cada uno de estos en un tema distinto de Apache Kafka, puedes guardar la siguiente etapa $emit:
{ "$emit": { "connectionName": "kafka1", "topic": "$customerStatus" } }
Esta $emit etapa:
Guarda el mensaje
Very Important Industriesen un tema llamadoVIP.Guarda el mensaje
N. E. Buddyen un tema llamadoemployee.Guarda el mensaje
Khan Traktoren un tema llamadocontractor.
Para obtener más información sobre expresiones dinámicas, consulte operadores de expresión.
Si se especifica un tema que no existe aún, Apache Kafka crea automáticamente el tema cuando recibe el primer mensaje que lo dirige.
Si se especifica un tema con una expresión dinámica, pero Atlas Stream Processing no puede evaluar la expresión para un mensaje dado, Atlas Stream Processing envía ese mensaje a la fila de letra muerta si está configurada y procesa los siguientes mensajes. Si no se configura una fila de letra muerta, entonces Atlas Stream Processing omite el mensaje por completo y procesa los mensajes siguientes.
Ejemplos
Una fuente de datos de transmisión genera informes meteorológicos detallados desde varias ubicaciones, cumpliendo con el esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación tiene tres etapas:
La etapa
$sourceestablece una conexión con el Apache Kafka broker que recopila estos informes en un tema denominadomy_weatherdata, exponiendo cada registro a medida que se procesa a las siguientes etapas de agregación. Este etapa también reemplaza el nombre del campo de marca de tiempo que proyecta, estableciéndolo eningestionTime.La etapa
$matchexcluye documentos que tienen unairTemperature.valuede mayor o igual a30.0y pasa los documentos con unairTemperature.valuemenor que30.0a la siguiente etapa.La etapa
$addFieldsenriquece el flujo con metadatos.La etapa
$emitescribe la salida en un tema llamadostreama través de la conexión del agente de KafkaweatherStreamOutput.
{ "$source": { "connectionName": "sample_weatherdata", "topic": "my_weatherdata", "tsFieldName": "ingestionTime" } }, { "$match": { "airTemperature.value": { "$lt": 30 } } }, { "$addFields": { "processorMetadata": { "$meta": "stream" } } }, { "$emit": { "connectionName": "weatherStreamOutput", "topic": "stream" } }
Los documentos del tema stream toman la siguiente forma:
{ "st": "x+34700+119500", "position": { "type": "Point", "coordinates": [122.8, 116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight": { "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime": { "$date": "2024-09-26T17:34:41.843Z" } }
Nota
Lo anterior es un ejemplo representativo. Los datos en transmisión no son estáticos, y cada usuario ve documentos distintos.