Atlas Stream Processing は、Atlas データベースで使用されているものと同じ集計操作を使用して、複雑なデータストリームの読み取り、書き込み、変換を可能にします。Atlas Stream Processing を使用すると、次のことが可能になります。
継続的な検証を実行して、メッセージが正しく作成されているかどうかを確認し、メッセージの破損を検出し、遅延データを検出します。
ドキュメントがパイプラインを通過するときにフィールドを変換し、各ドキュメント内のフィールドまたは式をキーとして使用して、それらのドキュメントを個別のデータベース、 Kafkaトピック、またはその他の外部シンクにルーティングします。
Atlas コレクションまたはApache Kafka クラスターに結果を継続的に公開し、最新のビューとデータ分析を確保します。
Atlas Stream Processing コンポーネントは Atlas プロジェクトに直接属し、Atlas クラスターとは独立して動作します。
注意
Atlas Stream Processing は MongoDB変更ストリームを超える拡張機能を提供し、複数のデータイベントタイプの管理や、Kafka、外部 API、クラウドストレージなど様々なソースからの複雑なデータストリームのプロセシングを行います。データベースイベントに制限されている変更ストリームとは異なり、Atlas Stream Processing は、Atlas データベースで使用されるのと同じクエリAPIを使用して包括的な Stream Processing ワークフローを提供します。
Stream Processing ワークスペースの構成
Atlas Stream Processing を使い始めるには、まずStream Processing ワークスペースを構成する必要があります。これには、Atlas Stream Processing ワークスペースを作成、変更、削除してストリーミングデータのプロセシングを開始する方法が含まれます。
データのストリーミング
ストリームは、1 つ以上のソースから発生する不変データの連続したフローです。データストリームの例には、センサーからの温度や負荷の読み取り、金融トランザクションの記録、または変更データをキャプチャするイベントが含まれます。
データ ストリームは、 Apache Kafkaトピック やMongoDB変更ストリーム などのソースに基づきます。その後、 Apache Kafkaトピック 、 Atlas コレクション、外部関数、またはクラウドデータ ストアなどの処理されたデータをシンクに書き込むことができます。
Atlas Stream Processing は、 保存データベースの時間や計算上の制約なしに連続データを操作するネイティブの Stream Processing 機能を提供します。
ストリームプロセッサの構造
ストリーム プロセッサは、概念的に3つのフェーズに分割できるパイプラインの形式を取ります。この構造を理解したら、ストリーム プロセッサを作成および管理して、ストリーミングデータを継続的に処理できます。
ソース
ストリーム プロセッサは、Atlas Stream Processing が接続されているストリーミングデータのソースからドキュメントを取り込むことで始まります。これらは、Apache Kafka などのサーバー システムや、Atlas の読み取り/書き込み操作によって生成されるようなデータベース変更ストリームでもあります。これらの入力は有効な json または ejson ドキュメントである必要があります。$source ステージがドキュメントを取り込むと、必要に応じてそのドキュメントにMongoDB集計を適用して変換できます。
Atlas Stream Processing は、ストリーミングソースからのデータを取り込むだけでなく、接続された Atlas クラスターのデータを結合するために、HTTPS requests と $lookup 操作のデータを使用してドキュメントを強化することもサポートされています。
パイプライン
ストリーム プロセッサは、集計パイプラインステージと集計演算子に加えて、標準のMongoDB スイートの集計演算子とステージを活用して、取り込んだデータを変換し、価値あるインサイトを抽出します。Atlas Stream Processing の集計パイプラインを定義する方法については、集計パイプラインのドキュメントをご覧ください。Atlas Stream Processing は処理できないドキュメントをデッドレターキュー(DLQ)に書き込むことができます。
ドキュメントを再構築したり、フィールドを追加または削除したり、コレクションから情報を検索したりすることで、ドキュメントを増やします。Atlas Stream Processing では、Windowsを使用してイベントを収集し、任意の関数を実行することもできます。
Windows
Windows は、設定された期間内のストリーミングデータを集計するパイプラインステージです。これにより、データをグループ化したり、平均を取得したり、最小値と最大値を見つけたり、ストリーミングデータには適用できないその他のさまざまな操作を実行したりできます。各ストリーム プロセッサには、ウィンドウステージが 1 つだけ含めることができます。
関数
Atlas Stream Processing は、ストリーム プロセスが渡す各ドキュメントに対して実行されるカスタム JavaScript 関数 または Amazon Web Services Lambda 関数 への呼び出しをサポートしています。
Sinks
取り込まれたデータを処理した後、ストリーム プロセッサはそれを Sink に書き込むようにします。Atlas Stream Processing は、さまざまな Sink タイプへの書き込みのための $emit ステージと $merge ステージを提供します。これらのステージは互いに排他的であり、各ストリーム プロセッサが持つことができる Sink ステージは 1 つだけです。パイプラインには、同じ Sink 接続内の別のKafkaトピックまたは Atlas コレクションに処理されたドキュメントを書込むロジックを含めることができます。
Atlas Stream Processing リージョン
Atlas Stream Processing は、AWS、Azure、Google Cloud 上で Stream Processing ワークスペースの作成をサポートしています。利用可能なリージョンのリストについては、次の Stream Processing ワークスペースのセクションを参照してください。
ストリーム プロセッサは、異なるクラウドプロバイダーまたは異なるリージョンでホストされているクラスターから読み取り、書き込みができます。
請求
請求の詳細については、Atlas Stream Processing の請求ページを参照してください。
次のステップ
Atlas Stream Processing を使ったサンプル処理を開始するには、Atlas Stream Processing を使い始める を参照してください。
Atlas Stream Processing の主要概念の詳細については、以下を参照してください。
Atlas Stream Processing の特定の制限について学びます。