문서 메뉴

문서 홈애플리케이션 개발MongoDB Kafka Connector

Kafka Connector 빠른 시작

이 페이지의 내용

  • 개요
  • 필수 패키지 설치
  • Sandbox 다운로드
  • 샌드박스 시작
  • Connector 추가
  • 소스 커넥터 추가
  • Sink Connector 추가
  • Connector를 통해 문서 내용 보내기
  • 샌드박스 제거
  • 다음 단계

이 가이드에서는 MongoDB와 Apache Kafka 간에 데이터를 전송하기 위해 MongoDB Kafka Connector를 구성하는 방법을 설명합니다.

이 가이드를 완료하고 나면 Kafka Connect REST API를 사용하여 MongoDB에서 데이터를 읽고 이를 Kafka 주제에 쓰거나, Kafka 주제에서 데이터를 읽고 이를 MongoDB 쓰기 위해 MongoDB Kafka Connector를 구성하는 방법을 알 수 있습니다.

이 가이드의 단계를 모두 완료하려면 샘플 데이터 파이프라인을 구축하는데 필요한 서비스가 포함된, 컨테이너화된 개발 환경인 샌드박스를 다운로드하여 작업해야 합니다.

샌드박스 및 샘플 데이터 파이프라인을 설정하려면 다음 섹션을 읽어보세요.

참고

이 가이드를 완료한 후 샌드박스 제거 섹션의 지침에 따라 환경을 제거할 수 있습니다.

다음 패키지를 다운로드하여 설치하세요.

Docker 문서 읽기

이 가이드에서는 다음과 같은 Docker 관련 용어를 사용합니다:

Docker 공식 시작 가이드에서 Docker에 대해 자세히 알아보세요.

샌드박스는 편의성과 일관성을 위해 Docker를 사용합니다. Apache Kafka의 배포 옵션에 대해 자세히 알아보려면 다음 리소스를 참조하세요.

이 튜토리얼에서는 샘플 데이터 파이프라인을 구축하는 데 필요한 서비스가 포함된 샌드박스를 만들었습니다.

샌드박스를 다운로드하려면 튜토리얼 저장소를 사용 중인 개발 환경에 복제하세요. 그런 다음 빠른 시작 튜토리얼에 해당하는 디렉토리로 이동합니다. Bash 또는 유사한 셸을 사용하는 경우 다음 명령을 사용하세요.

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/

샌드박스는 Docker 컨테이너에서 다음 서비스를 시작합니다.

  • 복제본 세트로 구성된 MongoDB

  • Apache Kafka

  • MongoDB Kafka Connector가 설치된 Kafka Connect

  • Apache Kafka의 구성을 관리하는 Apache Zookeeper

샌드박스를 시작하려면 튜토리얼 디렉토리에서 다음 명령을 실행합니다.

docker-compose -p mongo-kafka up -d --force-recreate

샌드박스를 시작하면 Docker는 실행에 필요한 모든 이미지를 다운로드합니다.

참고

다운로드는 얼마나 걸리나요?

이 튜토리얼의 Docker 이미지에는 총 약 2.4GB의 공간이 필요합니다. 다음 목록은 다양한 인터넷 속도로 이미지를 다운로드하는 데 걸리는 시간을 보여줍니다.

  • 초당 40메가비트: 8분

  • 초당 20메가비트: 16분

  • 초당 10메가비트: 32분

Docker가 이미지를 다운로드하고 빌드하면 개발 환경에 다음 출력이 표시됩니다.

...
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating mongo1 ... done
Creating mongo1-setup ... done

참고

포트 매핑

샌드박스는 다음 서비스를 호스트 시스템의 포트에 매핑합니다.

  • 샌드박스 MongoDB 서버가 호스트 시스템의 포트 35001에 매핑됩니다.

  • 샌드박스 Kafka Connect JMX 서버는 호스트 장치의 포트 35000에 매핑됩니다.

샌드박스를 시작하려면 해당 포트가 비어 있어야 합니다.

샘플 데이터 파이프라인을 완료하려면 Kafka Connect에 connector 추가하여 Kafka Connect와 MongoDB 간에 데이터를 전송해야 합니다. 소스 connector를 추가하여 MongoDB에서 Apache Kafka로 데이터를 전송합니다. 싱크 connector를 추가하여 Apache Kafka에서 MongoDB로 데이터를 전송합니다.

샌드박스에 커넥터를 추가하려면 먼저 다음 명령을 사용하여 Docker 컨테이너에서 대화형 bash 셸을 시작합니다.

docker exec -it mongo1 /bin/bash

셸 세션이 시작되면 다음 프롬프트가 표시됩니다.

MongoDB Kafka Connector Sandbox $

Docker 컨테이너의 셸을 사용하여 Kafka Connect REST API를 통해 소스 커넥터를 추가하세요.

다음 API 요청은 다음 속성으로 구성된 소스 커넥터를 추가합니다.

  • Kafka Connect가 커넥터를 인스턴스화하는 데 사용하는 클래스

  • connector가 데이터를 읽는 MongoDB 복제본 세트의 연결 URI와 데이터베이스 및 collection입니다.

  • 커넥터가 MongoDB에서 읽는 삽입된 문서에 값이 "MongoDB Kafka Connector"인 필드 travel을 추가하는 애그리게이션 파이프라인

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-source",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"sampleData",
"pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
}
}
' \
http://connect:8083/connectors -w "\n"

참고

'연결 실패' 메시지가 표시되는 이유는 무엇입니까?

Kafka Connect REST API가 시작되는 데 최대 3분이 걸립니다. 다음 오류가 수신되면 3분 동안 기다린 후 이전 명령을 다시 실행합니다.

...
curl: (7) Failed to connect to connect port 8083: Connection refused

소스 커넥터를 추가했는지 확인하려면 다음 명령을 실행합니다.

curl -X GET http://connect:8083/connectors

앞의 명령은 실행 중인 커넥터의 이름을 출력해야 합니다.

["mongo-source"]

소스 커넥터 속성에 대해 자세히 알아보려면 소스 커넥터 구성 속성 페이지를 참조하세요.

집계 파이프라인에 대해 자세히 알아보려면 집계 파이프라인에 대한 MongoDB 매뉴얼 페이지를 참조하세요 .

Docker container 내 셸을 활용하여 Kafka Connect REST API와 상호 작용해 싱크 connector를 추가하세요.

다음 API 요청은 다음 속성으로 구성된 싱크 커넥터를 추가합니다.

  • Kafka Connect가 커넥터를 인스턴스화하는 데 사용하는 클래스

  • 커넥터가 데이터를 쓰는 MongoDB 복제본 세트의 연결 URI, 데이터베이스 및 collection

  • 커넥터가 데이터를 읽는 Apache Kafka 주제

  • MongoDB 변경 이벤트 문서에 대한 변경 데이터 캡처 핸들러

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"topicData",
"topics":"quickstart.sampleData",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}
' \
http://connect:8083/connectors -w "\n"

소스 및 싱크 커넥터를 모두 추가했는지 확인하려면 다음 명령을 실행합니다:

curl -X GET http://connect:8083/connectors

앞의 명령은 실행 중인 커넥터의 이름을 출력해야 합니다.

["mongo-source", "mongo-sink"]

싱크 커넥터 속성에 대해 자세히 알아보려면 싱크 커넥터 구성 속성 페이지를 참조하세요.

변경 데이터 캡처 이벤트에 대해 자세히 알아보려면 변경 데이터 캡처 핸들러 가이드를 참조하세요.

커넥터를 통해 문서의 내용을 보내려면 소스 커넥터가 데이터를 읽는 MongoDB collection에 문서를 삽입하세요.

collection에 새 문서를 삽입하려면 다음 명령을 사용해 Docker container의 셸에서 MongoDB 셸을 입력하세요.

mongosh mongodb://mongo1:27017/?replicaSet=rs0

앞의 명령을 실행하면 다음과 같은 메시지가 표시됩니다.

rs0 [primary] test>

MongoDB 셸에서 다음 명령을 사용하여 quickstart 데이터베이스의 sampleData collection에 문서를 삽입합니다.

use quickstart
db.sampleData.insertOne({"hello":"world"})

sampleData collection에 문서를 삽입한 후 커넥터가 변경 사항을 처리했는지 확인하세요. 다음 명령을 사용하여 topicData collection의 내용을 확인합니다.

db.topicData.find()

다음과 같은 내용이 출력되어야 합니다.

[
{
_id: ObjectId(...),
hello: 'world',
travel: 'MongoDB Kafka Connector'
}
]

다음 명령으로 MongoDB 셸을 종료합니다.

exit

개발 환경의 리소스를 절약하려면 샌드박스를 제거하세요.

샌드박스를 제거하기 전에 다음 명령을 실행하여 Docker 컨테이너의 셸 세션을 종료하세요.

exit

Docker container와 이미지를 모두 제거하거나 container만 제거하도록 선택할 수 있습니다. container와 이미지를 제거할 경우 다시 다운로드해 약 2.4GB 크기의 샌드박스를 다시 시작해야 합니다. container만 제거할 경우 이미지는 재사용할 수 있으며 샘플 데이터 파이프라인에서 대용량 파일 대부분을 다운로드하지 않아도 됩니다.

실행할 제거 작업에 해당하는 탭을 선택합니다.

MongoDB Kafka Connector를 설치하는 방법을 알아보려면 MongoDB Kafka Connector 설치 가이드를 참조하세요.

Apache Kafka에서 MongoDB로 데이터를 처리하고 이동하는 방법에 대해 자세히 알아보려면 싱크 커넥터 가이드를 참조하세요.

MongoDB에서 Apache Kafka로 데이터를 처리하고 이동하는 방법에 대해 자세히 알아보려면 소스 커넥터 가이드를 참조하세요.

← 새로운 기능
서론 →