An Atlas Stream Processing ストリーム プロセッサは、一意の名前を持つストリーム集計パイプラインのロジックをストリーミング データに適用します。Atlas Stream Processing は、各ストリーム プロセッサの定義を永続ストレージに保存し、再利用できるようにします。指定したストリーム プロセッサは、その定義が保存されている Stream Processing ワークスペースでのみ使用できます。
前提条件
ストリーム プロセッサを作成および管理するには、次のものが必要です。
ストリーム プロセッサを作成および実行する
atlasAdminロールを持つデータベースユーザーAtlas クラスター
Considerations
多くのストリーム プロセッサ コマンドでは、メソッド呼び出しで関連するストリーム プロセッサの名前を指定する必要があります。 次のセクションで説明される構文では、厳密に英数字の名前を使用することを前提としています。 ストリーム プロセッサの名前にハイフン(-)や完全停止(.)など、英数字以外の文字が含まれている場合は、名前を角括弧([])とdouble引用符("")で囲む必要があります。メソッド呼び出し(sp.["special-name-stream"].stats() と同様)。
ストリーム プロセッサを対話的に作成する
ストリーム プロセッサを作成する
ストリーム プロセッサを起動する
ストリーム プロセッサの停止
ストリームプロセッサを変更するには
既存のストリーム プロセッサの次の要素を変更できます。
名前
ストリーム プロセッサを変更するには、次の手順に従います。
ストリーム プロセッサを停止します。
「ストリーム プロセッサの停止」を参照してください。
ストリーム プロセッサを変更します。
「次の手順」を参照してください。
ストリーム プロセッサを再起動します。
ストリーム プロセッサを起動するを参照してください。
デフォルトでは、変更されたプロセッサは最後のチェックポイントから復元されます。あるいは、resumeFromCheckpoint=false を設定することもできます。この場合、プロセッサは要約統計のみを保持します。ウィンドウが開いているプロセッサーを変更した場合、ウィンドウは更新されたパイプラインで全て再計算されます。
注意
Operator(is、contains などのマッチャー式が含まれる)を使用して、ストリーム プロセッサ ステートが失敗した場合のアラートを構成したストリーム プロセッサの名前を変更すると、Atlas は、マッチャー式がその新しい名前と一致しない場合、名前が変更されたストリーム プロセッサに対してアラートをトリガーしません。名前が変更されたストリーム プロセッサを監視するには、アラートを再構成してください。
制限
デフォルト設定 resumeFromCheckpoint=true が有効になっている場合、以下の制限が適用されます。
$sourceステージは変更できません。ウィンドウの間隔は変更できません。
ウィンドウを削除することはできません。
ウィンドウを使用してパイプラインを変更できるのは、そのウィンドウの内部パイプラインに
$groupまたは$sortステージのいずれかが含まれている場合のみです。既存のウィンドウタイプを変更することはできません。たとえば、
$tumblingWindowから$hoppingWindowへの変更、またはその逆の変更はできません。ウィンドウを持つプロセッサは、ウィンドウの再計算の結果として、一部のデータを再処理する可能性があります。
変更操作後は、演算子ごとの統計は保持されません。
ストリーム プロセッサの削除
使用可能なストリーム プロセッサを一覧表示する
ワークスペースのデフォルトを一覧表示する
ストリーム プロセッサからのサンプル
既存のストリーム プロセッサからサンプル結果の配列を mongosh を使用して STDOUT に返すには、sp.processor.sample() メソッドを使用します。例、次のコマンドは、proc01 という名前のストリーム プロセッサからサンプリングしています。
sp.proc01.sample()
このコマンドは、 CTRL-Cを使用してキャンセルするまで、または返されたサンプルのサイズが累積40 MB になるまで継続的に実行されます。 ストリーム プロセッサは、サンプル内の無効なドキュメントを次の形式の_dlqMessageドキュメントで報告します。
{ _dlqMessage: { errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', workspaceName: '<workspaceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
これらのメッセージを使用して、デッド レター キューコレクションを定義せずにデータのクリーンアップの問題を診断できます。