Atlas Data Federation과 Atlas 예정된 트리거를 사용하여 Atlas 클러스터에서 Amazon Web Services S3 버킷으로 Apache Parquet 형식의 데이터를 복사합니다. Parquet는 데이터를 문서가 아닌 파일로 예상하는 분석 및 머신 러닝 워크로드에 적합한 컬럼 형식입니다. 운영 클러스터에서 분석 쿼리를 오프로드하기 위해 반복되는 예정에 따라 복사본을 실행합니다.
이 작업에 대하여
이 튜토리얼은 델타 접근 방식을 사용하는데, 이는 각 trigger 실행이 지난 60 초 동안의 문서를 복사한다는 의미입니다. 대안으로는 매번 전체 컬렉션을 복사하는 완전 스냅샷이 있습니다. 올바른 접근 방식은 데이터 볼륨과 다운스트림 사용자의 요구 사항에 따라 달라집니다.
이 튜토리얼에서의 maxFileSize 및 maxRowGroupSize 값은 테스트에 최적화되어 있으며 생산에는 적합하지 않습니다. 생산 워크로드의 경우 $out 스테이지 옵션 을 검토하고 쿼리 패턴에 따라 파일 크기 및 분할을 조정하십시오.
시작하기 전에
이 튜토리얼을 시작하기 전에 다음 작업을 완료하세요.
복사하려는 데이터가 있는 클러스터로 Atlas 계정을 만듭니다. 시작하려면 클러스터 만들기를 참조하십시오.
Amazon Web Services 계정을 만들고 IAM 역할 및 S3 버킷을 만들 권한을 부여합니다. Atlas Data Federation에 필요한 권한을 구성하려면 연합 데이터베이스 인스턴스 데이터 저장소 배포를 참조하세요.
AWS CLI를 설치하고 구성합니다.
단계
S3 과 Atlas 데이터 저장을 사용하여 연합 데이터베이스 인스턴스를 배포합니다.
연합 데이터베이스 인스턴스는 여러 데이터 소스를 하나의 쿼리 가능한 인터페이스로 통합합니다. 이 튜토리얼에서는 S3 버킷과 Atlas 클러스터를 동일한 연합 데이터베이스 인스턴스의 데이터 저장소로 연결합니다. 두 데이터 저장소를 연결하면 복사 trigger가 클러스터에서 읽어 S3에 쓰기 (write)할 수 있습니다.
S3 데이터 저장을 사용하여 연합 데이터베이스 인스턴스를 배포합니다. 자세한 내용은 연합 데이터베이스 인스턴스 데이터 저장 배포를 참조하세요. S3 데이터 저장을 구성할 때 Atlas Data Federation이 Parquet 파일을 쓰기 위해 IAM 역할 Read and write 에 버킷에 대한 액세스 권한을 부여합니다.
Atlas 클러스터를 연합 데이터베이스 인스턴스의 두 번째 데이터 저장소로 추가합니다.
이 단계를 완료한 후 연합 데이터베이스 인스턴스 서비스의 이름을 적어 두세요. 이 이름은 다음 단계에서 필요합니다.
테스트 문서를 삽입하는 예정된 trigger를 만듭합니다.
매분 클러스터에 새 문서를 삽입하는 예정된 trigger를 만듭니다. 이를 통해 테스트 데이터를 생성하여 복사 trigger가 작동하는지 확인할 수 있습니다.
Atlas에서 Triggers 페이지로 이동합니다.
아직 표시되지 않은 경우 탐색 표시줄의 Organizations 메뉴에서 프로젝트가 포함된 조직을 선택합니다.
아직 표시되지 않은 경우 내비게이션 바의 Projects 메뉴에서 프로젝트를 선택합니다.
사이드바에서 Streaming Data 제목 아래의 Triggers를 클릭합니다.
트리거 페이지가 표시됩니다.
Add Trigger를 클릭합니다.
Trigger Type(으)로 Scheduled 을(를) 선택합니다.
Trigger Details에서 다음 구성을 설정하다 .
설정값Trigger NameCreate_Event_Every_Min_TriggerSchedule TypeBasicInterval매
1분Event TypeFunctionFunction 섹션에서 + New Function 을 선택하고 다음 코드를 입력합니다. 플레이스홀더 값을 Atlas 서비스, 데이터베이스 및 컬렉션 이름으로 대체합니다.
exports = function () { const mongodb = context.services.get( "NAME_OF_YOUR_ATLAS_SERVICE" ); const db = mongodb.db("NAME_OF_YOUR_DATABASE"); const events = db.collection( "NAME_OF_YOUR_COLLECTION" ); const event = events.insertOne({ time: new Date(), aNumber: Math.random() * 100, type: "event" }); return JSON.stringify(event); }; Save를 클릭합니다.
trigger 실행 후 매분 클러스터 컬렉션에 새 문서가 표시됨을 확인합니다.
S3으로 데이터를 복사하려면 예정된 트리거를 만들어야 합니다.
$out 단계를 사용하여 집계 파이프라인을 실행하는 예정된 트리거를 만들어 클러스터에서 최신 문서를 Parquet 형식의 S3 버킷으로 매분 복사합니다.
Triggers 페이지에서 Add Trigger를 클릭합니다.
Trigger Type(으)로 Scheduled 을(를) 선택합니다.
Trigger Details에서 다음 구성을 설정하다 .
설정값Trigger NameCopy_Events_To_S3_TriggerSchedule TypeBasicInterval매
1분Event TypeFunctionFunction 섹션에서 + New Function 을 선택하고 다음 코드를 입력합니다. 연합 데이터베이스 인스턴스 서비스, 가상 데이터베이스, 가상 컬렉션, S3 버킷 및 AWS 리전의 이름으로 자리 표시자 값을 대체합니다.
exports = function () { const service = context.services.get( "NAME_OF_YOUR_FEDERATED_DATA_SERVICE" ); const db = service.db( "NAME_OF_YOUR_VIRTUAL_DATABASE" ); const events = db.collection( "NAME_OF_YOUR_VIRTUAL_COLLECTION" ); const pipeline = [ { $match: { "time": { $gt: new Date( Date.now() - 60 * 1000 ), $lt: new Date(Date.now()) } } }, { "$out": { "s3": { "bucket": "YOUR_S3_BUCKET_NAME", "region": "YOUR_AWS_REGION", "filename": "events", "format": { "name": "parquet", "maxFileSize": "10GB", "maxRowGroupSize": "100MB" } } } } ]; return events.aggregate(pipeline); }; Save를 클릭합니다.
After the trigger runs, confirm that a Parquet file named
eventsappears in your S3 bucket.