개요
이 가이드에서는 변경 스트림 을 사용하여 데이터베이스의 실시간 변경 사항을 모니터링하는 방법을 배울 수 있습니다. 변경 스트림은 애플리케이션이 컬렉션, 데이터베이스 또는 배포의 데이터 변경 사항을 구독할 수 있도록 하는 MongoDB Server 기능입니다.
변경 스트림은 새로운 변경 이벤트를 출력하여 실시간 데이터 변경 사항에 대한 액세스를 제공합니다. 컬렉션, 데이터베이스 또는 클라이언트 객체에서 변경 스트림을 열 수 있습니다.
샘플 데이터
이 가이드의 예시에서는 다음 Course
구조체를 courses
collection의 문서 모델로 사용합니다.
type Course struct { Title string Enrollment int32 }
이 가이드의 예시를 실행하려면 다음 스니펫을 사용하여 이러한 문서를 db
데이터베이스의 courses
컬렉션에 로드합니다.
coll := client.Database("db").Collection("courses") docs := []interface{}{ Course{Title: "World Fiction", Enrollment: 35}, Course{Title: "Abstract Algebra", Enrollment: 60}, } result, err := coll.InsertMany(context.TODO(), docs)
팁
존재하지 않는 데이터베이스 및 collection
쓰기 작업을 수행할 때 필요한 데이터베이스 및 collection이 없는 경우 서버는 이를 암시적으로 생성합니다.
각 문서에는 각 문서의 title
및 enrollment
필드에 해당하는 과정의 제목 및 최대 등록 수를 비롯한 University 과정에 대한 설명이 포함되어 있습니다.
참고
드라이버가 고유하게 생성하므로 각 예시 출력에는 잘린 _data
, clusterTime
및 ObjectID
값이 표시됩니다.
변경 스트림 열기
다음 객체에서 Watch()
메서드를 사용하여 MongoDB 의 변경 사항을 관찰할 수 있습니다.
컬렉션: 특정 컬렉션의 변경 사항 모니터링
데이터베이스: 데이터베이스 의 모든 컬렉션에 대한 변경 사항을 모니터링합니다.
MongoClient: 모든 데이터베이스의 변경 사항 모니터링
각 객체에 대해 Watch()
메서드는 변경 스트림을 열어 변경 이벤트 문서가 발생하면 이를 전송합니다.
Watch()
메서드에는 컨텍스트 매개변수와 파이프라인 매개변수가 필요합니다. 모든 변경 사항을 반환하려면 빈 Pipeline
객체 전달합니다.
Watch()
메서드는 선택적으로 첫 번째 매개변수로 집계 단계 배열로 구성된 집계 파이프라인을 취합니다. 집계 단계에서는 변경 이벤트를 필터링하고 변환합니다.
예시
다음 예시 에서는 courses
컬렉션 에서 변경 스트림 열고 변경 스트림 이벤트가 발생할 때 출력합니다.
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the change stream events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
별도의 프로그램이나 셸에서 courses
컬렉션을 수정하는 경우 이 코드는 변경 내용이 발생할 때 출력합니다. title
값이 "Advanced Screenwriting"
이고 enrollment
값이 20
인 문서를 삽입하면 다음과 같은 변경 이벤트가 발생합니다.
map[_id:map[_data:...] clusterTime: {...} documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...") enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db] operationType:insert]
실행 가능한 전체 예시를 보려면 이 가이드의 변경 스트림 예제 열기: 전체 파일 섹션을 참조하세요.
변경 이벤트 필터링
파이프라인 매개변수를 사용하여 변경 스트림 출력을 수정합니다. 이 매개변수를 사용하면 특정 변경 이벤트만 감시하도록 설정할 수 있습니다. 파이프라인 매개변수를 문서 배열로 형식화합니다. 이때 각 문서는 집계 단계를 나타냅니다.
이 매개변수에서 다음 파이프라인 단계를 사용할 수 있습니다.
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
다음 예시에서는 db
데이터베이스에서 변경 스트림을 열지만 새 삭제 작업만 감시합니다.
db := client.Database("db") pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}} changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline}) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) // Iterates over the cursor to print the delete operation change events for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
참고
Watch()
메서드가 db
데이터베이스에서 호출되었으므로 이 코드는 이 데이터베이스 내의 모든 컬렉션에 대해 새 삭제 작업을 출력합니다.
변경 스트림 열기 예시: 전체 파일
참고
설정 예시
이 예시 연결 URI를 사용하여 MongoDB 인스턴스 에 연결합니다. MongoDB 인스턴스에 연결하는 방법에 대해 자세히 학습하려면 MongoClient 만들기 가이드를 참조하세요. 이 예시 Atlas 샘플 데이터 세트에 포함된 sample_restaurants
데이터베이스의 restaurants
컬렉션도 사용합니다. Atlas 시작하기 가이드에 따라 MongoDB Atlas 의 무료 계층 에서 데이터베이스 에 로드할 수 있습니다.
다음 예제에서는 restaurants
collection에서 change stream을 열고 삽입된 문서를 인쇄합니다.
coll := client.Database("sample_restaurants").Collection("restaurants") // Creates instructions to watch for insert operations pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}} // Creates a change stream that receives change events cs, err := coll.Watch(context.TODO(), pipeline) if err != nil { panic(err) } defer cs.Close(context.TODO()) fmt.Println("Waiting For Change Events. Insert something in MongoDB!") // Prints a message each time the change stream receives an event for cs.Next(context.TODO()) { var event bson.M if err := cs.Decode(&event); err != nil { panic(err) } output, err := json.MarshalIndent(event["fullDocument"], "", " ") if err != nil { panic(err) } fmt.Printf("%s\n", output) } if err := cs.Err(); err != nil { panic(err) }
예상 결과
전체 예시 실행 후 다른 셸에서 문서 삽입 전체 파일 예시 실행. 삽입 작업을 실행 하면 다음과 유사한 출력이 표시됩니다.
// results truncated { "_id": ..., "name": "8282", "cuisine": "Korean" }
중요
이 사용 예시 작업을 마치면 터미널을 닫아 종료해야 합니다.
변경 스트림 옵션 구성
options
매개변수를 사용하여 Watch()
메서드의 동작을 수정합니다.
Watch()
메서드에 대해 다음 옵션을 지정할 수 있습니다.
ResumeAfter
StartAfter
FullDocument
FullDocumentBeforeChange
BatchSize
MaxAwaitTime
Collation
StartAtOperationTime
Comment
ShowExpandedEvents
Custom
CustomPipeline
이러한 옵션에 대한 자세한 내용은 db를 참조하세요. 컬렉션.watch() 항목을 참조하세요.
사전 및 사후 이미지
컬렉션에서 CRUD 작업을 수행하는 경우 기본적으로 해당 변경 이벤트 문서에는 작업에 의해 수정된 필드의 델타만 포함됩니다. Watch()
메서드의 options
매개변수에 설정을 지정하여 델타 외에도 변경 전후의 전체 문서를 볼 수 있습니다.
변경 후 문서의 전체 버전을 보려면 매개 문서 FullDocument
의 options
필드 다음 값 중 하나로 설정하다 .
UpdateLookup
: 변경 이벤트 문서에는 변경된 문서 전체의 복사본이 포함됩니다.WhenAvailable
: 변경 이벤트 문서에는 변경 이벤트에 대해 수정된 문서의 사후 이미지가 있는 경우 해당 문서의 사후 이미지가 포함됩니다.Required
: 출력은WhenAvailable
과 동일하지만 사후 이미지를 사용할 수 없는 경우 드라이버에서 서버 측 오류가 발생합니다.
문서의 사전 이미지, 변경 전 문서의 전체 버전을 보려면 options
매개변수의 FullDocumentBeforeChange
필드를 다음 값 중 하나로 설정하다.
WhenAvailable
: 변경 이벤트 문서에는 변경 이벤트에 대한 수정된 문서의 사전 이미지가 있는 경우 해당 문서의 사전 이미지가 포함됩니다.Required
: 출력은WhenAvailable
과 동일하지만 사전 이미지를 사용할 수 없는 경우 드라이버에서 서버 측 오류가 발생합니다.
중요
문서 사전 및 사후 이미지에 액세스 하려면 컬렉션 에 대해 changeStreamPreAndPostImages
를 활성화 해야 합니다. 지침 및 자세한 내용은 MongoDB Server 매뉴얼의 collMod 데이터베이스 명령 가이드에서 Change Streams 섹션을 참조하세요.
참고
삽입된 문서에는 사전 이미지가 없고 삭제된 문서에는 사후 이미지가 없습니다.
예시
다음 예시에서는 courses
컬렉션에서 Watch()
메서드를 호출합니다. options
매개변수의 FullDocument
필드 값을 지정하여 변경된 필드만 출력하는 대신 수정된 전체 문서의 사본을 출력합니다.
opts := options.ChangeStream().SetFullDocument(options.UpdateLookup) changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts) if err != nil { panic(err) } defer changeStream.Close(context.TODO()) for changeStream.Next(context.TODO()) { fmt.Println(changeStream.Current) }
title
이 "World
Fiction"
인 문서의 enrollment
값을 35
에서 30
으로 업데이트하면 다음과 같은 변경 이벤트가 발생합니다.
{"_id": {"_data": "..."},"operationType": "update","clusterTime": {"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id": {"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}}, "ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}}, "updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}}, "removedFields": [],"truncatedArrays": []}}
FullDocument
옵션을 지정하지 않으면 동일한 업데이트 작업이 더 이상 변경 이벤트 문서에 "fullDocument"
값을 출력하지 않습니다.
추가 정보
변경 스트림에 대한 자세한 내용은 서버 매뉴얼의 변경 스트림 을 참조하세요.
API 문서
Watch()
메서드에 대해 자세히 알아보려면 다음 API 문서를 참조하세요.