AIエージェントの場合: ドキュメントインデックスはhttps://www.mongodb.com/ja-jp/docs/llms.txt で利用可能です。任意のURLパスに .md を追加することで、すべてのページのマークダウン バージョンが利用できます。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

チュートリアル: MongoDBからAWS S3 にデータを継続的にコピーする

Atlas Data Federation と Atlas Scheduled Triggers を使用して、Atlas クラスターからApache Perquet形式のAWS S3 バケットにデータをコピーします。 Partquet は、ドキュメントではなくファイルとしてデータを必要とする分析や機械学習のワークロードに適した列指向形式です。定期的にスケジュールしてコピーを実行し、運用クラスターから分析クエリをオフロードします。

チュートリアルではデルタ アプローチが使用されます。つまり、trigger が実行されるたびに過去 60 秒のドキュメントがコピーされます。代替策として、毎回コレクション全体がコピーされる完全なスナップショットが挙げられます。正しいアプローチは、データ量と下流の利用者の要件によって異なります。

このチュートリアルの とmaxFileSize maxRowGroupSizeの値は、本番環境ではなくテスト用に最適化されています。本番環境のワークロードの場合は、 $out ステージのオプションを確認し、クエリ パターンに基づいてファイルサイズとパーティショニングを調整します。

このチュートリアルを開始する前に、次のタスクを完了してください。

1

フェデレーティッドデータベースインスタンスは、複数のデータソースを単一のクエリ可能なインターフェースに統合します。このチュートリアルでは、 S3 バケットと Atlas クラスターを同じフェデレーティッドデータベースインスタンス内のデータ ストアとして接続します。両方のデータ ストアを接続すると、コピー trigger はクラスターから読み取り、S3 に書込むことができます。

  1. S3 データストアを使用してフェデレーティッドデータベースインスタンスを配置します。方法については、「 フェデレーティッドデータベースインスタンス データストアの配置 」を参照してください。 S データストアを構成するときは、IAM3Read and write ロール にバケットへのアクセスを許可して、Atlas Data Federation が Perquet ファイルを書き込めます。

  2. Atlas クラスターを 2 つ目のデータストアとしてフェデレーティッドデータベースインスタンスに追加します。

これらの手順を完了したら、フェデレーティッドデータベースインスタンスサービスの名前を書き留めておきます。この名前は後の手順で必要になります。

2

毎分ごとに新しいドキュメントをクラスターに挿入する予定されたトリガーを作成します。これによりテスト データが生成され、コピー trigger が動作することを確認できます。

  1. Atlas で、 Triggers ページに移動します。

    1. まだ表示されていない場合は、プロジェクトを含む組織をナビゲーション バーの Organizations メニューで選択します。

    2. まだ表示されていない場合は、ナビゲーション バーの Projects メニューからプロジェクトを選択します。

    3. サイドバーで、 Streaming Data見出しの下のTriggersをクリックします。

    Triggersページが表示されます。

  2. [Add Trigger] をクリックします。

  3. Trigger Type として Scheduled を選択します。

  4. Trigger Details で、次の構成を設定します。

    設定
    Trigger Name

    Create_Event_Every_Min_Trigger

    Schedule Type
    Basic
    Interval

    1分ごと

    Event Type
    Function
  5. Function セクションで、+ New Function を選択し、次のコードを入力します。プレースホルダー値を、Atlas サービス、データベース、コレクションの名前に置き換えます。

    exports = function () {
    const mongodb = context.services.get(
    "NAME_OF_YOUR_ATLAS_SERVICE"
    );
    const db = mongodb.db("NAME_OF_YOUR_DATABASE");
    const events = db.collection(
    "NAME_OF_YOUR_COLLECTION"
    );
    const event = events.insertOne({
    time: new Date(),
    aNumber: Math.random() * 100,
    type: "event"
    });
    return JSON.stringify(event);
    };
  6. [Save] をクリックします。

    trigger の実行後、1 分ごとに新しいドキュメントがクラスターコレクションに表示されることを確認します。

3

$out ステージを使用して集計パイプラインを実行する予定されたトリガーを作成し、1 分ごとに最近のドキュメントをクラスターから S3 バケットにコピーし形式。

  1. Triggersページで、 Add Triggerをクリックします。

  2. Trigger Type として Scheduled を選択します。

  3. Trigger Details で、次の構成を設定します。

    設定
    Trigger Name

    Copy_Events_To_S3_Trigger

    Schedule Type
    Basic
    Interval

    1分ごと

    Event Type
    Function
  4. Functionセクションで、+ New Function を選択し、次のコードを入力します。プレースホルダー値を、フェデレーティッドデータベースインスタンスサービス、 仮想データベース、 仮想コレクション、 S3 バケット、およびAWSリージョンの名前に置き換えます。

    exports = function () {
    const service = context.services.get(
    "NAME_OF_YOUR_FEDERATED_DATA_SERVICE"
    );
    const db = service.db(
    "NAME_OF_YOUR_VIRTUAL_DATABASE"
    );
    const events = db.collection(
    "NAME_OF_YOUR_VIRTUAL_COLLECTION"
    );
    const pipeline = [
    {
    $match: {
    "time": {
    $gt: new Date(
    Date.now() - 60 * 1000
    ),
    $lt: new Date(Date.now())
    }
    }
    },
    {
    "$out": {
    "s3": {
    "bucket": "YOUR_S3_BUCKET_NAME",
    "region": "YOUR_AWS_REGION",
    "filename": "events",
    "format": {
    "name": "parquet",
    "maxFileSize": "10GB",
    "maxRowGroupSize": "100MB"
    }
    }
    }
    }
    ];
    return events.aggregate(pipeline);
    };
  5. [Save] をクリックします。

    trigger の実行後、 という名前のevents PerquetファイルがS3 バケットに表示されることを確認します。