Definición
La etapa $emit especifica una conexión en el
Registro de conexión al que se envían mensajes. Se admiten los siguientes tipos de conexión:
Apache Kafka corredor
Flujo de datos de AWS Kinesis
AWS CuboS3
Colocación
$emit debe ser la última etapa de cualquier canalización en la que aparezca. Solo puede utilizar una etapa $emit por canalización.
Sintaxis
Broker Apache Kafka
Para escribir datos procesados en un agente Apache Kafka, utilice la $emit etapa de canalización con el siguiente formato de prototipo:
{ "$emit": { "connectionName": "<registered-connection>", "topic": "<target-topic>" | <expression>, "schemaRegistry": { "connectionName": "<schema-registry-name>", "valueSchema": { type: "<schema-type>", schema: <schema-name>, options: { subjectNameStrategy: "<topic-name-strategy>", autoRegisterSchemas: true } } }, "config": { "acks": <number-of-acknowledgements>, "compression_type": "<compression-type>", "dateFormat": "default" | "ISO8601", "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<serialization-type>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "tombstoneWhen": <expression> } } }
La etapa $emit procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción | |||||
|---|---|---|---|---|---|---|---|---|
| string | Requerido | Nombre, tal como aparece en el Registro de conexión, de la conexión desde la cual se ingerirán datos. | |||||
| cadena o expresión | Requerido | Nombre del tema de Apache Kafka al que se emitirán mensajes. | |||||
| Documento | Opcional | Documento que permite el uso de un registro de esquema para soportar la escritura en una fuente serializada por Avro. Para habilitar esta función, debe crear una conexión de Registro de esquema. | |||||
| string | Condicional | Nombre de la conexión del Registro de esquema que se utilizará para la deserialización de Avro. | |||||
| documento | expresión | Condicional | Documento que define las propiedades de su esquema de serialización, o una expresión que evalúa tal. | |||||
| string | Condicional | El tipo de serialización para el que se utiliza el Registro de Esquemas. Atlas Stream Processing actualmente admite la serialización | |||||
| Documento | Condicional | Documento que define su Declaración de Esquema. | |||||
| Documento | Opcional | Documento que define parámetros de configuración opcionales para su conexión de registro de esquema. | |||||
| booleano | Opcional | Activa o desactiva el registro automático de esquemas al procesar documentos con esquemas no reconocidos. Si se establece en "false", los documentos con esquemas no reconocidos se envían a la cola de mensajes fallidos. Se establece por defecto en | |||||
| string | Condicional | Método para determinar el nombre del sujeto de los esquemas registrados automáticamente. Debe ser uno de los siguientes:
El valor predeterminado es | |||||
| Documento | Opcional | Documento que contiene campos que anulan varios valores predeterminados. | |||||
| Int | Opcional | Número de reconocimientos necesarios del clúster Apache Kafka para una El valor predeterminado es
| |||||
| string | Opcional | Tipo de compresión para todos los datos generados por el productor. El valor predeterminado es ninguno (es decir, sin compresión). Los valores válidos son:
La compresión se utiliza para lotes completos de datos, por lo que la eficacia del procesamiento en lotes afecta la relación de compresión; un mayor procesamiento en lotes da como resultado una mejor compresión. | |||||
| string | Opcional | Formato de fecha para el valor de fecha. Los valores válidos son:
Por ejemplo: Considere la siguiente entrada: Si Si | |||||
| expresión | Opcional | Encabezados que se añadirán al mensaje de salida. La expresión debe evaluarse como un objeto o una matriz. Si la expresión se evalúa como un objeto, Atlas Stream Processing construye un encabezado a partir de cada par clave-valor en ese objeto, donde la clave es el nombre del encabezado y el valor es el valor del encabezado. Si la expresión se evalúa como una matriz, debe adoptar la forma de una matriz de objetos de pares clave-valor. Por ejemplo: Atlas Stream Processing construye un encabezado a partir de cada objeto de la matriz, donde la clave es el nombre del encabezado y el valor es el valor del encabezado. Atlas Stream Processing admite valores de encabezado de los siguientes tipos:
| |||||
| objeto | cadena | expresión | Opcional | Expresión que evalúa una clave de mensaje de Apache Kafka. Si especifica | |||||
| string | Condicional | Tipo de dato utilizado para serializar los datos de clave de Apache Kafka. Debe ser uno de los siguientes valores:
El valor predeterminado | |||||
| string | Opcional | Formato JSON para enviar mensajes a Apache Kafka. Debe ser uno de los siguientes valores:
Se establece por defecto en Para obtener más información,consulte JSON básico. | |||||
| expresión | Opcional | Expresión que determina cuándo emitir Si la expresión no puede evaluarse como un valor booleano o no se puede evaluar, Atlas Stream Processing escribe el documento en el DLQ. Esta configuración permite habilitar la compactación de temas si se proporcionan los valores |
Colección de series temporales Atlas
Para escribir datos procesados en una colección de series de tiempo de Atlas, utilice la etapa de canalización $emit con el siguiente formato de prototipo:
{ "$emit": { "connectionName": "<registered-connection>", "db": "<target-db>" | <expression>, "coll": "<target-coll>" | <expression>, "timeseries": { <options> } } }
La etapa $emit procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción |
|---|---|---|---|
| string | Requerido | Nombre, tal como aparece en el Registro de conexión, de la conexión desde la cual se ingerirán datos. |
| cadena | expresión | Requerido | Nombre o expresión que se resuelve en la base de datos Atlas que contiene la colección de series de tiempo de destino. |
| cadena | expresión | Requerido | Nombre o expresión que se resuelve en la colección de series de tiempo Atlas donde se escribirá. |
| Documento | Requerido | Documento que define los campos de series de tiempo para la colección. |
Nota
El tamaño máximo de los documentos dentro de una colección de series temporales es de 4 MB. Para obtener más información, consulte Limitaciones de las colecciones de series temporales.
AWS Kinesis
Para escribir datos procesados en AWS Kinesis, utilice la $emit etapa de canalización con el siguiente formato de prototipo:
{ "$emit": { "connectionName": "<registered-connection>", "stream": "<stream-name>", "region": "<aws-region>", "partitionKey": "<key>" | <field> | <expression> "config": { "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", } } }
La etapa $emit procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción | |||
|---|---|---|---|---|---|---|
| string | Requerido | Nombre, tal como aparece en el Registro de conexión, de la conexión desde la cual se ingerirán datos. | |||
| string | Requerido | Nombre del flujo de datos de Kinesis al que conectarse. | |||
| string | Opcional | Región en la que opera el flujo de datos de Kinesis. AWS admite múltiples flujos con el mismo nombre, cada uno en una región distinta. Este parámetro permite a Atlas Stream Processing distinguir entre tales flujos. | |||
| Documento | Opcional | Documento que contiene campos que anulan varios valores predeterminados. | |||
| string | Opcional | Formato JSON para enviar mensajes a Kinesis. Debe ser uno de los siguientes valores:
Se establece por defecto en Para obtener más información,consulte JSON básico. | |||
| string | Opcional | Formato de fecha para el valor de fecha. Los valores válidos son:
Por ejemplo: Considere la siguiente entrada: Si Si |
AWS S3
Para escribir datos procesados en una conexión de sumidero3 de bucket de AWS S, utilice la $emit etapa de canalización con el siguiente formato de prototipo:
{ "$emit": { "connectionName": "<registered-connection>", "bucket": "<target-bucket>" | <expression>, "region": "<target-region>", "path": "<key-prefix>" | <expression>, "config": { "writeOptions": { "count": <doc-count>, "bytes": <threshold>, "interval": { "size": <unit-count>, "unit": "<time-denomination>" } }, "delimiter": "<delimiter>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", "compression": "gzip" | "snappy", "compressionLevel": <level> } } }
La etapa $emit procesa un documento con los siguientes campos:
Campo | Tipo | Necesidad | Descripción | |||
|---|---|---|---|---|---|---|
| string | Requerido | Nombre, tal como aparece en el Registro de conexión, de la conexión donde escribir datos. | |||
| string | Requerido | Nombre del depósito S en el que se escribirán los datos.3 | |||
| string | Opcional | Nombre de la región de AWS donde reside el bucket de destino. Si aloja su espacio de trabajo de procesamiento de flujos en una región de AWS, este parámetro se establece de forma predeterminada en esa región. De lo contrario, se establece de forma predeterminada en la región de AWS más cercana a la región de host de su espacio de trabajo de procesamiento de flujos. | |||
| cadena | expresión | Requerido | Prefijo de la clave de los objetos escritos en el contenedor S.Debe ser una3 cadena de prefijo literal o una expresión que evalúe una cadena. | |||
| Documento | Opcional | Documento que contiene parámetros adicionales que anulan varios valores predeterminados. | |||
| Documento | Opcional | Documento que contiene parámetros adicionales que rigen el comportamiento de escritura. Estos parámetros activan el comportamiento de escritura según el umbral que se alcance primero. Por ejemplo, si los documentos ingeridos alcanzan el | |||
| entero | Opcional | Cantidad de documentos que se agruparán en cada archivo escrito en S3. | |||
| entero | Opcional | Especifica la cantidad mínima de bytes que se deben acumular antes de que un archivo se escriba en S. El número de bytes 3se determina por el tamaño de los documentos BSON ingeridos por la canalización, no por el tamaño del archivo de salida final. | |||
| Documento | Opcional | Especifica un temporizador para la escritura masiva de documentos como una combinación de El valor predeterminado es 1 minutos. No se puede establecer de | |||
| entero | Condicional | La cantidad de unidades especificadas por El valor predeterminado es | |||
| string | Condicional | El tiempo que se utiliza para contabilizar el temporizador de escritura masiva. Este parámetro admite los siguientes valores:
El valor predeterminado es | |||
| string | Opcional | Delimitador entre cada entrada en el archivo emitido. Se establece por defecto en | |||
| string | Opcional | Especifica el formato de salida del JSON escrito en S.3 Debe ser uno de los siguientes valores:
El valor predeterminado es " Para obtener más información,consulte JSON básico. | |||
| string | Opcional | Formato de fecha para el valor de fecha. Los valores válidos son:
Por ejemplo, si agrega el siguiente registro a la canalización: Entonces, si Si | |||
| string | Opcional | Nombre del algoritmo de compresión a utilizar. Debe ser uno de los siguientes valores:
| |||
| string | Condicional | Nivel de compresión que se aplicará al mensaje emitido. Admite valores Se establece por defecto en Este parámetro es obligatorio y está limitado a |
Basic JSON
Para facilitar la ingesta de mensajes, la etapa $emit permite escribir datos procesados en receptores en formato JSON básico, lo que simplifica los formatos JSON extendido (canonicalJson) y extendido relajado (relaxedJson). JSON básico no utiliza los envoltorios JSON extendido de MongoDB y, por lo tanto, no conserva todos los tipos de BSON.
Puede especificar el formato JSON básico configurando el campo config.outputFormat en "basicJson" en su etapa $emit.
La siguiente tabla proporciona ejemplos de estas simplificaciones para todos los campos afectados.
Tipo de campo | relajadoJson | basicJson |
|---|---|---|
Binario |
|
|
fecha |
|
|
Decimal |
|
|
Marca de tiempo |
|
|
ObjectId |
|
|
Infinito negativo |
|
|
Infinito positivo |
|
|
Expresiones regulares |
|
|
UUID |
|
|
Comportamiento
Solo se puede escribir en una única colección de series temporales de Atlas por procesador de flujo. Si se especifica una colección inexistente, Atlas la crea con los campos de series temporales especificados. Debe especificar una base de datos existente.
Atlas Stream Processing no admite la escritura de documentos BSON más grandes que 125
MB mediante la etapa $emit en un bucket de AWS S.3
Puede usar una expresión dinámica como valor de los topic db coll campos, y para que su procesador de flujo pueda escribir en diferentes destinos mensaje por mensaje. La expresión debe evaluarse como una cadena.
Ejemplo
Tiene un flujo de eventos de transacción que genera mensajes del siguiente formato:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para ordenar cada uno de estos en un tema distinto de Apache Kafka, puedes escribir la siguiente $emit etapa:
{ "$emit": { "connectionName": "kafka1", "topic": "$customerStatus" } }
Esta $emit etapa:
Escribe el mensaje
Very Important Industriesen un tema llamadoVIP.Escribe el mensaje
N. E. Buddyen un tema llamadoemployee.Escribe el mensaje
Khan Traktoren un tema llamadocontractor.
Para obtener más información sobre expresiones dinámicas, consulte operadores de expresión.
Si especifica un tema que aún no existe, Apache Kafka crea automáticamente el tema cuando recibe el primer mensaje dirigido a él.
Si especifica un tema con una expresión dinámica, pero Atlas Stream Processing no puede evaluar la expresión para un mensaje dado, Atlas Stream Processing envía ese mensaje a la cola de mensajes no entregados, si está configurada, y procesa los mensajes subsiguientes. Si no hay una cola de mensajes no entregados configurada, Atlas Stream Processing omite el mensaje por completo y procesa los mensajes subsiguientes.
Ejemplos
Una fuente de datos de streaming genera informes meteorológicos detallados de diversas ubicaciones, conforme al esquema del Conjunto de Datos Meteorológicos de Muestra. La siguiente agregación consta de tres etapas:
La etapa establece una
$sourceconexión con el agente de Apache Kafka que recopila estos informes en un temamy_weatherdatallamado, exponiendo cada registro a medida que se ingiere a las etapas de agregación posteriores. Esta etapa también anula el nombre del campo de marca de tiempo que proyecta, estableciéndoloingestionTimeen.La etapa excluye los documentos que tienen
$matchunairTemperature.valuemayor o igual a30.0y pasa los documentos con unairTemperature.valuemenor que30.0a la siguiente etapa.La etapa enriquece la transmisión con
$addFieldsmetadatos.La etapa
$emitescribe la salida en un tema llamadostreama través de la conexión del agente de KafkaweatherStreamOutput.
{ "$source": { "connectionName": "sample_weatherdata", "topic": "my_weatherdata", "tsFieldName": "ingestionTime" } }, { "$match": { "airTemperature.value": { "$lt": 30 } } }, { "$addFields": { "processorMetadata": { "$meta": "stream" } } }, { "$emit": { "connectionName": "weatherStreamOutput", "topic": "stream" } }
Los documentos del tema stream tienen el siguiente formato:
{ "st": "x+34700+119500", "position": { "type": "Point", "coordinates": [122.8, 116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight": { "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime": { "$date": "2024-09-26T17:34:41.843Z" } }
Nota
El ejemplo anterior es representativo. Los datos de streaming no son estáticos y cada usuario ve documentos distintos.