定義
$sessionWindow
ステージは、データの集計におけるセッション ウィンドウを指定します。セッション ウィンドウを使用すると、入力ストリーム内の各 "セッション" に対してパイプラインを実行できます。2 つのドキュメントが同じパーティションを持ち、かつそのタイムスタンプの差がセッション ギャップ未満であれば、それらは同じセッションと見なされます。ウィンドウが閉じると、その結果は次のステージに渡されます。
ウォーターマークが gap
値と allowedLateness
値の合計をセッション内の最大ドキュメント タイムスタンプに加えた期間継続すると、セッション ウィンドウは閉じます。閉じられたセッション ウィンドウの開始時刻はセッションの最初のイベントのタイムスタンプであり、終了時刻はセッションの最後のイベントのタイムスタンプにこの差を加えたものになります。セッション ウィンドウの終了時刻がセッション内の最大タイムスタンプにその差を加算します。ウィンドウ結果は、ウィンドウに収まるドキュメントに対する $sessionWindow.pipeline
の出力です。
構文
$sessionWindow
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| 式 | 必須 |
|
| ドキュメント | 必須 | セッションを閉じる前に、
例えば、 |
| string | 任意 | ウィンドウの境界をイベント時刻に基づいて決定するか、処理時刻に基づいて決定するかを指定する文字列です。値には
|
| 配列 | 必須 | ウィンドウ内のメッセージに対して評価されるネストされた集計パイプライン。 |
| duration | 任意 |
例、"3" の 省略した場合、デフォルトは 3 秒となります。 |
動作
$sessionWindow
ステージに到達した各入力ドキュメントには、partitionBy
式に基づいてパーティションが割り当てられます。各ドキュメントは、そのパーティションとタイムスタンプに基づいてセッションウィンドウに割り当てられます。これは新しいセッション ウィンドウまたは既存のセッション ウィンドウである可能性があります。ウィンドウが閉じると、その結果は次のステージに渡されます。ウィンドウ結果は、ウィンドウ内のドキュメントに対してパイプラインを実行した際の出力です。
ウォーター マークがギャップを進め、allowedLateness
の値がセッション内のドキュメントの最大タイムスタンプを超えると、セッション ウィンドウが閉じられます。クローズされたセッション ウィンドウの開始時刻は、セッション内の最初のイベントのタイムスタンプです。クローズされたセッション ウィンドウの終了時刻は、セッション内の最後のイベントのタイムスタンプにギャップを加えた時刻です。セッション ウィンドウの終了時刻は、セッション内の最大タイムスタンプにギャップを加えたものです。
注意
パーティション A に対するセッション ウィンドウがあり、その最大タイムスタンプが 2024-01-01 00:40:00
であるとします。ギャップが 1 時間の場合、パーティション A にそれ以降のドキュメントが到着しなければ、ウォーターマークが 2024-01-01 01:40:00
に達した時点でセッション ウィンドウは閉じられます。
入力ドキュメントに対して partitionBy
式が失敗した場合、ドキュメントはDLQ に送信されます。順序外のデータを処理する場合、同じパーティションの既存のセッションWindowsのギャップ内にあるタイムスタンプでドキュメントが到達する可能性があります。 これにより、セッションWindowsがマージされます。
ソースからの最新のウォータマークの値より小さいタイムスタンプを持つドキュメントが $sessionWindow
ステージに到達し、 かつ そのドキュメントのオープン セッションがない場合、ドキュメントはDLQ に送信されます。ドキュメントが$sessionWindow
ステージに到達すると、そのストリーム メタ window.partition
、window.start
、window.end
フィールドが設定されます。
イベントは順不同で到着することがあります。同じpartitionBy
値を持つ 2 つのイベントが到着し、それらのタイムスタンプがギャップよりも大きく離れている可能性があります。最初は、これらのイベントは別々のセッション ウィンドウに割り当てられます。その後、2 つのセッションウィンドウを結合する別のイベントが到着する可能性があります。