AI 에이전트의 경우: 문서 인덱스는 https://www.mongodb.com/ko-kr/docs/llms.txt에서 사용할 수 있으며, 모든 페이지의 마크다운 버전은 어떤 URL 경로에 .md를 추가하여 사용할 수 있습니다.
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

데이터 변경 사항 모니터링

이 가이드 에서는 변경 스트림 을 사용하여 데이터의 실시간 변경 사항을 모니터 하는 방법을 학습 수 있습니다. 변경 스트림 은 애플리케이션 이 컬렉션, 데이터베이스 또는 배포서버 서버의 데이터 변경 사항을 구독 할 수 있도록 하는 MongoDB Server 기능 입니다.

Atlas Stream Processing

변경 스트림의 대안으로 Atlas Stream Processing 사용하여 데이터 스트림을 프로세스 하고 변환할 수 있습니다. 데이터베이스 이벤트만 등록하는 변경 스트림과 달리 Atlas Stream Processing 여러 데이터 이벤트 유형을 관리하고 확장된 데이터 처리 기능을 제공합니다. 이 기능에 대해 자세히 학습하려면 MongoDB Atlas 설명서에서 Atlas Stream Processing 을 참조하세요.

이 가이드 의 예제에서는 Atlas 샘플 데이터 세트sample_restaurants.restaurants 컬렉션 사용합니다. 무료 MongoDB Atlas cluster 생성하고 샘플 데이터 세트를 로드하는 방법을 학습 .NET/ C# 드라이버 시작하기를 참조하세요.

이 페이지의 예시에서는 다음 Restaurant, AddressGradeEntry 클래스를 모델로 사용합니다:

public class Restaurant
{
public ObjectId Id { get; set; }
public string Name { get; set; }
[BsonElement("restaurant_id")]
public string RestaurantId { get; set; }
public string Cuisine { get; set; }
public Address Address { get; set; }
public string Borough { get; set; }
public List<GradeEntry> Grades { get; set; }
}
public class Address
{
public string Building { get; set; }
[BsonElement("coord")]
public double[] Coordinates { get; set; }
public string Street { get; set; }
[BsonElement("zipcode")]
public string ZipCode { get; set; }
}
public class GradeEntry
{
public DateTime Date { get; set; }
public string Grade { get; set; }
public float? Score { get; set; }
}

참고

restaurants 컬렉션 의 문서는 대소문자 명명 규칙을 사용합니다. 이 가이드 의 예제에서는 ConventionPack 를 사용하여 컬렉션 의 필드를 파스칼식 대/소문자로 역직렬화하고 Restaurant 클래스의 속성에 매핑합니다.

사용자 지정 직렬화에 대해 자세히 알아보려면 사용자 지정 직렬화를참조하세요.

변경 스트림 을 열려면 Watch() 또는 WatchAsync() 메서드를 호출합니다. 메서드를 호출하는 인스턴스 에 따라 변경 스트림 이 수신 대기하는 이벤트 범위가 결정됩니다. 다음 클래스에서 Watch() 또는 WatchAsync() 메서드를 호출할 수 있습니다.

  • MongoClient: MongoDB deployment의 모든 변경 사항을 모니터링합니다.

  • Database: 데이터베이스에 있는 모든 컬렉션의 변경 사항을 모니터링합니다.

  • Collection: 컬렉션의 변경 사항을 모니터링합니다.

다음 예시 에서는 restaurants 컬렉션 에서 변경 스트림 을 열고 변경 사항이 발생할 때 출력합니다. Synchronous 또는 Asynchronous 탭 을 선택하여 해당 코드를 확인합니다.

var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change stream and prints the changes as they're received
using (var cursor = collection.Watch())
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
}
}
var database = client.GetDatabase("sample_restaurants");
var collection = database.GetCollection<Restaurant>("restaurants");
// Opens a change streams and print the changes as they're received
using var cursor = await collection.WatchAsync();
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following type of change: " + change.BackingDocument);
});

변경 사항을 확인하려면 애플리케이션 을 실행 합니다. 그런 다음 별도의 애플리케이션 또는 shell 에서 restaurants 컬렉션 을 수정합니다. "name" 값이 "Blarney Castle" 인 문서 를 업데이트하면 다음과 같은 변경 스트림 출력이 생성됩니다.

{ "_id" : { "_data" : "..." }, "operationType" : "update", "clusterTime" : Timestamp(...),
"wallTime" : ISODate("..."), "ns" : { "db" : "sample_restaurants", "coll" : "restaurants" },
"documentKey" : { "_id" : ObjectId("...") }, "updateDescription" : { "updatedFields" : { "cuisine" : "Irish" },
"removedFields" : [], "truncatedArrays" : [] } }

pipeline 매개변수를 Watch()WatchAsync() 메서드에 전달하여 변경 스트림 출력을 수정할 수 있습니다. 이 매개변수를 사용하면 지정된 변경 이벤트만 감시할 수 있습니다. EmptyPipelineDefinition 클래스를 사용하고 관련 집계 단계 메서드를 추가하여 파이프라인 을 만듭니다.

pipeline 매개변수에 다음과 같은 집계 단계를 지정할 수 있습니다.

  • $addFields

  • $changeStreamSplitLargeEvent

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

PipelineDefinitionBuilder 클래스를 사용하여 집계 파이프라인 빌드 방법을 학습하려면 빌더를 사용한 작업 가이드 의 집계 파이프라인 단계를 참조하세요.

변경 스트림 출력 수정에 대해 자세히 알아보려면 MongoDB Server 매뉴얼의 변경 스트림 출력 수정 섹션을 참조하세요.

다음 예시 에서는 pipeline 매개 변수를 사용하여 업데이트 작업만 기록하는 변경 스트림 을 엽니다. Synchronous 또는 Asynchronous 탭 을 선택하여 해당 코드를 확인합니다.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change streams and print the changes as they're received
using (var cursor = collection.Watch(pipeline))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine("Received the following change: " + change);
}
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
// Opens a change stream and prints the changes as they're received
using (var cursor = await collection.WatchAsync(pipeline))
{
await cursor.ForEachAsync(change =>
{
Console.WriteLine("Received the following change: " + change);
});
}

애플리케이션 크기가 16 MB를 초과하는 변경 이벤트를 생성하는 경우 서버 BSONObjectTooLarge 오류를 반환합니다. 이 오류를 방지하려면 $changeStreamSplitLargeEvent 파이프라인 단계를 사용하여 이벤트를 더 작은 조각으로 분할 수 있습니다. .NET/ C# 드라이버 집계 API 에는 변경 스트림 파이프라인 에 $changeStreamSplitLargeEvent 단계를 추가하는 데 사용할 수 있는 ChangeStreamSplitLargeEvent() 메서드가 포함되어 있습니다.

이 예시 운전자 변경 사항을 감시하고 16 MB 제한을 초과하는 변경 이벤트를 분할 지시합니다. 이 코드는 각 이벤트 에 대한 변경 문서 출력하고 헬퍼 메서드를 호출하여 이벤트 조각을 다시 조립합니다.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = collection.Watch(pipeline);
foreach (var completeEvent in GetNextChangeStreamEvent(cursor.ToEnumerable().GetEnumerator()))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.ChangeStreamSplitLargeEvent();
using var cursor = await collection.WatchAsync(pipeline);
await foreach (var completeEvent in GetNextChangeStreamEventAsync(cursor))
{
Console.WriteLine("Received the following change: " + completeEvent.BackingDocument);
}

참고

앞의 예시 와 같이 변경 이벤트 프래그먼트를 리어셈블하는 것이 좋지만 이 단계는 선택 사항입니다. 동일한 로직을 사용하여 분할 및 전체 변경 이벤트를 모두 볼 수 있습니다.

앞의 예시 에서는 GetNextChangeStreamEvent(), GetNextChangeStreamEventAsync()MergeFragment() 메서드를 사용하여 변경 이벤트 조각을 단일 변경 스트림 문서 로 리어셈블합니다. 다음 코드는 이러한 메서드를 정의합니다.

// Fetches the next complete change stream event
private static IEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEvent<TDocument>(
IEnumerator<ChangeStreamDocument<TDocument>> changeStreamEnumerator)
{
while (changeStreamEnumerator.MoveNext())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
changeStreamEnumerator.MoveNext();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}
// Fetches the next complete change stream event
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
var changeStreamEnumerator = GetNextChangeStreamEventFragmentAsync(changeStreamCursor).GetAsyncEnumerator();
while (await changeStreamEnumerator.MoveNextAsync())
{
var changeStreamEvent = changeStreamEnumerator.Current;
if (changeStreamEvent.SplitEvent != null)
{
var fragment = changeStreamEvent;
while (fragment.SplitEvent.Fragment < fragment.SplitEvent.Of)
{
await changeStreamEnumerator.MoveNextAsync();
fragment = changeStreamEnumerator.Current;
MergeFragment(changeStreamEvent, fragment);
}
}
yield return changeStreamEvent;
}
}
private static async IAsyncEnumerable<ChangeStreamDocument<TDocument>> GetNextChangeStreamEventFragmentAsync<TDocument>(
IAsyncCursor<ChangeStreamDocument<TDocument>> changeStreamCursor)
{
while (await changeStreamCursor.MoveNextAsync())
{
foreach (var changeStreamEvent in changeStreamCursor.Current)
{
yield return changeStreamEvent;
}
}
}
// Merges a fragment into the base event
private static void MergeFragment<TDocument>(
ChangeStreamDocument<TDocument> changeStreamEvent,
ChangeStreamDocument<TDocument> fragment)
{
foreach (var element in fragment.BackingDocument)
{
if (element.Name != "_id" && element.Name != "splitEvent")
{
changeStreamEvent.BackingDocument[element.Name] = element.Value;
}
}
}

대규모 변경 이벤트 분할에 대해 자세히 학습 MongoDB Server 매뉴얼에서 $changeStreamSplitLargeEvent를 참조하세요.

Watch()WatchAsync() 메서드는 작업을 구성하는 데 사용할 수 있는 옵션을 나타내는 선택적 매개변수를 허용합니다. 옵션을 지정하지 않으면 운전자 는 작업을 사용자 지정하지 않습니다.

다음 표에서는 Watch()WatchAsync() 의 동작을 사용자 지정하기 위해 설정하다 수 있는 옵션에 대해 설명합니다.

옵션
설명

FullDocument

문서 의 변경 사항만 표시하는 대신 변경 후 전체 문서 표시할지 여부를 지정합니다. 이 옵션에 대해 자세히 학습 사전 이미지 및 사후 이미지 포함을 참조하세요.

FullDocumentBeforeChange

문서 의 변경 사항만 표시하는 대신 변경 전의 전체 문서 표시할지 여부를 지정합니다. 이 옵션에 대해 자세히 학습 사전 이미지 및 사후 이미지 포함을 참조하세요.

ResumeAfter

재개 토큰에 지정된 Watch() WatchAsync() 작업 후 변경 사항 반환을 재개하도록
_id 또는 _id
ResumeAfter 에 지시합니다. 각 변경 스트림 이벤트 문서 필드 로 재개 토큰이 포함되어 있습니다. 이후에 재개하려는 작업을 나타내는 변경 이벤트 문서 의 전체 필드 전달합니다. 는 StartAfter 및 와 상호 StartAtOperationTime 배타적입니다.

StartAfter

Watch() WatchAsync() 재개 토큰에 지정된 작업 후 새 변경
스트림 시작하도록 또는 에 지시합니다._id 무효화 _id 이벤트 후 알림 다시 시작할
StartAfter 수 있습니다. 각 변경 스트림 이벤트 문서 필드 로 재개 토큰이 포함되어 있습니다. 이후에 재개하려는 작업을 나타내는 변경 이벤트 문서 의 전체 필드 전달합니다.ResumeAfter 는 및 와 상호 StartAtOperationTime 배타적입니다.

StartAtOperationTime

지정된 타임스탬프 이후에 발생하는 이벤트만 반환하도록 또는 Watch() WatchAsync()
StartAtOperationTime 에 지시합니다.ResumeAfter 는 및 와 상호 StartAfter 배타적입니다.

MaxAwaitTime

서버 빈 배치 를 반환하기 전에 변경 스트림 커서 에 보고하기 위해 새 데이터 변경 사항을 기다리는 최대 시간(밀리초)을 지정합니다. 기본값은 1000 밀리초입니다.

ShowExpandedEvents

MongoDB Server v6.0 부터 변경 스트림은 createIndexesdropIndexes 이벤트와 같은 데이터 정의 언어(DDL) 이벤트에 대한 변경 알림을 지원합니다. 변경 스트림에 확장 이벤트를 포함하려면 변경 스트림 커서를 생성하고 이 매개변수를 True 로 설정합니다.

batchSize

변경 스트림 이 각 배치 에서 반환할 수 있는 최대 문서 수를 지정하며, 이는 Watch() 또는 WatchAsync()에 적용됩니다. batchSize 옵션을 설정하다 하지 않은 경우, 감시 함수의 초기 배치 크기는 101 문서이고, 각 후속 배치 의 최대 크기는 16 메비바이트(MiB)입니다. 이 옵션은 16 MiB보다 작은 제한을 시행하다 할 수 있지만 더 큰 제한은 적용할 수 없습니다. batchSize 를 16 MiB보다 큰 배치가 발생하는 제한으로 설정하다 경우, 이 옵션은 아무런 효과가 없으며 Watch() 또는 WatchAsync() 는 기본값 배치 크기를 사용합니다.

Collation

변경 스트림 커서 에 사용할 데이터 정렬을 지정합니다. 자세한 내용은 이 페이지의 데이터 정렬 섹션을 참조하세요.

Comment

작업에 주석을 첨부합니다.

작업에 대한 데이터 정렬을 구성하려면 데이터 정렬 클래스의 인스턴스를 만듭니다.

다음 표에서는 Collation 생성자가 허용하는 매개변수에 대해 설명합니다. 또한 각 설정의 값을 읽는 데 사용할 수 있는 해당 클래스 속성 도 나열되어 있습니다.

Parameter
설명
클래스 속성

locale

유니코드용 국제 구성 요소(ICU) 국가 및 언어 설정 및 언어 설정을 지정합니다. 지원되는 국가 및 언어 설정 목록은 MongoDB Server

Collation.Simple Collation locale "simple"매뉴얼의
데이터 정렬 국가 및 언어 설정 및 기본 매개변수를 참조하세요. 단순 이진 비교를 사용하려면 정적 속성 사용하여 가 로 설정하다 객체 반환합니다. 데이터 유형: string

Locale

caseLevel

(선택 사항) 대소문자 비교를 포함할지 여부를 지정합니다.

이 인수가 인 경우 true 드라이버의 동작은 인수의 값에 따라 달라집니다.strength

- 가 인 경우 strength CollationStrength.Primary운전자 기본 문자와 대소문자를 비교합니다.
- 가 strength CollationStrength.Secondary인 경우 운전자 기본 문자, 분음 부호, 기타 세컨더리 차이점 및 대소문자를 비교합니다.
- strength 이 다른 값인 경우 이 인수는 무시됩니다.

이 인수가 인 false 경우 운전자 강도 수준 Primary 또는 에서 대소문자 비교를 포함하지 Secondary 않습니다.

데이터 유형: boolean
기본값: false

CaseLevel

caseFirst

(선택 사항) 3차 수준 비교 시 대소문자 차이의 정렬 순서를 지정합니다.

데이터 유형: CollationCaseFirst
기본값: CollationCaseFirst.Off

CaseFirst

strength

(선택 사항) ICU 문서에 정의된

대로 수행할 비교 수준을 지정합니다. 데이터 유형: CollationStrength
기본값: CollationStrength.Tertiary

Strength

numericOrdering

(선택 사항) 운전자 숫자 문자열을 숫자로 비교할지 여부를 지정합니다.

이 인수가 true 이면 운전자 숫자 문자열을 숫자로 비교합니다. 예시 를 들어 "10문자열과 "2" 문자열을 비교할 때 운전자 값을 10 및 로 2 처리하고 가 10 더 큰

것을 찾습니다. 이 인수가 false 이거나 제외되면 운전자 숫자 문자열을 문자열로 비교합니다. 예시 를 들어10문자열과 '2' 문자열을 비교할 때 운전자 한 번에 한 문자씩 비교합니다. '1'는 ' '보다 작으므로2운전자 '10'이 ' '보다 작은 것을2찾습니다.

자세한 내용은 MongoDB Server 매뉴얼에서

데이터 정렬 제한을 참조하세요. 데이터 유형: boolean
기본값: false

NumericOrdering

alternate

(선택 사항) 운전자 비교를 위해 공백과 문장 부호를 기본 문자로 간주할지 여부를 지정합니다.

데이터 유형: CollationAlternate 기본값:(공백과
CollationAlternate.NonIgnorable 문장 부호는 기본 문자로 간주됨)

Alternate

maxVariable

(선택 사항) 인수가 일 때 운전자 무시할 수 있는 것으로 간주하는 alternate CollationAlternate.Shifted문자를 지정합니다.

데이터 유형: CollationMaxVariable 기본값:(
CollationMaxVariable.Punctuation 운전자 구두점 및 공백 무시)

MaxVariable

normalization

(선택 사항) 운전자 필요에 따라 텍스트를 정규화할지 여부를 지정합니다.

대부분의 텍스트에는 정규화가 필요하지 않습니다. 정규화에 대한 자세한 내용은 ICU 문서를 참조하세요.

데이터 유형: 기본값: boolean
false

Normalization

backwards

(선택 사항) 분음 부호가 포함된 문자열을 문자열 뒤쪽에서 앞쪽으로 정렬할지 여부를 지정합니다.

데이터 유형: boolean
기본값: false

Backwards

데이터 정렬에 대한 자세한 내용은 MongoDB Server 매뉴얼의 데이터 정렬 페이지를 참조하세요.

중요

배포에서 MongoDB v6.0 이상을 사용하는 경우에만 컬렉션에서 사전 이미지 및 사후 이미지를 활성화할 수 있습니다.

기본값 으로 컬렉션 에서 작업을 수행할 때 해당 변경 이벤트 에는 해당 작업에 의해 수정된 필드의 델타만 포함됩니다. 변경 전후의 전체 문서 를 보려면 ChangeStreamOptions 객체 를 만들고 FullDocumentBeforeChange 또는 FullDocument 옵션을 지정합니다. 그런 다음 ChangeStreamOptions 객체 를 Watch() 또는 WatchAsync() 메서드에 전달합니다.

사전 이미지 는 변경 전의 문서 전체 버전입니다. 변경 스트림 이벤트 에 사전 이미지를 포함하려면 FullDocumentBeforeChange 옵션을 다음 값 중 하나로 설정하다 합니다.

  • ChangeStreamFullDocumentBeforeChangeOption.WhenAvailable: 변경 이벤트에는 사전 이미지를 사용할 수 있는 경우에만 변경 이벤트에 대해 수정된 문서의 사전 이미지가 포함됩니다.

  • ChangeStreamFullDocumentBeforeChangeOption.Required: 변경 이벤트에는 변경 이벤트에 대한 수정된 문서의 사전 이미지가 포함됩니다. 사전 이미지를 사용할 수 없는 경우 드라이버에서 오류가 발생합니다.

사후 이미지 는 변경 문서 의 전체 버전입니다. 변경 스트림 이벤트 에 사후 이미지를 포함하려면 FullDocument 옵션을 다음 값 중 하나로 설정하다 합니다.

  • ChangeStreamFullDocumentOption.UpdateLookup: 변경 이벤트에는 변경 후 일정 시간 이후의 변경된 문서 전체의 복사본이 포함됩니다.

  • ChangeStreamFullDocumentOption.WhenAvailable: 변경 이벤트에는 사후 이미지를 사용할 수 있는 경우에만 변경 이벤트에 대해 수정된 문서의 사후 이미지가 포함됩니다.

  • ChangeStreamFullDocumentOption.Required: 변경 이벤트에는 변경 이벤트에 대한 수정된 문서의 사후 이미지가 포함됩니다. 사후 이미지를 사용할 수 없는 경우 드라이버에서 오류가 발생합니다.

다음 예시 에서는 컬렉션 에서 변경 스트림 을 열고 FullDocument 옵션을 지정하여 업데이트된 문서의 사후 이미지를 포함합니다. Synchronous 또는 Asynchronous 탭 을 선택하여 해당 코드를 확인합니다.

var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using (var cursor = collection.Watch(pipeline, options))
{
foreach (var change in cursor.ToEnumerable())
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
}
}
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<Restaurant>>()
.Match(change => change.OperationType == ChangeStreamOperationType.Update);
var options = new ChangeStreamOptions
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup,
};
using var cursor = await collection.WatchAsync(pipeline, options);
await cursor.ForEachAsync(change =>
{
Console.WriteLine(change.FullDocument.ToBsonDocument());
});

앞의 코드 예시 를 실행하고 "name" 값이 "Blarney Castle" 인 문서 를 업데이트하면 다음과 같은 변경 스트림 출력이 생성됩니다.

{ "_id" : ObjectId("..."), "name" : "Blarney Castle", "restaurant_id" : "40366356",
"cuisine" : "Traditional Irish", "address" : { "building" : "202-24", "coord" : [-73.925044200000002, 40.5595462],
"street" : "Rockaway Point Boulevard", "zipcode" : "11697" }, "borough" : "Queens", "grades" : [...] }

사전 이미지 및 사후 이미지에 대해 자세히 알아보려면 Change Streams 매뉴얼에서 문서 사전 및 사후 이미지로 MongoDB Server 을 참조하세요.

변경 스트림에 대해 자세히 알아보려면 Change Streams 매뉴얼의 MongoDB Server 을 참조하세요.

이 가이드에서 사용되는 메서드 또는 유형에 대해 자세히 알아보려면 다음 API 설명서를 참조하세요.