Menu Docs
Página inicial do Docs
/
Atlas
/ /

$source Estágio (Stream Processing)

$source

O estágio$source especifica uma conexão noRegistro de Conexão para transmitir dados. Os seguintes tipos de conexão são suportados:

Observação

Você não pode usar instâncias sem servidor do Atlas como um $source.

Para operar em dados de transmissão de um intermediário Apache Kafka, o estágio $source tem a seguinte forma de protótipo:

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

O estágio $source recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Obrigatório

Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados.

topic

cadeia de caracteres ou matriz de cadeias de caracteres

Obrigatório

Nome de um ou mais tópicos do Apache Kafka dos quais transmitir mensagens. Se você quiser transmitir mensagens de mais de um tópico, especifique-os em uma array.

timeField

documento

Opcional

documento que define um carimbo de data/hora oficial para mensagens recebidas.

Se você usar timeField, deverá defini-lo como um dos seguintes:

  • uma expressão $toDate que usa um campo de mensagem de origem como argumento

  • uma expressão $dateFromString que usa um campo de mensagem de origem como argumento.

Se você não declarar um timeField, o Atlas Stream Processing criará um carimbo de data/hora a partir do carimbo de data/hora da mensagem fornecido pela origem.

partitionIdleTimeout

documento

Opcional

documento que especifica a quantidade de tempo que uma partição pode ficar ociosa antes de ser ignorada nos cálculos de marca d'agua.

Este campo é desabilitado por padrão. Para lidar com partições que não progridem devido à inatividade, atribua um valor a este campo.

partitionIdleTimeout.size

inteiro

Opcional

Número que especifica a duração do tempo limite de inatividade da partição.

partitionIdleTimeout.unit

string

Opcional

Unidade de tempo para a duração do tempo limite de inatividade da partição.

O valor de unit pode ser um dos seguintes:

  • "ms" (milésimo de segundo)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

documento

Opcional

documento que contém campo que substituem vários valores padrão.

config.auto_offset_reset

string

Opcional

Especifica qual evento no Apache Kafka tópico de origem para iniciar a ingestão. auto_offset_reset assume os seguintes valores:

  • end, latest ou largest : para iniciar a ingestão a partir do evento mais recente no tópico no momento em que a agregação é inicializada.

  • earliest, beginning ou smallest : para iniciar a ingestão a partir do evento mais antigo no tópico.

Padrão é latest.

config.group_id

string

Opcional

ID do grupo de consumidores Kafka a ser associado ao processador de stream. Se omitido, o Atlas Stream Processing associa a instância de processamento de stream a um ID gerado automaticamente no seguinte formato:

asp-${streamProcessorId}-consumer

O Atlas Stream Processing confirma deslocamentos de partição para o intermediário Apache Kafka para o ID do grupo de consumidores especificado depois que um checkpoint é confirmado. Ele confirma um deslocamento quando as mensagens até esse deslocamento são registradas de forma duradoura em um checkpoint. Isso permite que você acompanhe o atraso de compensação e o progresso do processador de fluxo diretamente dos metadados do grupo de consumidores do corretor Kafka.

config.keyFormat

string

Opcional

Tipo de dados usado para desserializar dados-chave do Apache Kafka. Deve ser um dos seguintes valores:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Padrão é binData.

config.keyFormatError

string

Opcional

Como lidar com erros encontrados ao desserializar os dados-chave do Apache Kafka. Deve ser um dos seguintes valores:

Observação

O Atlas Stream Processing requer que os documentos no fluxo de dados de origem sejam json ou ejson válidos. O Atlas Stream Processing define os documentos que não atendem a esse requisito na sua fila de mensagens não entregues, se você tiver configurado uma.

Um fluxo de alterações de coleção do Atlas permite que aplicativos acessem mudanças de dados em tempo real em uma única coleção. Para aprender como abrir um fluxo de alterações em uma coleção, veja Change Streams.

Para operar na transmissão de dados de um change stream do Atlas, o estágio $source tem o seguinte formato de protótipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"initialSync": {
"enable": <boolean>,
"parallelism": <integer>
},
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}],
"maxAwaitTimeMS": <time-ms>,
"readPreference": "<read-preference>",
"readPreferenceTags": [
{"<key>": "<value>"},
. . .
]
}
}
}

O estágio $source recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Condicional

Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados.

timeField

documento

Opcional

documento que define um carimbo de data/hora oficial para mensagens recebidas.

Se você usar timeField, deverá defini-lo como um dos seguintes:

  • uma expressão $toDate que usa um campo de mensagem de origem como argumento

  • uma expressão $dateFromString que usa um campo de mensagem de origem como argumento.

Se você não declarar um timeField, o Atlas Stream Processing criará um carimbo de data/hora a partir do carimbo de data/hora da mensagem fornecido pela origem.

db

string

Obrigatório

Nome de um banco de dados MongoDB hospedado na instância do Atlas especificado por connectionName. O change stream desse banco de dados atua como a fonte de dados de streaming.

coll

cadeia de caracteres ou matriz de cadeias de caracteres

Obrigatório

Nome de uma ou mais collections MongoDB hospedadas na instância do Atlas especificada por connectionName. O change stream dessas collections atua como a fonte de dados de streaming . Se você omitir este campo, seu processador de stream obterá a partir de um MongoDB Database Change Stream.

initialSync

documento

Opcional

Documento contendo campos referentes à funcionalidade initialSync.

O Atlas Stream Processing initialSync permite a você ingestão de documentos preexistentes em uma Atlas collection como se fossem documentos changeEvent inseridos. Se você habilitar initialSync, ao iniciar o processador de fluxo, ele primeiro fará a ingestão e o processamento de todos os documentos existentes na coleção antes de prosseguir para a ingestão e o processamento dos novos documentos changeEvent recebidos. Depois que o initialSync estiver concluído, ele não se repetirá.

Se você habilitar initialSync, não poderá usar os estágios $hoppingWindow, $sessionWindow ou $tumblingWindow em seu pipeline.

Importante

Você só pode usar initialSync em collections em que o valor _id dos documentos recebidos sejam valores ObjectId gerados por padrão ou valores int/long ordenados. Todos os valores de _id devem ser do mesmo tipo.

initialSync.enable

booleano

Condicional

Determina se deseja ou não habilitar o initialSync. Se você declarar um campo initialSync, deverá definir este campo.

initialSync.parallelism

inteiro

Opcional

Determina o nível de paralelismo com o qual processar a operação initialSync. Se você não especificar um valor, o padrão será 1.

config

documento

Opcional

documento que contém campo que substituem vários valores padrão.

config.startAfter

token

Condicional

O evento de alteração após o qual a fonte começa a relatar. Isso assume a forma de um resume token.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

config.startAtOperationTime

timestamp

Condicional

O tempo de operação após o qual a fonte deve começar a relatar.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

Aceita JSON estendido do MongoDB valores de $date ou $timestamp.

config.fullDocument

string

Condicional

Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:

  • updateLookup : retorna apenas as alterações na atualização.

  • required : Deve retornar um documento completo. Se um documento completo não estiver disponível, não retornará nada.

  • whenAvailable : retorna um documento completo sempre que houver um disponível, caso contrário, retorna alterações.

Se você não especificar um valor para fullDocument, o padrão será updateLookup.

Para usar este campo com um fluxo de alterações de coleção, você deve habilitar o fluxo de alterações pré e pós-imagens nessa coleção.

config.fullDocumentOnly

booleano

Condicional

Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de fullDocument. Se definido como true, a origem retornará somente o conteúdo de fullDocument.

Para usar este campo com um fluxo de alterações de coleção, você deve habilitar o fluxo de alterações pré e pós-imagens nessa coleção.

config.fullDocumentBeforeChange

string

Opcional

Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:

  • off : omite o campo fullDocumentBeforeChange .

  • required : deve retornar um documento completo em seu estado anterior às alterações. Se um documento completo em seu estado anterior às alterações não estiver disponível, o processador de fluxo falhará.

  • whenAvailable : retorna um documento completo em seu estado anterior às alterações sempre que um estiver disponível, caso contrário, omite o campo fullDocumentBeforeChange .

Se você não especificar um valor para fullDocumentBeforeChange, o padrão será off.

Para usar este campo com um fluxo de alterações de coleção, você deve habilitar o fluxo de alterações pré e pós-imagens nessa coleção.

config.pipeline

documento

Opcional

Especifica um pipeline de agregação para filtrar a saída do fluxo de alterações antes de passá-la para processamento adicional. Este pipeline deve estar em conformidade com os parâmetros descritos em Modificar Saída do Fluxo de Alterações.

Importante

Cada Alterar evento inclui os campos wallTime e clusterTime. Os estágios de Atlas Stream Processing após $source esperam receber estes campos como o processador os ingeriu. Para garantir o processamento adequado dos dados do Change Stream, não modifique esses campos no $source.config.pipeline.

config.readPreference

documento

Opcional

Preferência de leitura para operações do initialSync.

Padrão é primary.

config.readPreferenceTags

documento

Opcional

Leia as tags de preferência para operações do initialSync.

config.maxAwaitTimeMS

inteiro

Opcional

Tempo máximo, em milissegundos, para aguardar que novas alterações de dados sejam relatadas ao cursor do fluxo de alterações antes de retornar um lote vazio.

Padrão é 1000.

Um fluxo de alterações do banco de dados Atlas permite que aplicativos acessem mudanças de dados em tempo real em um único banco de dados. Para saber como abrir um fluxo de alteração em um banco de dados, consulte Fluxos de alteração.

Para operar na transmissão de dados de um fluxo de alteração de banco de dados do Atlas, o estágio $source tem o seguinte formato de protótipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

O estágio $source recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Condicional

Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados.

timeField

documento

Opcional

documento que define um carimbo de data/hora oficial para mensagens recebidas.

Se você usar timeField, deverá defini-lo como um dos seguintes:

  • uma expressão $toDate que usa um campo de mensagem de origem como argumento

  • uma expressão $dateFromString que usa um campo de mensagem de origem como argumento.

Se você não declarar um timeField, o Atlas Stream Processing criará um carimbo de data/hora a partir do carimbo de data/hora da mensagem fornecido pela origem.

db

string

Obrigatório

Nome de um banco de dados MongoDB hospedado na instância do Atlas especificado por connectionName. O change stream desse banco de dados atua como a fonte de dados de streaming.

config

documento

Opcional

documento que contém campo que substituem vários valores padrão.

config.startAfter

token

Condicional

O evento de alteração após o qual a fonte começa a relatar. Isso assume a forma de um resume token.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

config.startAtOperationTime

timestamp

Condicional

O tempo de operação após o qual a fonte deve começar a relatar.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

Aceita JSON estendido do MongoDB valores de $date ou $timestamp.

config.fullDocument

string

Condicional

Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:

  • updateLookup : retorna apenas as alterações na atualização.

  • required : Deve retornar um documento completo. Se um documento completo não estiver disponível, não retornará nada.

  • whenAvailable : retorna um documento completo sempre que houver um disponível, caso contrário, retorna alterações.

Se você não especificar um valor para fullDocument, o padrão será updateLookup.

Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados.

config.fullDocumentOnly

booleano

Condicional

Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de fullDocument. Se definido como true, a origem retornará somente o conteúdo de fullDocument.

Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados.

config.fullDocumentBeforeChange

string

Opcional

Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:

  • off : omite o campo fullDocumentBeforeChange .

  • required : deve retornar um documento completo em seu estado anterior às alterações. Se um documento completo em seu estado anterior às alterações não estiver disponível, o processador de fluxo falhará.

  • whenAvailable : retorna um documento completo em seu estado anterior às alterações sempre que um estiver disponível, caso contrário, omite o campo fullDocumentBeforeChange .

Se você não especificar um valor para fullDocumentBeforeChange, o padrão será off.

Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados.

config.pipeline

documento

Opcional

Especifica um pipeline de agregação para filtrar a saída do fluxo de alterações no ponto de origem. Este pipeline deve estar em conformidade com os parâmetros descritos em Modificar Saída do Fluxo de Alterações.

Importante

Cada Alterar evento inclui os campos wallTime e clusterTime. Os estágios de Atlas Stream Processing após $source esperam receber estes campos como o processador os ingeriu. Para garantir o processamento adequado dos dados do Change Stream, não modifique esses campos no $source.config.pipeline.

config.maxAwaitTimeMS

inteiro

Opcional

Tempo máximo, em milissegundos, para aguardar que novas alterações de dados sejam relatadas ao cursor do fluxo de alterações antes de retornar um lote vazio.

Padrão é 1000.

Para operar em dados de streaming de um fluxo de mudança de cluster inteiro do Atlas, o estágio $source tem a seguinte forma de protótipo:

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

O estágio $source recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

connectionName

string

Condicional

Etiqueta que identifica a conexão no Registro de conexão, para ingestão de dados.

timeField

documento

Opcional

documento que define um carimbo de data/hora oficial para mensagens recebidas.

Se você usar timeField, deverá defini-lo como um dos seguintes:

  • uma expressão $toDate que usa um campo de mensagem de origem como argumento

  • uma expressão $dateFromString que usa um campo de mensagem de origem como argumento.

Se você não declarar um timeField, o Atlas Stream Processing criará um carimbo de data/hora a partir do carimbo de data/hora da mensagem fornecido pela origem.

config

documento

Opcional

documento que contém campo que substituem vários valores padrão.

config.startAfter

token

Condicional

O evento de alteração após o qual a fonte começa a relatar. Isso assume a forma de um resume token.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

config.startAtOperationTime

data | registro de data e hora

Condicional

O tempo de operação após o qual a fonte deve começar a relatar.

Você pode usar apenas um entre config.startAfter ou config.StartAtOperationTime.

Aceita JSON estendido do MongoDB valores de $date ou $timestamp.

config.fullDocument

string

Condicional

Configuração que controla se uma fonte de fluxo de alterações deve retornar um documento completo ou apenas as alterações quando ocorrer uma atualização. Deve ser um dos seguintes:

  • updateLookup : retorna apenas as alterações na atualização.

  • required : Deve retornar um documento completo. Se um documento completo não estiver disponível, não retornará nada.

  • whenAvailable : retorna um documento completo sempre que houver um disponível, caso contrário, retorna alterações.

Se você não especificar um valor para fullDocument, o padrão será updateLookup.

Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados.

config.fullDocumentOnly

booleano

Condicional

Configuração que controla se uma change stream retorna todo o documento do evento, incluindo todos os metadados, ou somente o conteúdo de fullDocument. Se definido como true, a origem retornará somente o conteúdo de fullDocument.

Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados.

config.fullDocumentBeforeChange

string

Opcional

Especifica se uma fonte de change stream deve incluir o documento completo em seu estado original "antes das alterações" na saída. Deve ser um dos seguintes:

  • off : omite o campo fullDocumentBeforeChange .

  • required : deve retornar um documento completo em seu estado anterior às alterações. Se um documento completo em seu estado anterior às alterações não estiver disponível, o processador de fluxo falhará.

  • whenAvailable : retorna um documento completo em seu estado anterior às alterações sempre que um estiver disponível, caso contrário, omite o campo fullDocumentBeforeChange .

Se você não especificar um valor para fullDocumentBeforeChange, o padrão será off.

Para usar esse campo com um fluxo de alteração de banco de dados, você deve ativar a alteração do fluxo de pré e pós-imagens em cada coleção nesse banco de dados.

config.pipeline

documento

Opcional

Especifica um pipeline de agregação para filtrar a saída do fluxo de alterações no ponto de origem. Este pipeline deve estar em conformidade com os parâmetros descritos em Modificar Saída do Fluxo de Alterações.

Observe que o Atlas Stream Processing espera receber os campos wallTime e clusterTime de cada evento de alteração ingerido. Para garantir o processamento adequado dos dados do Change Stream, não modifique esses campos no $source.config.pipeline.

config.maxAwaitTimeMS

inteiro

Opcional

Tempo máximo, em milissegundos, para aguardar que novas alterações de dados sejam relatadas ao cursor do fluxo de alterações antes de retornar um lote vazio.

Padrão é 1000.

Para operar em uma array de documentos, o estágio $source tem o seguinte formato de protótipo:

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"documents" : [{source-doc},...] | <expression>
}
}

O estágio $source recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

timeField

documento

Opcional

documento que define um carimbo de data/hora oficial para mensagens recebidas.

Se você usar timeField, deverá defini-lo como um dos seguintes:

  • uma expressão $toDate que usa um campo de mensagem de origem como argumento

  • uma expressão $dateFromString que usa um campo de mensagem de origem como argumento.

Se você não declarar um timeField, o Atlas Stream Processing criará um carimbo de data/hora a partir do carimbo de data/hora da mensagem fornecido pela origem.

documents

array

Condicional

Array de documentos para usar como fonte de dados de streaming. O valor deste campo pode ser uma matriz de objetos ou uma expressão que avalia para uma matriz de objetos. Não utilize este campo ao utilizar o campo connectionName .

$source deve ser o primeiro estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $source por pipeline.

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:

  1. O estágio $source estabelece uma conexão com o broker do Apache Kafka que está coletando esses relatórios em um tópico chamado my_weatherdata, expondo cada registro à medida que é assimilado nos estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o como ingestionTime.

  2. O estágio $match exclui documentos que têm um dewPoint.value menor ou igual a 5.0 e passa os documentos com dewPoint.value maior que 5.0 para o próximo estágio.

  3. O estágio $merge grava a saída na coleção do Atlas chamada stream no banco de dados sample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para visualizar os documentos na coleção sample_weatherstream.stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

A seguinte agregação ingere dados da fonte cluster0-collection, que se conecta a um cluster do Atlas carregado com o conjunto de dados de amostra. Para aprender como criar uma instância de processamento de fluxo e adicionar uma conexão a um cluster do Atlas ao registro de conexão, veja Introdução ao Atlas Stream Processing. Esta agregação executa duas etapas para abrir um fluxo de alterações e registrar as alterações na coleção data no banco de dados sample_weatherdata.

  1. O estágio $source conecta-se à origem cluster0-collection e abre um fluxo de alterações na coleção data no banco de dados sample_weatherdata.

  2. O estágio $merge grava os documentos filtrados do fluxo de alterações em uma coleção do Atlas chamada data_changes no banco de dados sample_weatherdata. Se não existir tal coleção, o Atlas a criará.

{
$source: {
connectionName: "cluster0-connection",
db : "sample_weatherdata",
coll : "data"
},
$merge: {
into: {
connectionName: "cluster0-connection",
db: "sample_weatherdata",
coll: "data_changes"
}
}
}

O seguinte comando mongosh exclui um documento data:

db.getSiblingDB("sample_weatherdata").data.deleteOne(
{ _id: ObjectId("5553a99ae4b02cf715120e4b") }
)

Após a exclusão do documento data, o processador de fluxo grava o documento de evento do fluxo de alterações na coleção sample_weatherdata.data_changes. Para visualizar os documentos na coleção sample_weatherdata.data_changes resultante, use mongosh para conectar-se ao seu cluster Atlas e execute o seguinte comando:

db.getSiblingDB("sample_weatherdata").data_changes.find()
[
{
_id: {
_data: '8267A3D7A3000000012B042C0100296E5A1004800951B8EDE4430AB5C1B254BB3C96D6463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F696400645553A99AE4B02CF715120E4B000004'
},
clusterTime: Timestamp({ t: 1738790819, i: 1 }),
documentKey: { _id: ObjectId('5553a99ae4b02cf715120e4b') },
ns: { db: 'sample_weatherdata', coll: 'data' },
operationType: 'delete',
wallTime: ISODate('2025-02-05T21:26:59.313Z')
}
]

Voltar

Estágios de aggregation

Nesta página