Definição
O estágio $emit especifica uma conexão no Registro de Conexão para o qual emitir mensagens. Os seguintes tipos de conexão são suportados:
Fluxo de dados do AWS Kinesis
AWS Bucket S3
Sintaxe
Corretora Apache Kafka
Para gravar dados processados em um agente Apache Kafka, use o estágio de pipeline $emit com o seguinte formulário de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "topic": "<target-topic>" | <expression>, "schemaRegistry": { "connectionName": "<schema-registry-name>", "valueSchema": { type: "<schema-type>", schema: <schema-name>, options: { subjectNameStrategy: "<topic-name-strategy>", autoRegisterSchemas: true } } }, "config": { "acks": <number-of-acknowledgements>, "compression_type": "<compression-type>", "dateFormat": "default" | "ISO8601", "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<serialization-type>", "outputFormat": "<json-format>", "tombstoneWhen": <expression> } } }
O estágio $emit recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição | |||||
|---|---|---|---|---|---|---|---|---|
| string | Obrigatório | Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados. | |||||
| string ou expressão | Obrigatório | Nome do tópico do Apache Kafka para o qual emitir mensagens. | |||||
| documento | Opcional | Documento que habilita o uso de um registro de esquema para dar suporte à gravação em uma fonte serializada Avro. Para habilitar esse recurso, você deve criar uma conexão de registro de esquema. | |||||
| string | Condicional | Nome da conexão do Registro de Esquema a ser usada para desserialização Avro. | |||||
| documento | expressão | Condicional | Documento que define as propriedades do seu esquema de serialização ou uma expressão que avalia para tal. | |||||
| string | Condicional | O tipo de serialização para o qual usar o Registro de Esquema. Atualmente, o Atlas Stream Processing oferece suporte à serialização | |||||
| documento | Condicional | Documento que define sua Declaração de Esquema. | |||||
| documento | Opcional | Documento que define parâmetros de configuração opcionais para sua conexão de registro de esquema. | |||||
| booleano | Opcional | Alterne para determinar se os esquemas devem ou não ser registrados automaticamente ao processar documentos com esquemas não reconhecidos. Se definido como falso, os documentos com esquemas não reconhecidos são enviados para a fila de mensagens não entregues (DLQ)). Padrão é | |||||
| string | Condicional | Método de determinar o nome do assunto de esquemas registrados automaticamente. Deve ser um dos seguintes:
Padrão é | |||||
| documento | Opcional | documento que contém campo que substituem vários valores padrão. | |||||
| int | Opcional | Número de confirmações necessárias do cluster Apache Kafka para uma operação O valor padrão é
| |||||
| string | Opcional | Tipo de compactação para todos os dados gerados pelo produtor. O padrão é nenhum (ou seja, sem compactação). Os valores válidos são:
A compactação é usada para lotes completos de dados, portanto, a eficácia do agrupamento impacta a taxa de compactação; mais agrupamento resulta em melhor compactação. | |||||
| string | Opcional | Formato de data para o valor da data. Os valores válidos são:
Por exemplo: Considere a seguinte entrada: Se Se | |||||
| expressão | Opcional | Cabeçalhos a serem adicionados à mensagem de saída. A expressão deve ser avaliada como um objeto ou uma array. Se a expressão for avaliada como um objeto, o Atlas Stream Processing constrói um cabeçalho a partir de cada par de chave-valor nesse objeto, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho. Se a expressão for avaliada como uma array, ela deverá assumir a forma de uma array de objetos de pares de valores-chave. Por exemplo: O Atlas Stream Processing constrói um cabeçalho a partir de cada objeto na array, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho. O Atlas Stream Processing é compatível com valores de cabeçalho dos seguintes tipos:
| |||||
| objeto | string | expressão | Opcional | Expressão que avalia para um Apache Kafka chave da mensagem. Se você especificar | |||||
| string | Condicional | Tipo de dados usado para serializar os principais dados do Apache Kafka. Deve ser um dos seguintes valores:
O padrão é | |||||
| string | Opcional | Formato JSON a ser usado ao emitir mensagens para o Apache Kafka. Deve ser um dos seguintes valores:
Padrão é | |||||
| expressão | Opcional | Expressão que determina quando emitir Se a expressão falhar ao avaliar para um valor booleano, ou não puder ser avaliada, o Atlas Stream Processing gravará o documento no DLQ. Esta configuração pode ser utilizada para habilitar a compactação de tópico se você fornecer valores |
Atlas Time Series Collection
Para gravar dados processados em uma coleção de séries temporais do Atlas, use o estágio de pipeline $emit com o seguinte formulário de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "db": "<target-db>" | <expression>, "coll": "<target-coll>" | <expression>, "timeseries": { <options> } } }
O estágio $emit recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
|---|---|---|---|
| string | Obrigatório | Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados. |
| corda | expressão | Obrigatório | Nome ou expressão que se resolve para o banco de dados do Atlas que contém a coleção de séries temporais de destino. |
| corda | expressão | Obrigatório | Nome de ou expressão que se resolve para a coleção de séries temporais do Atlas para gravação. |
| documento | Obrigatório | Documento que define os campos de série temporal para a coleção. |
Observação
O tamanho máximo para documentos em uma coleção de séries temporais é de 4 MB. Para saber mais, consulte Limitações de Coleção de Séries Temporais.
AWS Kinesis
Para gravar dados processados no AWS Kinesis, use o estágio de pipeline $emit com a seguinte forma de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "stream": "<stream-name>", "region": "<aws-region>", "partitionKey": "<key>" | <field> | <expression> "config": { "outputFormat": "<json-format>", "dateFormat": "default" | "ISO8601", } } }
O estágio $emit recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição | |||
|---|---|---|---|---|---|---|
| string | Obrigatório | Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados. | |||
| string | Obrigatório | Nome do fluxo de dados do Kinesis ao qual se conectar. | |||
| string | Opcional | Região em que o Kinesis Data Stream opera. A AWS oferece suporte a vários fluxos com o mesmo nome, cada um em uma região distinta. Este parâmetro permite ao Atlas Stream Processing distinguir entre esses fluxos. | |||
| documento | Opcional | documento que contém campo que substituem vários valores padrão. | |||
| string | Opcional | Formato JSON a ser usado ao emitir mensagens para o Kinesis. Deve ser um dos seguintes valores:
Padrão é | |||
| string | Opcional | Formato de data para o valor da data. Os valores válidos são:
Por exemplo: Considere a seguinte entrada: Se Se |
AWS S3
Para gravar dados processados em uma conexão de destino de bucket AWS S3, use o estágio de pipeline $emit com a seguinte forma de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "bucket": "<target-bucket>", "region": "<target-region>", "path": "<key-prefix>" | <expression>, "config": { "writeOptions": { "count": <doc-count>, "bytes": <threshold>, "interval": { "size": <unit-count>, "unit": "<time-denomination>" } }, "delimiter": "<delimiter>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", "compression": "gzip" | "snappy", "compressionLevel": <level> } } }
O estágio $emit recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição | |||
|---|---|---|---|---|---|---|
| string | Obrigatório | Nome, conforme exibido no Registro de conexões, da conexão na qual os dados devem ser gravados. | |||
| string | Obrigatório | Nome do bucket S3 no qual os dados devem ser gravados. | |||
| string | Opcional | Nome da região AWS em que o bucket de destino está localizado. Se você hospedar seu espaço de trabalho do processamento de fluxo em uma região da AWS, o parâmetro será definido como padrão para essa região. Caso contrário, ele assume como padrão a região AWS mais próxima da região do host da área de trabalho do processamento de fluxos. | |||
| corda | expressão | Obrigatório | Prefixo da chave dos objetos gravados no bucket S3. Deve ser uma string de prefixo literal ou uma expressão que avalie para uma string. | |||
| documento | Opcional | Documento que contém parâmetros adicionais que substituem valores padrão diversos. | |||
| documento | Opcional | Documento com parâmetros adicionais que determinam o comportamento de gravação. Esses parâmetros acionam o comportamento de gravação de acordo com o limite que for atingido primeiro. Por exemplo, se os documentos ingeridos atingirem o limite | |||
| inteiro | Opcional | Número de documentos a serem agrupados em cada arquivo escrito no S3. | |||
| inteiro | Opcional | Especifica o número mínimo de bytes que devem se acumular antes que um arquivo seja gravado no S3. A contagem de bytes é determinada pelo tamanho dos documentos BSON ingeridos pelo pipeline, e não pelo tamanho do arquivo final de saída. | |||
| documento | Opcional | Especifica um temporizador para gravação em massa de documentos como uma combinação de O padrão é 1 minuto. Não é possível configurar | |||
| inteiro | Condicional | O número de unidades especificadas por O padrão é | |||
| string | Condicional | A unidade de tempo do cronômetro de gravação em massa. Este parâmetro aceita os seguintes valores:
O padrão é | |||
| string | Opcional | Delimitador entre cada entrada no arquivo gerado. Padrão é | |||
| string | Opcional | Especifica o formato de saída do JSON gravado no S3. Deve ser um dos seguintes valores:
O padrão é " Para aprender mais, consulte JSON básico. | |||
| string | Opcional | Formato de data para o valor da data. Os valores válidos são:
Por exemplo, se você adicionar o seguinte registro ao pipeline: então, se Se | |||
| string | Opcional | Nome do algoritmo de compactação a ser utilizado. Deve ser um dos seguintes valores:
| |||
| string | Condicional | Nível de compactação a ser aplicado à mensagem emitida. Suporta valores Padrão é Este parâmetro é obrigatório e restrito a |
Basic JSON
Para facilitar a ingestão de mensagens, o Atlas Stream Processing oferece suporte ao JSON básico, que simplifica o formato RelaxedJSON. A tabela a seguir fornece exemplos dessas simplificações para todos os campos afetados.
tipo de campo | relaxedJson | basicJson |
|---|---|---|
Binário |
|
|
Data |
|
|
Decimal |
|
|
Timestamp |
|
|
ObjectId |
|
|
Infinito Negativo |
|
|
Infinito positivo |
|
|
Expressões regulares |
|
|
UUID |
|
|
Comportamento
$emit deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $emit por pipeline.
Você só pode escrever em uma única coleção de séries temporais do Atlas por processador de stream. Se você especificar uma collection que não existe, o Atlas criará a collection com os campos de série temporal especificados. Você deve especificar um banco de dados existente.
Você pode usar uma expressão dinâmica como o valor dos campos topic, db e coll para habilitar o processador de fluxo a gravar em destinos diferentes, mensagem a mensagem. A expressão deve avaliar para uma string.
Exemplo
Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para classificar cada um deles em um tópico distinto do Apache Kafka, você pode escrever o seguinte estágio $emit:
{ "$emit": { "connectionName": "kafka1", "topic": "$customerStatus" } }
Este estágio $emit :
Escreve a mensagem
Very Important Industriespara um tópico denominadoVIP.Escreve a mensagem
N. E. Buddypara um tópico denominadoemployee.Escreve a mensagem
Khan Traktorpara um tópico denominadocontractor.
Para mais informações sobre expressões dinâmicas, consulte operadores de expressão.
Se você especificar um tópico que ainda não existe, o Apache Kafka criará automaticamente o tópico quando receber a primeira mensagem destinada a ele.
Se você especificar um tópico com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão de uma determinada mensagem, ele enviará essa mensagem para a fila de mensagens não entregues (Dead Letter Queue, DLQ), se ela estiver configurada, e processará as mensagens subsequentes. Se não houver uma DLQ configurada, o Atlas Stream Processing ignorará completamente a mensagem e processará as mensagens subsequentes.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:
O estágio
$sourceestabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamadomy_weatherdata, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime.O estágio
$matchexclui documentos que têm umairTemperature.valuemaior ou igual a30.0e passa os documentos com umairTemperature.valuemenor que30.0para o próximo estágio.O estágio
$addFieldsenriquece o fluxo com metadados.O estágio
$emitgrava a saída em um tópico chamadostreampela conexão do agenteweatherStreamOutputKafka.
{ "$source": { "connectionName": "sample_weatherdata", "topic": "my_weatherdata", "tsFieldName": "ingestionTime" } }, { "$match": { "airTemperature.value": { "$lt": 30 } } }, { "$addFields": { "processorMetadata": { "$meta": "stream" } } }, { "$emit": { "connectionName": "weatherStreamOutput", "topic": "stream" } }
Os documentos no tópico stream têm o seguinte formato:
{ "st": "x+34700+119500", "position": { "type": "Point", "coordinates": [122.8, 116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight": { "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime": { "$date": "2024-09-26T17:34:41.843Z" } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.