Definição
O $hoppingWindow estágio especifica uma janela de salto para agregação de dados. As janelas do Atlas Stream Processing têm estado, podem ser recuperadas se interrompidos e possuem mecanismos para processar dados que chegam atrasados. Você deve aplicar todas as outras consultas de agregação aos seus dados de streaming dentro desse estágio de janela.
$hoppingWindowUm estágio de pipeline do
$hoppingWindowtem a seguinte forma de protótipo:{ "$hoppingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "hopSize": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { "size": <int>, "unit": "<unit-of-time>" }, } }
Sintaxe
O estágio $hoppingWindow recebe um documento com os seguintes campos:
Campo | Tipo | necessidade | Descrição |
|---|---|---|---|
| string | Opcional | String que especifica se os limites da janela são determinados pelo tempo de evento ou pelo tempo de processamento. O valor pode ser
|
| documento | Obrigatório | Documento que especifica o intervalo de uma janela de salto como uma combinação de um tamanho e uma unidade de tempo em que:
Por exemplo, um |
| documento | Obrigatório | Documento que especifica o comprimento do salto entre os horários de início da janela como uma combinação de
Por exemplo, um |
| array | Obrigatório | Pipeline de agregação aninhado avaliado em relação às mensagens dentro da janela. |
| documento | Opcional | Documento que especifica uma compensação de horário para os limites da janela em relação ao UTC. O documento é uma combinação do campo de tamanho
Por exemplo, um |
| documento | Opcional | Documento que especifica quanto tempo esperar antes de fechar Windows se
Se você definir Por exemplo, considere uma janela 12:00 pm a 1:00 pm e |
| documento | Opcional | Documento que especifica por quanto tempo manter abertas as janelas geradas a partir da fonte para aceitar dados que chegam tarde após o processamento dos documentos até o horário de término da janela. Se omitido, o padrão é 3 segundos. |
Comportamento
O Atlas Stream Processing permite apenas um estágio de janela por pipeline.
Quando você aplica o estágio $group ao estágio da janela, uma única chave de grupo tem um limite de 100 megabytes de RAM.
A compatibilidade para determinados estágios de agregação pode ser limitada ou indisponível nas janelas. Para saber mais, consulte Estágios de pipeline de agregação com compatibilidade.
No caso de uma interrupção de serviço, você pode retomar o pipeline interno de uma janela a partir do estado em que se encontrava no momento da interrupção. Para saber mais, consulte checkpoints.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:
O estágio estabelece
$sourceuma conexão com o broker do Apache Kafka que coleta esses relatórios em um tópico chamadomy_weatherdata, expondo cada registro à medida que ele é ingerido aos estágios de agregação posteriores.O estágio
$hoppingWindowdefine janelas de tempo sobrepostas com 100 segundos de duração e que começam a cada 20 segundos. Cada janela executa umpipelineinterno que encontra a médialiquidPrecipitation.depth, conforme definido nossample_weatherdatadocumentos transmitidos pelo corretor Apache Kafka, durante a duração de uma determinada janela. Em seguida, opipelinegera um único documento com um_idequivalente ao carimbo de data/hora de início da janela que ele representa e oaveragePrecipitationdessa janela.O estágio
$mergegrava a saída na coleção do Atlas chamadastreamno banco de dadossample_weatherstream. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
pipeline = [ { $source: { "connectionName": "streamsExampleConnectionToKafka", "topic": "my_weatherdata" } }, { $hoppingWindow: { "interval": { "size": 100, "unit": "second" }, "hopSize": { "size": 20, "unit": "second" }, "pipeline" : [ { $group: { _id: { $meta: "stream.window.start" }, averagePrecipitation: { $avg: "$liquidPrecipitation.depth" } } } ], } }, { $merge: { "into": { "connectionName":"streamsExampleConnectionToAtlas", "db":"streamDB", "coll":"streamCollection" } } } ]
Para visualizar os documentos na coleção sample_weatherstream.stream resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-08-28T19:30:20.000Z'), averagePrecipitation: 2264.3973214285716 }, { _id: ISODate('2024-08-28T19:30:40.000Z'), averagePrecipitation: 2285.7061611374406 }, { _id: ISODate('2024-08-28T19:31:00.000Z'), averagePrecipitation: 2357.6940154440153 }, { _id: ISODate('2024-08-28T19:31:20.000Z'), averagePrecipitation: 2378.374061433447 }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.