使用Atlas Data Federation和Atlas Scheduled Triggers,以Apache Parquet 格式将数据从Atlas 集群复制到 AWS S3 存储桶。 Parquet 是一种柱状格式,适用于期望数据为文件而不是文档的分析和机器学习负载。按重复安排运行副本,以卸载操作集群的分析查询负担。
关于此任务
本教程使用增量方法,这意味着每次触发器运行复制过去 60 秒的文档。另一种选择是完整快照,每次都会复制整个集合。正确的方法取决于您的数据量和下游消费者的要求。
本教程中的 maxFileSize和maxRowGroupSize 值针对测试而不是生产进行优化。对于生产工作负载,查看$out 阶段选项,并根据查询模式调整文件大小和分区。
开始之前
在开始本教程之前,请完成以下任务:
安装并配置 AWS CLI。
步骤
部署具有 S3 和Atlas数据存储的联合数据库实例。
联合数据库实例将多个数据源整合到单个可查询接口中。在本教程中,您将连接 S3 存储桶和Atlas 集群,作为同一联合数据库实例中的数据存储。连接两个数据存储后,副本触发器可从集群读取数据并写入S3 。
部署具有 S3 数据存储的联合数据库实例。要学习;了解如何操作,请参阅部署联合数据库实例数据存储。配置 S3 数据存储时,请授予 IAM角色Read and write 对该存储桶的访问权限,以便Atlas Data Federation可以写入Parquet 文件。
将Atlas 集群添加为联合数据库实例中的第二个数据存储。
完成这些步骤后,请记下联合数据库实例服务的名称。您在后续步骤中需要使用此名称。
创建定时触发器以插入测试文档。
创建定时触发器,每分钟向集群插入一个新文档。这会生成测试数据,以便您验证副本触发器是否有效。
在 Atlas 中,前往 Triggers 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含项目的组织。
如果尚未显示,请从导航栏的 Projects 菜单中选择您的项目。
在侧边栏中,单击 Streaming Data 标题下的 Triggers。
会显示触发器页面。
单击 Add Trigger(连接)。
选择 Scheduled 作为 Trigger Type。
在 Trigger Details 中,设立以下配置:
设置值Trigger NameCreate_Event_Every_Min_TriggerSchedule TypeBasicInterval每
1分钟Event TypeFunction在 Function 部分中,选择 + New Function 并输入以下代码。将占位符值替换为您的Atlas服务名称、数据库和集合。
exports = function () { const mongodb = context.services.get( "NAME_OF_YOUR_ATLAS_SERVICE" ); const db = mongodb.db("NAME_OF_YOUR_DATABASE"); const events = db.collection( "NAME_OF_YOUR_COLLECTION" ); const event = events.insertOne({ time: new Date(), aNumber: Math.random() * 100, type: "event" }); return JSON.stringify(event); }; 单击 Save(连接)。
触发器运行后,确认每分钟都有新文档出现在集群集合中。
创建定时触发器以将数据复制到 S3。
创建定时触发器,使用 $out 阶段运行聚合管道,每分钟将最新文档以 Parquet 格式从集群复制到 S 存储桶。3
在 Triggers 页面上,单击 Add Trigger。
选择 Scheduled 作为 Trigger Type。
在 Trigger Details 中,设立以下配置:
设置值Trigger NameCopy_Events_To_S3_TriggerSchedule TypeBasicInterval每
1分钟Event TypeFunction在Function 部分中,选择+ New Function 并输入以下代码。将占位符值替换为联合数据库实例服务、虚拟数据库、虚拟集合、S3 存储桶和 AWS地区的名称。
exports = function () { const service = context.services.get( "NAME_OF_YOUR_FEDERATED_DATA_SERVICE" ); const db = service.db( "NAME_OF_YOUR_VIRTUAL_DATABASE" ); const events = db.collection( "NAME_OF_YOUR_VIRTUAL_COLLECTION" ); const pipeline = [ { $match: { "time": { $gt: new Date( Date.now() - 60 * 1000 ), $lt: new Date(Date.now()) } } }, { "$out": { "s3": { "bucket": "YOUR_S3_BUCKET_NAME", "region": "YOUR_AWS_REGION", "filename": "events", "format": { "name": "parquet", "maxFileSize": "10GB", "maxRowGroupSize": "100MB" } } } } ]; return events.aggregate(pipeline); }; 单击 Save(连接)。
触发器运行后,确认名为 的 Parquet文件出现在您的
eventsS3 存储桶中。