Atlas Data Federation と Atlas Scheduled Triggers を使用して、Atlas クラスターからApache Perquet形式のAWS S3 バケットにデータをコピーします。 Partquet は、ドキュメントではなくファイルとしてデータを必要とする分析や機械学習のワークロードに適した列指向形式です。定期的にスケジュールしてコピーを実行し、運用クラスターから分析クエリをオフロードします。
このタスクについて
チュートリアルではデルタ アプローチが使用されます。つまり、trigger が実行されるたびに過去 60 秒のドキュメントがコピーされます。代替策として、毎回コレクション全体がコピーされる完全なスナップショットが挙げられます。正しいアプローチは、データ量と下流の利用者の要件によって異なります。
このチュートリアルの とmaxFileSize maxRowGroupSizeの値は、本番環境ではなくテスト用に最適化されています。本番環境のワークロードの場合は、 $out ステージのオプションを確認し、クエリ パターンに基づいてファイルサイズとパーティショニングを調整します。
始める前に
このチュートリアルを開始する前に、次のタスクを完了してください。
コピーするデータを持つクラスターで Atlas アカウントを作成します。開始するには、「 クラスターの作成 」を参照してください。
IAM ロールと S3 バケットを作成する特権を持つAWSアカウントを作成します。 Atlas Data Federation に必要な権限を構成するには、「 フェデレーティッドデータベースインスタンス データストアの配置 」を参照してください。
AWS CLI をインストールして構成します。
手順
S3 と Atlas データ ストアを使用してフェデレーティッドデータベースインスタンスを配置します。
フェデレーティッドデータベースインスタンスは、複数のデータソースを単一のクエリ可能なインターフェースに統合します。このチュートリアルでは、 S3 バケットと Atlas クラスターを同じフェデレーティッドデータベースインスタンス内のデータ ストアとして接続します。両方のデータ ストアを接続すると、コピー trigger はクラスターから読み取り、S3 に書込むことができます。
S3 データストアを使用してフェデレーティッドデータベースインスタンスを配置します。方法については、「 フェデレーティッドデータベースインスタンス データストアの配置 」を参照してください。 S データストアを構成するときは、IAM3Read and write ロール にバケットへのアクセスを許可して、Atlas Data Federation が Perquet ファイルを書き込めます。
Atlas クラスターを 2 つ目のデータストアとしてフェデレーティッドデータベースインスタンスに追加します。
これらの手順を完了したら、フェデレーティッドデータベースインスタンスサービスの名前を書き留めておきます。この名前は後の手順で必要になります。
テスト ドキュメントを挿入するための予定されたトリガーを作成します。
毎分ごとに新しいドキュメントをクラスターに挿入する予定されたトリガーを作成します。これによりテスト データが生成され、コピー trigger が動作することを確認できます。
Atlas で、 Triggers ページに移動します。
まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。
まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。
サイドバーで、 Streaming Data見出しの下のTriggersをクリックします。
Triggersページが表示されます。
[Add Trigger] をクリックします。
Trigger Type として Scheduled を選択します。
Trigger Details で、次の構成を設定します。
設定値Trigger NameCreate_Event_Every_Min_TriggerSchedule TypeBasicInterval1分ごとEvent TypeFunctionFunction セクションで、+ New Function を選択し、次のコードを入力します。プレースホルダー値を、Atlas サービス、データベース、コレクションの名前に置き換えます。
exports = function () { const mongodb = context.services.get( "NAME_OF_YOUR_ATLAS_SERVICE" ); const db = mongodb.db("NAME_OF_YOUR_DATABASE"); const events = db.collection( "NAME_OF_YOUR_COLLECTION" ); const event = events.insertOne({ time: new Date(), aNumber: Math.random() * 100, type: "event" }); return JSON.stringify(event); }; [Save] をクリックします。
trigger の実行後、1 分ごとに新しいドキュメントがクラスターコレクションに表示されることを確認します。
S3 にデータをコピーするための予定されたトリガーを作成します。
$out ステージを使用して集計パイプラインを実行する予定されたトリガーを作成し、1 分ごとに最近のドキュメントをクラスターから S3 バケットにコピーし形式。
Triggersページで、 Add Triggerをクリックします。
Trigger Type として Scheduled を選択します。
Trigger Details で、次の構成を設定します。
設定値Trigger NameCopy_Events_To_S3_TriggerSchedule TypeBasicInterval1分ごとEvent TypeFunctionFunctionセクションで、+ New Function を選択し、次のコードを入力します。プレースホルダー値を、フェデレーティッドデータベースインスタンスサービス、 仮想データベース、 仮想コレクション、 S3 バケット、およびAWSリージョンの名前に置き換えます。
exports = function () { const service = context.services.get( "NAME_OF_YOUR_FEDERATED_DATA_SERVICE" ); const db = service.db( "NAME_OF_YOUR_VIRTUAL_DATABASE" ); const events = db.collection( "NAME_OF_YOUR_VIRTUAL_COLLECTION" ); const pipeline = [ { $match: { "time": { $gt: new Date( Date.now() - 60 * 1000 ), $lt: new Date(Date.now()) } } }, { "$out": { "s3": { "bucket": "YOUR_S3_BUCKET_NAME", "region": "YOUR_AWS_REGION", "filename": "events", "format": { "name": "parquet", "maxFileSize": "10GB", "maxRowGroupSize": "100MB" } } } } ]; return events.aggregate(pipeline); }; [Save] をクリックします。
trigger の実行後、 という名前の
eventsPerquetファイルがS3 バケットに表示されることを確認します。