定義
$tumblingWindowステージでは、データ集計用のローリング ウィンドウを指定します。 Atlas Stream Processing Windowsはステートフルで、中断された場合に復元でき、遅延データを処理するメカニズムがあります。 このウィンドウ ステージ内のストリーミング データに他のすべての集計クエリを適用する必要があります。
$tumblingWindow$tumblingWindowパイプライン ステージには次のプロトタイプ形式があります。{ "$tumblingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { size: <int>, unit: "<unit-of-time>" } } } あるいは、
$tumblingWindowパイプラインステージには、以下に示すように、整数値 0 のallowedLatenessフィールドとidleTimeoutフィールドを持たせることができます。{ "$tumblingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": 0, "allowedLateness": 0 } }
構文
$tumblingWindowステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
|---|---|---|---|
| string | 任意 | ウィンドウの境界がイベント時間によって決定されるか、処理時間によって決定されるかを指定する文字列。値は
|
| ドキュメント | 必須 | ホスティング ウィンドウの間隔をサイズと時間単位の組み合わせとして指定するドキュメント。
たとえば、 |
| 配列 | 必須 | ウィンドウ内のメッセージに対して評価されるネストされた集計パイプライン。 |
| ドキュメント | 任意 | UTC に対するウィンドウ境界の時間オフセットを指定するドキュメント。 ドキュメントは、サイズ フィールド
たとえば、 |
| ドキュメント | 任意 |
たとえば、 12 : 00時から1 : 00時系列と あるいは、0 の整数値でこの設定を定義することもできます。詳細は、パイプラインの定義をご覧ください。 |
| ドキュメント | 任意 |
動作
Atlas Stream Processing は、パイプラインごとに 1 つのウィンドウステージのみをサポートします。
ウィンドウ ステージに$groupステージを適用すると、単一のグループ キーによる RAM 制限は100 MB に制限されます。
Windows内では特定の集計ステージのサポートが制限されているか、利用できない場合があります。 詳細については、 「サポートされている集計パイプライン ステージ」 を参照してください。
サービスが中断された場合、ウィンドウの内部パイプラインを中断時点の状態から再開できます。 詳細については、「チェックポイント 」を参照してください。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
$sourceステージでは、 という名前のトピックでこれらのレポートを収集するApache Kafkamy_weatherdataプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれるときに公開します。$tumblingWindowステージは、30 秒の連続したウィンドウを定義します。各ウィンドウは内部pipelineを実行し、そのウィンドウの時間の平均、中央値、最大、および最小atmosphericPressureObservation.altimeterSetting.valueを見つけます。次に、pipelineは、表示ウィンドウの開始タイムスタンプとそのウィンドウの指定された値と同等の_idを持つ単一のドキュメントを出力します。$mergeステージは、sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$tumblingWindow': { interval: { size: 30, unit: "second" }, pipeline: [{ $group: { _id: { $meta: "stream.window.start"}, averagePressure: { $avg: "$atmosphericPressureObservation.altimeterSetting.value" }, medianPressure: { $median: { input: "$atmosphericPressureObservation.altimeterSetting.value", method: "approximate" } }, maxPressure: { $max: "$atmosphericPressureObservation.altimeterSetting.value" }, minPressure: { $min: "$atmosphericPressureObservation.altimeterSetting.value" } } }] } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-09-26T16:34:00.000Z'), averagePressure: 5271.47894736842, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }, { _id: ISODate('2024-09-26T16:34:30.000Z'), averagePressure: 5507.9, maxPressure: 9999.9, medianPressure: 1015.9, minPressure: 1015.9 }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。