Menu Docs
Página inicial do Docs
/ /
/ / /

$sessionWindow

O estágio $sessionWindow especifica uma janela de sessão para agregação de dados. As janelas de sessão permitem que você execute um pipeline em cada "sessão" de atividade em um fluxo de entrada. Dois documentos estão na mesma sessão se tiverem a mesma partição e a diferença entre seus carimbos de data e hora for menor que o intervalo da sessão. Quando uma janela é fechada, seus resultados são liberados para o próximo estágio.

Uma janela de sessão se fecha quando a marca d'água avança por uma duração igual ao valor gap mais o valor allowedLateness além do timestamp máximo do documento na sessão. O horário de início de uma janela de sessão fechada é o timestamp do primeiro evento na sessão, e o horário de término é o timestamp do último evento na sessão mais o intervalo. O horário de término da janela de sessão adiciona o intervalo ao timestamp máximo na sessão. Os resultados da janela são a saída de $sessionWindow.pipeline nos documentos que se encaixam na janela.

$sessionWindow

Um estágio de pipeline do $sessionWindow tem a seguinte forma de protótipo:

{
$sessionWindow: {
partitionBy: "$userId",
gap: {unit: "minute", size: 5},
pipeline: [{$match: {ad: true}}, {$group: { _id: null, total: {$sum: "$value"}}}],
boundary: "eventTime",
allowedLateness: {unit: "second", size: 5}
}
}

O estágio $sessionWindow recebe um documento com os seguintes campos:

Campo
Tipo
necessidade
Descrição

partitionBy

expressão

Obrigatório

Campos para particionar o $sessionWindow. O MongoDB processa documentos recebidos que compartilham os mesmos campos partitionBy .

partitionBy suporta apenas expressões de campo único ou simples. Ele não oferece suporte a documentos incorporados com várias chaves.

gap

documento

Obrigatório

Documento que define a quantidade de tempo como uma combinação de size e unit para aguardar registros adicionais que compartilham valores partitionBy antes de fechar a sessão, em que:

  • O valor de size deve ser um número inteiro positivo diferente de zero.

  • O valor de unit deve ser um dos seguintes:

    • "ms" (milésimo de segundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por exemplo, com um size de "1" e um unit de "minuto", esta etapa aguarda um minuto por registros adicionais com os mesmos valores de partitionBy antes de fechar a janela da sessão.

boundary

string

Opcional

String que especifica se os limites da janela são determinados pelo tempo de evento ou pelo tempo de processamento. O valor pode ser eventTime ou processingTime. Consulte tempo de processamento de stream para aprender mais. Se omitido, este campo é padronizado como eventTime.

Não é possível definir o campo allowedLateness quando boundary está definido como processingTime.

pipeline

array

Obrigatório

Pipeline de agregação aninhado avaliado em relação às mensagens dentro da janela.

allowedLateness

duration

Opcional

Documento que especifica o período como uma combinação de size e um unit para manter as janelas geradas pela origem abertas para aceitar dados que chegam atrasados após o processamento de documentos até o tempo de fechamento da janela, onde:

  • O valor de size deve ser um número inteiro positivo diferente de zero.

  • O valor de unit deve ser um dos seguintes:

    • "ms" (milésimo de segundo)

    • "second"

    • "minute"

    • "hour"

    • "day"

Por exemplo, um size de "3" e um unit de "segundo" aguarda 3 segundos após a lacuna para registros atrasados antes de mover os registros para o próximo estágio.

Se omitido, o padrão é 3 segundos.

Cada documento de entrada que atinge o estágio $sessionWindow recebe uma partição da expressão partitionBy atribuída. Cada documento é atribuído a uma janela de sessão com base em sua partição e carimbo de data e hora. Essa pode ser uma janela de sessão nova ou existente. Quando uma janela é fechada, seus resultados são liberados para o próximo estágio. Os resultados da janela são a saída do pipeline para os documentos na janela.

Uma janela de sessão é fechada quando a marca d'água avança a lacuna e os valores de allowedLateness ultrapassam o carimbo de data e hora limite do documento na sessão. O horário de início de uma janela de sessão fechada é o carimbo de data e hora do primeiro evento na sessão. O horário de término de uma janela de sessão fechada é o carimbo de data e hora do último evento na sessão, mais o intervalo. O horário de término da janela da sessão adiciona o intervalo ao carimbo de data e hora limite da sessão.

Observação

Suponha que exista uma janela de sessão para a partição A, com um carimbo de data e hora limite de 2024-01-01 00:40:00. Com um intervalo de 1 hora(s), se nenhum documento posterior chegar para a partição A, a janela da sessão será fechada quando a marca d'água atingir 2024-01-01 01:40:00.

Se a expressão partitionBy falhar para um documento de entrada, o documento será enviado para o DLQ. Ao processar dados fora de ordem, um documento pode chegar com um carimbo de data/hora dentro da lacuna das janelas de sessão existentes da mesma partição. Isso mesclará a sessão Windows.

Se um documento chegar ao estágio $sessionWindow com um carimbo de data/hora menor que o valor da marca d''d'gua mais recente da origem e não houver nenhuma sessão aberta para esse documento, o documento será enviado para o DLQ. Quando um documento atinge o estágio $sessionWindow, seus campos meta de fluxo window.partition, window.start e window.end são definidos.

Os eventos podem chegar fora de sequência. Dois eventos que têm o mesmo valor de partitionBy podem chegar, e seus carimbos de data e hora podem ser separados por mais do que o intervalo. Inicialmente, esses eventos serão alocados em janelas de sessão distintas. Mais tarde, outro evento que mescla as duas janelas de sessão pode chegar.

Voltar

$externalFunction

Nesta página