定義
$hoppingWindowステージでは、データ集計用のホスティング ウィンドウを指定します。 Atlas Stream Processing Windowsはステートフルで、中断された場合に復元でき、遅延データを処理するメカニズムがあります。 このウィンドウ ステージ内のストリーミング データに他のすべての集計クエリを適用する必要があります。
$hoppingWindow$hoppingWindowパイプライン ステージには次のプロトタイプ形式があります。{ "$hoppingWindow": { "boundary": "eventTime" | "processingTime", "interval": { "size": <int>, "unit": "<unit-of-time>" }, "hopSize": { "size": <int>, "unit": "<unit-of-time>" }, "pipeline" : [ <aggregation-stage-array> ], "offset": { "offsetFromUtc": <int>, "unit": "<unit-of-time>" }, "idleTimeout": { "size": <int>, "unit": "<unit-of-time>" }, "allowedLateness": { "size": <int>, "unit": "<unit-of-time>" }, } }
構文
$hoppingWindowステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
|---|---|---|---|
| string | 任意 | ウィンドウの境界がイベント時間によって決定されるか、処理時間によって決定されるかを指定する文字列。値は
|
| ドキュメント | 必須 | ホスティング ウィンドウの間隔をサイズと時間単位の組み合わせとして指定するドキュメント。
たとえば、 |
| ドキュメント | 必須 | ウィンドウの開始時間間の経過時間の長さを、
たとえば、 |
| 配列 | 必須 | ウィンドウ内のメッセージに対して評価されるネストされた集計パイプライン。 |
| ドキュメント | 任意 | UTC に対するウィンドウ境界の時間オフセットを指定するドキュメント。 ドキュメントは、サイズ フィールド
たとえば、 |
| ドキュメント | 任意 |
たとえば、 12 : 00時から1 : 00時系列と |
| ドキュメント | 任意 | ウィンドウ 終了時間のドキュメント処理後に遅延データを受け入れるために、ソースから生成されたWindowsを起動したままにしておく時間を指定するドキュメント。 省略した場合、デフォルトは3秒になります。 |
動作
Atlas Stream Processing は、パイプラインごとに 1 つのウィンドウステージのみをサポートします。
ウィンドウ ステージに$groupステージを適用すると、単一のグループ キーによる RAM 制限は100 MB に制限されます。
Windows内では特定の集計ステージのサポートが制限されているか、利用できない場合があります。 詳細については、 「サポートされている集計パイプライン ステージ」 を参照してください。
サービスが中断された場合、ウィンドウの内部パイプラインを中断時点の状態から再開できます。 詳細については、「チェックポイント 」を参照してください。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
$sourceステージでは、 という名前のトピックでこれらのレポートを収集するApache Kafkamy_weatherdataプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれるときに公開します。$hoppingWindowステージでは、期間が 100 秒で、20 秒ごとに開始される重複する時間のWindowsを定義します。各ウィンドウは内部pipelineを実行し、特定のウィンドウの期間にわたってApache Kafkaプロバイダーからストリーミングされたsample_weatherdataドキュメントで定義される平均liquidPrecipitation.depthを見つけます。次に、pipelineは、表すウィンドウの開始タイムスタンプと同等の_idとそのウィンドウのaveragePrecipitationを含む単一のドキュメントを出力します。$mergeステージは、sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
pipeline = [ { $source: { "connectionName": "streamsExampleConnectionToKafka", "topic": "my_weatherdata" } }, { $hoppingWindow: { "interval": { "size": 100, "unit": "second" }, "hopSize": { "size": 20, "unit": "second" }, "pipeline" : [ { $group: { _id: { $meta: "stream.window.start" }, averagePrecipitation: { $avg: "$liquidPrecipitation.depth" } } } ], } }, { $merge: { "into": { "connectionName":"streamsExampleConnectionToAtlas", "db":"streamDB", "coll":"streamCollection" } } } ]
結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ISODate('2024-08-28T19:30:20.000Z'), averagePrecipitation: 2264.3973214285716 }, { _id: ISODate('2024-08-28T19:30:40.000Z'), averagePrecipitation: 2285.7061611374406 }, { _id: ISODate('2024-08-28T19:31:00.000Z'), averagePrecipitation: 2357.6940154440153 }, { _id: ISODate('2024-08-28T19:31:20.000Z'), averagePrecipitation: 2378.374061433447 }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。