Menu Docs
Página inicial do Docs
/ /
/ / /

$emit Estágio de agregação (processamento de fluxo)

O estágio $emit especifica uma conexão no Registro de conexão para a qual emitir mensagens. A conexão deve ser um Corretor Apache Kafka ou uma coleção de séries temporais.

Para gravar dados processados em um agente Apache Kafka, use o estágio de pipeline $emit com o seguinte formulário de protótipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"topic": "<target-topic>" | <expression>,
"config": {
"acks": <number-of-acknowledgements>,
"compression_type": "<compression-type>",
"dateFormat": "default" | "ISO8601",
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<serialization-type>",
"outputFormat": "<json-format>",
"tombstoneWhen": <expression>
}
}
}

O estágio $emit recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Obrigatório

Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados.

topic

corda | expressão

Obrigatório

Nome do tópico do Apache Kafka para o qual emitir mensagens.

config

documento

Opcional

documento que contém campo que substituem vários valores padrão.

config.acks

int

Opcional

Número de confirmações necessárias do cluster Apache Kafka para uma operação $emit bem-sucedida.

O valor padrão é all. O Atlas Stream Processing é compatível com os seguintes valores:

  • -1

  • 0

  • 1

  • all

config.compression_type

string

Opcional

Tipo de compactação para todos os dados gerados pelo produtor. O padrão é nenhum (ou seja, sem compactação). Os valores válidos são:

  • none

  • gzip

  • snappy

  • lz4

  • zstd

A compactação é usada para lotes completos de dados, portanto, a eficácia do agrupamento impacta a taxa de compactação; mais agrupamento resulta em melhor compactação.

config.dateFormat

string

Opcional

Formato de data para o valor da data. Os valores válidos são:

  • default - para usar o padrão do outputFormat.

  • ISO8601 - para converter datas em strings no formato ISO8601, que inclui precisão de milissegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por exemplo:

Considere a seguinte entrada:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

Se $emit.config.dateFormat estiver definido como default, a saída será semelhante à seguinte:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Se $emit.config.dateFormat estiver definido como ISO8601, a saída será semelhante à seguinte:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.headers

expressão

Opcional

Cabeçalhos a serem adicionados à mensagem de saída. A expressão deve ser avaliada como um objeto ou uma array.

Se a expressão for avaliada como um objeto, o Atlas Stream Processing constrói um cabeçalho a partir de cada par de chave-valor nesse objeto, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho.

Se a expressão for avaliada como uma array, ela deverá assumir a forma de uma array de objetos de pares de valores-chave. Por exemplo:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

O Atlas Stream Processing constrói um cabeçalho a partir de cada objeto na array, em que a chave é o nome do cabeçalho e o valor é o valor do cabeçalho. O Atlas Stream Processing é compatível com valores de cabeçalho dos seguintes tipos:

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

objeto | string | expressão

Opcional

Expressão que avalia para um Apache Kafka chave da mensagem.

Se você especificar config.key, deverá especificar config.keyFormat.

config.keyFormat

string

Condicional

Tipo de dados usado para serializar os principais dados do Apache Kafka. Deve ser um dos seguintes valores:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

O padrão é binData. Se você especificar config.key, você deve especificar config.keyFormat. Se o config.key de um documento não for serializado com êxito para o tipo de dados especificado, o Atlas Stream Processing o enviará para sua fila de mensagens não entregues (DLQ).

config.outputFormat

string

Opcional

Formato JSON a ser usado ao emitir mensagens para o Apache Kafka. Deve ser um dos seguintes valores:

  • "relaxedJson"

  • "canonicalJson"

Padrão é "relaxedJson".

config.tombstoneWhen

expressão

Opcional

Expressão que determina quando emitir null para o Kafka. A expressão deve avaliar para booleano true ou false. Quando a expressão é avaliada como true para um determinado documento, o Atlas Stream Processing emite um null em seu lugar no coletor do Kafka. Quando a expressão é avaliada como falsa, o Atlas Stream Processing emite o documento conforme ele existe quando atinge o estágio $emit.

Se a expressão falhar ao avaliar para um valor booleano, ou não puder ser avaliada, o Atlas Stream Processing gravará o documento no DLQ.

Esta configuração pode ser utilizada para habilitar a compactação de tópico se você fornecer valores $emit.config.key e $emit.config.keyFormat. Se você não fornecer esses valores, o Atlas Stream Processing ainda emite null quando essa expressão é avaliada como true, mas eles não acionam a compactação de tópico do Kafka.

Para gravar dados processados em uma coleção de séries temporais do Atlas, use o estágio de pipeline $emit com o seguinte formulário de protótipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"db": "<target-db>" | <expression>,
"coll": "<target-coll>" | <expression>,
"timeseries": {
<options>
}
}
}

O estágio $emit recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Obrigatório

Nome, conforme exibido no Registro de Conexões, da conexão da qual ingerir dados.

db

corda | expressão

Obrigatório

Nome ou expressão que se resolve para o banco de dados do Atlas que contém a coleção de séries temporais de destino.

coll

corda | expressão

Obrigatório

Nome de ou expressão que se resolve para a coleção de séries temporais do Atlas para gravação.

timeseries

documento

Obrigatório

Documento que define os campos de série temporal para a coleção.

Observação

O tamanho máximo para documentos em uma coleção de séries temporais é de 4 MB. Para saber mais, consulte Limitações de Coleção de Séries Temporais.

Para gravar dados processados em uma conexão de destino de bucket AWS S3, use o estágio de pipeline $emit com a seguinte forma de protótipo:

{
"$emit": {
"connectionName": "<registered-connection>",
"bucket": "<target-bucket>",
"region": "<target-region>",
"path": "<key-prefix>" | <expression>,
"config": {
"writeOptions": {
"count": <doc-count>,
"bytes": <threshold>,
"interval": {
"size": <unit-count>,
"unit": "<time-denomination>"
}
},
"delimiter": "<delimiter>",
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"dateFormat": "default" | "ISO8601",
"compression": "gzip" | "snappy",
"compressionLevel": <level>
}
}
}

O estágio $emit recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Obrigatório

Nome, conforme exibido no Registro de conexões, da conexão na qual os dados devem ser gravados.

bucket

string

Obrigatório

Nome do bucket S3 no qual os dados devem ser gravados.

region

string

Opcional

Nome da região AWS em que o bucket de destino está localizado. Se você hospedar sua instância de processamento de stream em uma região da AWS, o parâmetro será definido como padrão para essa região. Caso contrário, ele assume como padrão a região AWS mais próxima da região do host da instância em processamento de fluxo.

path

corda | expressão

Obrigatório

Prefixo da chave dos objetos gravados no bucket S3. Deve ser uma string de prefixo literal ou uma expressão que avalie para uma string.

config

documento

Opcional

Documento que contém parâmetros adicionais que substituem valores padrão diversos.

config.writeOptions

documento

Opcional

Documento com parâmetros adicionais que determinam o comportamento de gravação. Esses parâmetros acionam o comportamento de gravação de acordo com o limite que for atingido primeiro.

Por exemplo, se os documentos ingeridos atingirem o limite config.writeOptions.count sem atingir o limite config.writeOptions.interval, o processador de fluxo ainda emitirá esses documentos para o S3 de acordo com o limite config.writeOptions.count.

config.writeOptions.count

inteiro

Opcional

Número de documentos a serem agrupados em cada arquivo escrito no S3.

config.writeOptions.bytes

inteiro

Opcional

Especifica o número mínimo de bytes que devem se acumular antes que um arquivo seja gravado no S3. A contagem de bytes é determinada pelo tamanho dos documentos BSON ingeridos pelo pipeline, e não pelo tamanho do arquivo final de saída.

config.writeOptions.interval

documento

Opcional

Especifica um temporizador para gravação em massa de documentos como uma combinação de size e units.

O padrão é 1 minuto. Não é possível configurar size como 0 em nenhuma unit. O intervalo máximo é de 7 dias.

config.writeOptions.interval.size

inteiro

Condicional

O número de unidades especificadas por writeOptions.interval.units após o qual o processador de fluxo grava documentos em massa no S3.

O padrão é 1. Não é possível definir um size de 0. Se você definir writeOptions.interval, será necessário definir este parâmetro também.

config.writeOptions.interval.units

string

Condicional

A unidade de tempo do cronômetro de gravação em massa. Este parâmetro aceita os seguintes valores:

  • ms

  • second

  • minute

  • hour

  • day

O padrão é minute. Se você definir writeOptions.interval, será necessário este parâmetro também.

config.delimiter

string

Opcional

Delimitador entre cada entrada no arquivo gerado.

Padrão é \n.

config.outputFormat

string

Opcional

Especifica o formato de saída do JSON gravado no S3. Deve ser um dos seguintes valores:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

O padrão é "relaxedJson".

Para aprender mais, consulte JSON básico.

config.dateFormat

string

Opcional

Formato de data para o valor da data. Os valores válidos são:

  • default - para usar o padrão do outputFormat.

  • ISO8601 - para converter datas em strings no formato ISO8601, que inclui precisão de milissegundos (YYYY-MM-DDTHH:mm:ss.sssZ).

Por exemplo, se você adicionar o seguinte registro ao pipeline:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

então, se $emit.config.dateFormat estiver definido como default, a saída será semelhante à seguinte:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

Se $emit.config.dateFormat estiver definido como ISO8601, a saída será semelhante à seguinte:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.compression

string

Opcional

Nome do algoritmo de compactação a ser utilizado. Deve ser um dos seguintes valores:

  • "gzip"

  • "snappy"

config.compressionLevel

string

Condicional

Nível de compactação a ser aplicado à mensagem emitida. Suporta valores 1-9 inclusivos; valores mais altos significam mais compactação.

Padrão é 6.

Este parâmetro é obrigatório e restrito a gzip. Se você definir config.compression como snappy, este parâmetro não terá efeito caso seja definido.

Para facilitar a ingestão de mensagens, o Atlas Stream Processing oferece suporte ao JSON básico, que simplifica o formato RelaxedJSON. A tabela a seguir fornece exemplos dessas simplificações para todos os campos afetados.

tipo de campo
relaxedJson
basicJson

Binário

{ "binary": { "$binary": { "base64": "gf1UcxdHTJ2HQ/EGQrO7mQ==", "subType": "00" }}}

{ "binary": "gf1UcxdHTJ2HQ/EGQrO7mQ=="}

Data

{ "date": { "$date": "2024-10-24T18:07:29.636Z"}}

{ "date": 1729625275856}

Decimal

{ "decimal": { "$numberDecimal": "9.9" }}

{ "decimal": "9.9" }

Timestamp

{ "timestamp": { "$timestamp": { "t": 1729793249, "i": 1 }}}

{ "timestamp": 1729793249000}

ObjectId

{ "_id": { "$oid": "671a8ce1497407eff0e17cba" }}

{ "_id": "6717fcbba18c8a8f74b6d977" }

Infinito Negativo

{ "negInf": { "$numberDouble": "-Infinity" }}

{ "negInf": "-Infinity" }

Infinito positivo

{ "posInf": { "$numberDouble": "Infinity" }}

{ "posInf": "Infinity" }

Expressões regulares

{ "regex": { "$regularExpression": { "pattern": "ab+c", "options": "i" }}}

{ "regex": { "pattern": "ab+c", "options": "i" }}

UUID

{ "uuid": { "$binary": { "base64": "Kat+fHk6RkuAmotUmsU7gA==", "subType": "04" }}}

{ "uuid": "420b7ade-811a-4698-aa64-c8347c719cf1"}

$emit deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $emit por pipeline.

Você só pode escrever em uma única coleção de séries temporais do Atlas por processador de stream. Se você especificar uma collection que não existe, o Atlas criará a collection com os campos de série temporal especificados. Você deve especificar um banco de dados existente.

Você pode usar uma expressão dinâmica como o valor dos campos topic, db e coll para habilitar o processador de fluxo a gravar em destinos diferentes, mensagem a mensagem. A expressão deve avaliar para uma string.

Exemplo

Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para classificar cada um deles em um tópico distinto do Apache Kafka, você pode escrever o seguinte estágio $emit:

{
"$emit": {
"connectionName": "kafka1",
"topic": "$customerStatus"
}
}

Este estágio $emit :

  • Escreve a mensagem Very Important Industries para um tópico denominado VIP.

  • Escreve a mensagem N. E. Buddy para um tópico denominado employee.

  • Escreve a mensagem Khan Traktor para um tópico denominado contractor.

Para mais informações sobre expressões dinâmicas, consulte operadores de expressão.

Se você especificar um tópico que ainda não existe, o Apache Kafka criará automaticamente o tópico quando receber a primeira mensagem destinada a ele.

Se você especificar um tópico com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão de uma determinada mensagem, ele enviará essa mensagem para a fila de mensagens não entregues (Dead Letter Queue, DLQ), se ela estiver configurada, e processará as mensagens subsequentes. Se não houver uma DLQ configurada, o Atlas Stream Processing ignorará completamente a mensagem e processará as mensagens subsequentes.

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:

  1. O estágio $source estabelece uma conexão com o broker do Apache Kafka coletando esses relatórios em um tópico chamado my_weatherdata, expondo cada registro à medida que ele é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o como ingestionTime.

  2. O estágio $match exclui documentos que têm um airTemperature.value maior ou igual a 30.0 e passa os documentos com um airTemperature.value menor que 30.0 para o próximo estágio.

  3. O estágio $addFields enriquece o fluxo com metadados.

  4. O estágio $emit grava a saída em um tópico chamado stream pela conexão do agente weatherStreamOutput Kafka.

{
"$source": {
"connectionName": "sample_weatherdata",
"topic": "my_weatherdata",
"tsFieldName": "ingestionTime"
}
},
{
"$match": {
"airTemperature.value": {
"$lt": 30
}
}
},
{
"$addFields": {
"processorMetadata": {
"$meta": "stream"
}
}
},
{
"$emit": {
"connectionName": "weatherStreamOutput",
"topic": "stream"
}
}

Os documentos no tópico stream têm o seguinte formato:

{
"st": "x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8, 116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1", "AG1", "UG1", "SA1", "MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight": {
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime": {
"$date": "2024-09-26T17:34:41.843Z"
},
"_stream_meta": {
"source": {
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

Voltar

$tumblingWindow

Nesta página