AI エージェント向け: ドキュメントインデックスは https://www.mongodb.com/ja-jp/docs/llms.txt で利用できます。すべてのページの markdown バージョンは、いずれかの URL パスに .md を追加することで利用できます。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

ストリーム プロセッサの開発

An Atlas Stream Processing ストリーム プロセッサは、一意の名前を持つストリーム集計パイプラインのロジックをストリーミング データに適用します。Atlas Stream Processing は、各ストリーム プロセッサの定義を永続ストレージに保存し、再利用できるようにします。指定したストリーム プロセッサは、その定義が保存されている Stream Processing ワークスペースでのみ使用できます。

ストリーム プロセッサを作成および管理するには、次のものが必要です。

多くのストリーム プロセッサ コマンドでは、メソッド呼び出しで関連するストリーム プロセッサの名前を指定する必要があります。 次のセクションで説明される構文では、厳密に英数字の名前を使用することを前提としています。 ストリーム プロセッサの名前にハイフン(-)や完全停止(.)など、英数字以外の文字が含まれている場合は、名前を角括弧([])とdouble引用符("")で囲む必要があります。メソッド呼び出し(sp.["special-name-stream"].stats() と同様)。

注意

Atlas Stream Processing は、stopped 45日以上で であったストリーム プロセッサの内部状態を破棄します。このようなプロセッサを起動すると、最初の実行と同じように動作し、統計情報を報告します。

注意

Atlas Stream Processing は、stopped 45日以上で であったストリーム プロセッサの内部状態を破棄します。このようなプロセッサを起動すると、最初の実行と同じように動作し、統計情報を報告します。

既存のストリーム プロセッサの次の要素を変更できます。

ストリーム プロセッサを変更するには、次の手順に従います。

1

ストリーム プロセッサの停止」を参照してください。

2

次の手順」を参照してください。

3

ストリーム プロセッサを起動するを参照してください。

デフォルトでは、変更されたプロセッサは最後のチェックポイントから復元されます。あるいは、resumeFromCheckpoint=false を設定することもできます。この場合、プロセッサは要約統計のみを保持します。ウィンドウが開いているプロセッサーを変更した場合、ウィンドウは更新されたパイプラインで全て再計算されます。

注意

Operatoriscontains などのマッチャー式が含まれる)を使用して、ストリーム プロセッサ ステートが失敗した場合のアラートを構成したストリーム プロセッサの名前を変更すると、Atlas は、マッチャー式がその新しい名前と一致しない場合、名前が変更されたストリーム プロセッサに対してアラートをトリガーしません。名前が変更されたストリーム プロセッサを監視するには、アラートを再構成してください。

デフォルト設定 resumeFromCheckpoint=true が有効になっている場合、以下の制限が適用されます。

  • $source ステージは変更できません。

  • ウィンドウの間隔は変更できません。

  • ウィンドウを削除することはできません。

  • ウィンドウを使用してパイプラインを変更できるのは、そのウィンドウの内部パイプラインに $group または $sort ステージのいずれかが含まれている場合のみです。

  • 既存のウィンドウタイプを変更することはできません。たとえば、$tumblingWindow から $hoppingWindow への変更、またはその逆の変更はできません。

  • ウィンドウを持つプロセッサは、ウィンドウの再計算の結果として、一部のデータを再処理する可能性があります。

  • 変更操作後は、演算子ごとの統計は保持されません。

既存のストリーム プロセッサからサンプル結果の配列を mongosh を使用して STDOUT に返すには、sp.processor.sample() メソッドを使用します。例、次のコマンドは、proc01 という名前のストリーム プロセッサからサンプリングしています。

sp.proc01.sample()

このコマンドは、 CTRL-Cを使用してキャンセルするまで、または返されたサンプルのサイズが累積40 MB になるまで継続的に実行されます。 ストリーム プロセッサは、サンプル内の無効なドキュメントを次の形式の_dlqMessageドキュメントで報告します。

{
_dlqMessage: {
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
workspaceName: '<workspaceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

これらのメッセージを使用して、デッド レター キューコレクションを定義せずにデータのクリーンアップの問題を診断できます。

注意

Atlas Stream Processing は、stopped 45日以上で であったストリーム プロセッサの内部状態を破棄します。このようなプロセッサを起動すると、最初の実行と同じように動作し、統計情報を報告します。