Join us Sept 17 at .local NYC! Use code WEB50 to save 50% on tickets. Learn more >
MongoDB Event
Docs Menu
Docs Home
/
Atlas
/ /

$sessionWindow

$sessionWindow ステージは、データの集計におけるセッション ウィンドウを指定します。セッション ウィンドウを使用すると、入力ストリーム内の各 "セッション" に対してパイプラインを実行できます。2 つのドキュメントが同じパーティションを持ち、かつそのタイムスタンプの差がセッション ギャップ未満であれば、それらは同じセッションと見なされます。ウィンドウが閉じると、その結果は次のステージに渡されます。

ウォーターマークが gap 値と allowedLateness 値の合計をセッション内の最大ドキュメント タイムスタンプに加えた期間継続すると、セッション ウィンドウは閉じます。閉じられたセッション ウィンドウの開始時刻はセッションの最初のイベントのタイムスタンプであり、終了時刻はセッションの最後のイベントのタイムスタンプにこの差を加えたものになります。セッション ウィンドウの終了時刻がセッション内の最大タイムスタンプにその差を加算します。ウィンドウ結果は、ウィンドウに収まるドキュメントに対する $sessionWindow.pipeline の出力です。

$sessionWindow

$sessionWindowパイプライン ステージには次のプロトタイプ形式があります。

{
$sessionWindow: {
partitionBy: "$userId",
gap: {unit: "minute", size: 5},
pipeline: [{$match: {ad: true}}, {$group: { _id: null, total: {$sum: "$value"}}}],
boundary: "eventTime",
allowedLateness: {unit: "second", size: 5}
}
}

$sessionWindowステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

partitionBy

必須

$sessionWindow が分割するフィールド。同じ partitionBy フィールドを共有する各受信ドキュメントは、まとめて処理されます。

gap

ドキュメント

必須

セッションを閉じる前に、partitionBy 値を共有する追加のレコードを待機するための時間を sizeunit の組み合わせとして定義するドキュメント。以下のドキュメントは:

  • sizeの値はゼロ以外の正の整数である必要があります。

  • unitの値は次のいずれかである必要があります。

    • "ms" (ミリ秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

例えば、size が「1」で、unit が「minute」の場合、このステージはセッションウィンドウを閉じる前に、同じ partitionBy 値を持つ追加のレコードを 1 分間待機します。

boundary

string

任意

ウィンドウの境界をイベント時刻に基づいて決定するか、処理時刻に基づいて決定するかを指定する文字列です。値には eventTime または processingTime を指定できます。詳しくは、ストリーム処理のタイミング」を参照してください。このフィールドを省略した場合、デフォルト値は eventTime になります。

boundaryprocessingTimeに設定されている場合、allowedLatenessフィールドを設定することはできません。

pipeline

配列

必須

ウィンドウ内のメッセージに対して評価されるネストされた集計パイプライン。

allowedLateness

duration

任意

sizeunit の組み合わせとして、ソースから生成されたウィンドウを、ウィンドウ終了時刻のドキュメント処理後に、遅れて到着するデータを受け入れるために開いておく時間を指定するドキュメントです。

  • sizeの値はゼロ以外の正の整数である必要があります。

  • unitの値は次のいずれかである必要があります。

    • "ms" (ミリ秒)

    • "second"

    • "minute"

    • "hour"

    • "day"

例、"3" の size と "秒" の unit は、レコードを次のステージに移動する前に遅延レコードのギャップが 3 秒後に待機します。

省略した場合、デフォルトは 3 秒となります。

$sessionWindow ステージに到達した各入力ドキュメントには、partitionBy 式に基づいてパーティションが割り当てられます。各ドキュメントは、そのパーティションとタイムスタンプに基づいてセッションウィンドウに割り当てられます。これは新しいセッション ウィンドウまたは既存のセッション ウィンドウである可能性があります。ウィンドウが閉じると、その結果は次のステージに渡されます。ウィンドウ結果は、ウィンドウ内のドキュメントに対してパイプラインを実行した際の出力です。

ウォーター マークがギャップを進め、allowedLateness の値がセッション内のドキュメントの最大タイムスタンプを超えると、セッション ウィンドウが閉じられます。クローズされたセッション ウィンドウの開始時刻は、セッション内の最初のイベントのタイムスタンプです。クローズされたセッション ウィンドウの終了時刻は、セッション内の最後のイベントのタイムスタンプにギャップを加えた時刻です。セッション ウィンドウの終了時刻は、セッション内の最大タイムスタンプにギャップを加えたものです。

注意

パーティション A に対するセッション ウィンドウがあり、その最大タイムスタンプが 2024-01-01 00:40:00 であるとします。ギャップが 1 時間の場合、パーティション A にそれ以降のドキュメントが到着しなければ、ウォーターマークが 2024-01-01 01:40:00 に達した時点でセッション ウィンドウは閉じられます。

入力ドキュメントに対して partitionBy式が失敗した場合、ドキュメントはDLQ に送信されます。順序外のデータを処理する場合、同じパーティションの既存のセッションWindowsのギャップ内にあるタイムスタンプでドキュメントが到達する可能性があります。 これにより、セッションWindowsがマージされます。

ソースからの最新のウォータマークの値より小さいタイムスタンプを持つドキュメントが $sessionWindow ステージに到達し、 かつ そのドキュメントのオープン セッションがない場合、ドキュメントはDLQ に送信されます。ドキュメントが$sessionWindow ステージに到達すると、そのストリーム メタ window.partitionwindow.startwindow.end フィールドが設定されます。

イベントは順不同で到着することがあります。同じpartitionBy値を持つ 2 つのイベントが到着し、それらのタイムスタンプがギャップよりも大きく離れている可能性があります。最初は、これらのイベントは別々のセッション ウィンドウに割り当てられます。その後、2 つのセッションウィンドウを結合する別のイベントが到着する可能性があります。

戻る

$externalFunction

項目一覧