Definição
O estágio $emit
especifica uma conexão no Registro de conexão para a qual emitir mensagens. A conexão deve ser um Corretor Apache Kafka ou uma coleção de séries temporais.
Sintaxe
Corretora Apache Kafka
Para gravar dados processados em um agente Apache Kafka, use o estágio de pipeline $emit
com o seguinte formulário de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "topic": "<target-topic>" | <expression>, "config": { "acks": <number-of-acknowledgements>, "compression_type": "<compression-type>", "dateFormat": "default" | "ISO8601", "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<serialization-type>", "outputFormat": "<json-format>", "tombstoneWhen": <expression> } } }
O estágio $emit
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição | |||||
---|---|---|---|---|---|---|---|---|
| string | Obrigatório | Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados. | |||||
| corda | expressão | Obrigatório | Nome do tópico do Apache Kafka para o qual emitir mensagens. | |||||
| documento | Opcional | documento que contém campo que substituem vários valores padrão. | |||||
| int | Opcional | Número de confirmações necessárias do cluster Apache Kafka para uma operação O valor padrão é
| |||||
| string | Opcional | Tipo de compactação para todos os dados gerados pelo produtor. O padrão é nenhum (ou seja, sem compactação). Os valores válidos são:
A compactação é usada para lotes completos de dados, portanto, a eficácia do agrupamento impacta a taxa de compactação; mais agrupamento resulta em melhor compactação. | |||||
| string | Opcional | Formato de data para o valor da data. Os valores válidos são:
Por exemplo: Considere a seguinte entrada:
Se
Se
| |||||
| expressão | Opcional | Cabeçalhos a serem adicionados à mensagem de saída. A expressão deve ser avaliada como um objeto ou uma array. Se a expressão for avaliada como um objeto, o Atlas Stream Processing constrói um cabeçalho a partir de cada par de chave-valor nesse objeto, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho. Se a expressão for avaliada como uma array, ela deverá assumir a forma de uma array de objetos de pares de valores-chave. Por exemplo:
O Atlas Stream Processing constrói um cabeçalho a partir de cada objeto na array, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho. O Atlas Stream Processing é compatível com valores de cabeçalho dos seguintes tipos:
| |||||
| objeto | string | expressão | Opcional | Expressão que avalia para um Apache Kafka chave da mensagem. Se você especificar | |||||
| string | Condicional | Tipo de dados usado para serializar os principais dados do Apache Kafka. Deve ser um dos seguintes valores:
O padrão é | |||||
| string | Opcional | Formato JSON a ser usado ao emitir mensagens para o Apache Kafka. Deve ser um dos seguintes valores:
Padrão é | |||||
| expressão | Opcional | Expressão que determina quando emitir Se a expressão falhar ao avaliar para um valor booleano, ou não puder ser avaliada, o Atlas Stream Processing gravará o documento no DLQ. Esta configuração pode ser utilizada para habilitar a compactação de tópico se você fornecer valores |
Atlas Time Series Collection
Para gravar dados processados em uma coleção de séries temporais do Atlas, use o estágio de pipeline $emit
com o seguinte formulário de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "db": "<target-db>" | <expression>, "coll": "<target-coll>" | <expression>, "timeseries": { <options> } } }
O estágio $emit
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
| string | Obrigatório | Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados. |
| corda | expressão | Obrigatório | Nome ou expressão que se resolve para o banco de dados do Atlas que contém a coleção de séries temporais de destino. |
| corda | expressão | Obrigatório | Nome de ou expressão que se resolve para a coleção de séries temporais do Atlas para gravação. |
| documento | Obrigatório | Documento que define os campos de série temporal para a coleção. |
Observação
O tamanho máximo para documentos em uma coleção de séries temporais é de 4 MB. Para saber mais, consulte Limitações de Coleção de Séries Temporais.
AWS S3
Para gravar dados processados em uma conexão de destino de bucket AWS S3, use o estágio de pipeline $emit
com a seguinte forma de protótipo:
{ "$emit": { "connectionName": "<registered-connection>", "bucket": "<target-bucket>", "region": "<target-region>", "path": "<key-prefix>" | <expression>, "config": { "writeOptions": { "count": <doc-count>, "bytes": <threshold>, "interval": { "size": <unit-count>, "unit": "<time-denomination>" } }, "delimiter": "<delimiter>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", "compression": "gzip" | "snappy", "compressionLevel": <level> } } }
O estágio $emit
recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição | |||
---|---|---|---|---|---|---|
| string | Obrigatório | Nome, conforme exibido no Registro de conexões, da conexão na qual os dados devem ser gravados. | |||
| string | Obrigatório | Nome do bucket S3 no qual os dados devem ser gravados. | |||
| string | Opcional | Nome da região AWS em que o bucket de destino está localizado. Se você hospedar sua instância de processamento de stream em uma região da AWS, o parâmetro será definido como padrão para essa região. Caso contrário, ele assume como padrão a região AWS mais próxima da região do host da instância em processamento de fluxo. | |||
| corda | expressão | Obrigatório | Prefixo da chave dos objetos gravados no bucket S3. Deve ser uma string de prefixo literal ou uma expressão que avalie para uma string. | |||
| documento | Opcional | Documento que contém parâmetros adicionais que substituem valores padrão diversos. | |||
| documento | Opcional | Documento com parâmetros adicionais que determinam o comportamento de gravação. Esses parâmetros acionam o comportamento de gravação de acordo com o limite que for atingido primeiro. Por exemplo, se os documentos ingeridos atingirem o limite | |||
| inteiro | Opcional | Número de documentos a serem agrupados em cada arquivo escrito no S3. | |||
| inteiro | Opcional | Especifica o número mínimo de bytes que devem se acumular antes que um arquivo seja gravado no S3. A contagem de bytes é determinada pelo tamanho dos documentos BSON ingeridos pelo pipeline, e não pelo tamanho do arquivo final de saída. | |||
| documento | Opcional | Especifica um temporizador para gravação em massa de documentos como uma combinação de O padrão é 1 minuto. Não é possível configurar | |||
| inteiro | Condicional | O número de unidades especificadas por O padrão é | |||
| string | Condicional | A unidade de tempo do cronômetro de gravação em massa. Este parâmetro aceita os seguintes valores:
O padrão é | |||
| string | Opcional | Delimitador entre cada entrada no arquivo gerado. Padrão é | |||
| string | Opcional | Especifica o formato de saída do JSON gravado no S3. Deve ser um dos seguintes valores:
O padrão é " Para aprender mais, consulte JSON básico. | |||
| string | Opcional | Formato de data para o valor da data. Os valores válidos são:
Por exemplo, se você adicionar o seguinte registro ao pipeline:
então, se
Se
| |||
| string | Opcional | Nome do algoritmo de compactação a ser utilizado. Deve ser um dos seguintes valores:
| |||
| string | Condicional | Nível de compactação a ser aplicado à mensagem emitida. Suporta valores Padrão é Este parâmetro é obrigatório e restrito a |
Basic JSON
Para facilitar a ingestão de mensagens, o Atlas Stream Processing oferece suporte ao JSON básico, que simplifica o formato RelaxedJSON. A tabela a seguir fornece exemplos dessas simplificações para todos os campos afetados.
tipo de campo | relaxedJson | basicJson |
---|---|---|
Binário |
|
|
Data |
|
|
Decimal |
|
|
Timestamp |
|
|
ObjectId |
|
|
Infinito Negativo |
|
|
Infinito positivo |
|
|
Expressões regulares |
|
|
UUID |
|
|
Comportamento
$emit
deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $emit
por pipeline.
Você só pode escrever em uma única coleção de séries temporais do Atlas por processador de stream. Se você especificar uma collection que não existe, o Atlas criará a collection com os campos de série temporal especificados. Você deve especificar um banco de dados existente.
Você pode usar uma expressão dinâmica como o valor dos campos topic
, db
e coll
para habilitar o processador de fluxo a gravar em destinos diferentes, mensagem a mensagem. A expressão deve avaliar para uma string.
Exemplo
Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para classificar cada um deles em um tópico distinto do Apache Kafka, você pode escrever o seguinte estágio $emit
:
{ "$emit": { "connectionName": "kafka1", "topic": "$customerStatus" } }
Este estágio $emit
:
Escreve a mensagem
Very Important Industries
para um tópico denominadoVIP
.Escreve a mensagem
N. E. Buddy
para um tópico denominadoemployee
.Escreve a mensagem
Khan Traktor
para um tópico denominadocontractor
.
Para mais informações sobre expressões dinâmicas, consulte operadores de expressão.
Se você especificar um tópico que ainda não existe, o Apache Kafka criará automaticamente o tópico quando receber a primeira mensagem destinada a ele.
Se você especificar um tópico com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão de uma determinada mensagem, ele enviará essa mensagem para a fila de mensagens não entregues (Dead Letter Queue, DLQ), se ela estiver configurada, e processará as mensagens subsequentes. Se não houver uma DLQ configurada, o Atlas Stream Processing ignorará completamente a mensagem e processará as mensagens subsequentes.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:
O estágio
$source
estabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamadomy_weatherdata
, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime
.O estágio
$match
exclui documentos que têm umairTemperature.value
maior ou igual a30.0
e passa os documentos com umairTemperature.value
menor que30.0
para o próximo estágio.O estágio
$addFields
enriquece o fluxo com metadados.O estágio
$emit
grava a saída em um tópico chamadostream
pela conexão do agenteweatherStreamOutput
Kafka.
{ "$source": { "connectionName": "sample_weatherdata", "topic": "my_weatherdata", "tsFieldName": "ingestionTime" } }, { "$match": { "airTemperature.value": { "$lt": 30 } } }, { "$addFields": { "processorMetadata": { "$meta": "stream" } } }, { "$emit": { "connectionName": "weatherStreamOutput", "topic": "stream" } }
Os documentos no tópico stream
têm o seguinte formato:
{ "st": "x+34700+119500", "position": { "type": "Point", "coordinates": [122.8, 116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight": { "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime": { "$date": "2024-09-26T17:34:41.843Z" }, "_stream_meta": { "source": { "type": "kafka", "topic": "my_weatherdata", "partition": 0, "offset": 4285 } } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.