AI 에이전트의 경우: 문서 인덱스는 https://www.mongodb.com/ko-kr/docs/llms.txt에서 사용할 수 있으며, 모든 페이지의 마크다운 버전은 어떤 URL 경로에 .md를 추가하여 사용할 수 있습니다.
Docs Menu

Change Stream으로 데이터 모니터링

이 가이드에서는 변경 스트림 을 사용하여 데이터베이스의 실시간 변경 사항을 모니터링하는 방법을 배울 수 있습니다. 변경 스트림은 애플리케이션이 컬렉션, 데이터베이스 또는 배포의 데이터 변경 사항을 구독할 수 있도록 하는 MongoDB Server 기능입니다.

Atlas Stream Processing

변경 스트림의 대안으로 Atlas Stream Processing 사용하여 데이터 스트림을 프로세스 하고 변환할 수 있습니다. 데이터베이스 이벤트만 등록하는 변경 스트림과 달리 Atlas Stream Processing 여러 데이터 이벤트 유형을 관리하고 확장된 데이터 처리 기능을 제공합니다. 이 기능에 대해 자세히 학습하려면 MongoDB Atlas 설명서에서 Atlas Stream Processing 을 참조하세요.

이 가이드 의 예제에서는 Atlas 샘플 데이터 세트sample_restaurants.restaurants 컬렉션 을 사용합니다. 무료 MongoDB Atlas cluster 를 생성하고 샘플 데이터 세트를 로드하는 방법을 학습 보려면 PyMongo 시작하기를 참조하세요.

변경 스트림을 열려면 watch() 메서드를 호출합니다. watch() 메서드를 호출하는 인스턴스에 따라 변경 스트림이 수신 대기하는 이벤트 범위가 결정됩니다. 다음 클래스에서 watch() 메서드를 호출할 수 있습니다.

  • MongoClient: MongoDB deployment의 모든 변경 사항을 모니터링합니다.

  • Database: 데이터베이스에 있는 모든 컬렉션의 변경 사항을 모니터링합니다.

  • Collection: 컬렉션의 변경 사항을 모니터링합니다.

다음 예시 에서는 restaurants 컬렉션 에서 변경 스트림 열고 변경 사항이 발생할 때 출력합니다. Synchronous 또는 Asynchronous 탭 선택하여 해당 코드를 확인합니다.

database = client["sample_restaurants"]
collection = database["restaurants"]
with collection.watch() as stream:
for change in stream:
print(change)
database = client["sample_restaurants"]
collection = database["restaurants"]
async with await collection.watch() as stream:
async for change in stream:
print(change)

변경 사항을 확인하려면 애플리케이션 실행 . 그런 다음 별도의 애플리케이션 또는 셸 에서 restaurants 컬렉션 수정합니다. 다음 예시 name 필드 값이 Blarney Castle인 문서 업데이트합니다. Synchronous 또는 Asynchronous 탭 선택하여 해당 코드를 확인합니다.

database = client["sample_restaurants"]
collection = database["restaurants"]
query_filter = { "name": "Blarney Castle" }
update_operation = { '$set' :
{ "cuisine": "Irish" }
}
result = collection.update_one(query_filter, update_operation)
database = client["sample_restaurants"]
collection = database["restaurants"]
query_filter = { "name": "Blarney Castle" }
update_operation = { '$set' :
{ "cuisine": "Irish" }
}
result = await collection.update_one(query_filter, update_operation)

컬렉션을 업데이트하면 변경 스트림 애플리케이션은 변경 사항이 발생하는 즉시 출력합니다. 인쇄된 변경 이벤트는 다음과 유사합니다.

{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}

pipeline 매개변수를 watch() 메서드에 전달하여 변경 스트림 출력을 수정할 수 있습니다. 이 매개변수를 사용하면 지정된 변경 이벤트만 감시할 수 있습니다. 매개변수의 형식을 각각 애그리게이션 단계를 나타내는 객체 목록으로 지정합니다.

pipeline 매개변수에 다음 단계를 지정할 수 있습니다.

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset

다음 예시 pipeline 매개 변수를 사용하여 업데이트 작업만 기록하는 변경 스트림 엽니다. Synchronous 또는 Asynchronous 탭 선택하여 해당 코드를 확인합니다.

change_pipeline = { "$match": { "operationType": "update" }},
with collection.watch(pipeline=change_pipeline) as stream:
for change in stream:
print(change)
change_pipeline = { "$match": { "operationType": "update" }},
async with await collection.watch(pipeline=change_pipeline) as stream:
async for change in stream:
print(change)

변경 스트림 출력 수정에 대해 자세히 알아보려면 MongoDB Server 매뉴얼의 변경 스트림 출력 수정 섹션을 참조하세요.

watch() 메서드는 작업을 구성하는 데 사용할 수 있는 옵션을 나타내는 선택적 매개변수를 허용합니다. 옵션을 지정하지 않으면 드라이버는 작업을 사용자 지정하지 않습니다.

다음 표에서는 watch() 의 동작을 사용자 지정하기 위해 설정할 수 있는 옵션에 대해 설명합니다.

속성
설명

pipeline

변경 스트림의 출력을 수정하는 집계 파이프라인 단계 목록입니다.

full_document

문서의 변경 사항만 표시하지 않고 변경 후 전체 문서를 표시할지 여부를 지정합니다. 이 옵션에 대해 자세히 알아보려면 사전 이미지 및 사후 이미지 포함 을 참조하세요.

full_document_before_change

문서의 변경 사항만 표시하는 대신 변경 전의 전체 문서를 표시할지 여부를 지정합니다. 이 옵션에 대해 자세히 알아보려면 사전 이미지 및 사후 이미지 포함 을 참조하세요.

resume_after

watch() 재개 토큰에 지정된 작업 후 변경 사항 반환을 재개하도록 에 지시합니다.
각 변경 스트림 이벤트 문서 필드 로 재개 토큰이 포함되어 있습니다._id _id 이후에 재개하려는 작업을 나타내는 변경 이벤트 문서 의 전체 필드 전달합니다.
resume_after 은 및 와 상호 start_after start_at_operation_time배타적입니다.

start_after

watch()
_id 재개 _id 토큰에 지정된 작업 후 새 변경 스트림 시작하도록 에 지시합니다. 무효화 이벤트 후 알림 다시 시작할
start_after 수 있습니다. 각 변경 스트림 이벤트 문서 필드 로 재개 토큰이 포함되어 있습니다. 이후에 재개하려는 작업을 나타내는 변경 이벤트 문서 의 전체 필드 전달합니다.resume_after 은 및 와 상호 start_at_operation_time 배타적입니다.

start_at_operation_time

watch() 지정된 타임스탬프 이후에 발생하는 이벤트만 반환하도록
start_at_operation_time 에 지시합니다.resume_after 은 및 와 상호 start_after 배타적입니다.

max_await_time_ms

서버가 빈 배치를 반환하기 전에 변경 스트림 커서에 보고할 새 데이터 변경 사항을 기다리는 최대 시간(밀리초)입니다. 기본값은 1000 밀리초입니다.

show_expanded_events

MongoDB Server v6.0 부터 변경 스트림은 createIndexesdropIndexes 이벤트와 같은 데이터 정의 언어(DDL) 이벤트에 대한 변경 알림을 지원합니다. 변경 스트림에 확장 이벤트를 포함하려면 변경 스트림 커서를 생성하고 이 매개변수를 True 로 설정합니다.

batch_size

MongoDB cluster 의 각 응답 배치에서 반환할 변경 이벤트의 최대 개수입니다.

collation

변경 스트림 커서에 사용할 데이터 정렬입니다.

session

ClientSession 의 인스턴스입니다.

comment

작업에 첨부할 주석입니다.

중요

배포에서 MongoDB v6.0 이상을 사용하는 경우에만 컬렉션에서 사전 이미지 및 사후 이미지를 활성화할 수 있습니다.

기본적으로 컬렉션에서 작업을 수행할 때 해당 변경 이벤트에는 해당 작업에 의해 수정된 필드의 델타만 포함됩니다. 변경 전후의 전체 문서를 보려면 watch() 메서드에서 full_document_before_change 또는 full_document 매개변수를 지정합니다.

사전 이미지 는 변경 전의 문서 전체 버전입니다. 변경 스트림 이벤트에 사전 이미지를 포함하려면 full_document_before_change 매개변수를 다음 값 중 하나로 설정합니다.

  • whenAvailable: 변경 이벤트에는 사전 이미지를 사용할 수 있는 경우에만 변경 이벤트에 대해 수정된 문서의 사전 이미지가 포함됩니다.

  • required: 변경 이벤트에는 변경 이벤트에 대한 수정된 문서의 사전 이미지가 포함됩니다. 사전 이미지를 사용할 수 없는 경우 드라이버에서 오류가 발생합니다.

사후 이미지 는 변경 문서의 전체 버전입니다. 변경 스트림 이벤트에 사후 이미지를 포함하려면 full_document 매개변수를 다음 값 중 하나로 설정합니다.

  • updateLookup: 변경 이벤트에는 변경 후 일정 시간 이후의 변경된 문서 전체의 복사본이 포함됩니다.

  • whenAvailable: 변경 이벤트에는 사후 이미지를 사용할 수 있는 경우에만 변경 이벤트에 대해 수정된 문서의 사후 이미지가 포함됩니다.

  • required: 변경 이벤트에는 변경 이벤트에 대한 수정된 문서의 사후 이미지가 포함됩니다. 사후 이미지를 사용할 수 없는 경우 드라이버에서 오류가 발생합니다.

다음 예시 에서는 컬렉션 에서 watch() 메서드를 호출하고 fullDocument 매개변수를 지정하여 업데이트된 문서의 사후 이미지를 포함합니다. Synchronous 또는 Asynchronous 탭 선택하여 해당 코드를 확인합니다.

database = client["sample_restaurants"]
collection = database["restaurants"]
with collection.watch(full_document='updateLookup') as stream:
for change in stream:
print(change)
database = client["sample_restaurants"]
collection = database["restaurants"]
async with await collection.watch(full_document='updateLookup') as stream:
async for change in stream:
print(change)

변경 스트림 애플리케이션이 실행 중인 상태에서 앞의 업데이트 예시 를 사용하여 restaurants 컬렉션의 문서를 업데이트하면 다음과 유사한 변경 이벤트가 출력됩니다.

{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...),
'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens',
'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'},
'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')},
'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}

사전 이미지 및 사후 이미지에 대해 자세히 알아보려면 Change Streams 매뉴얼에서 문서 사전 및 사후 이미지로 MongoDB Server 을 참조하세요.

변경 스트림에 대해 자세히 알아보려면 Change Streams 매뉴얼의 MongoDB Server 을 참조하세요.

이 가이드에서 사용되는 메서드 또는 유형에 대해 자세히 알아보려면 다음 API 설명서를 참조하세요.