Overview
本指南介绍了如何配置 MongoDB Kafka Connector,以在 MongoDB 和 Apache Kafka 之间发送数据。
完成本指南后,您应该了解如何使用 Kafka Connect REST API 来配置 MongoDB Kafka Connector,以便从 MongoDB 读取数据并将其写入 Kafka 主题,以及从 Kafca 主题读取数据并将其写入 MongoDB。
要完成本指南中的步骤,您必须下载沙盒并在其中工作,沙盒是一种容器化开发环境,其中包含构建示例数据管道所需的服务。
阅读以下部分,设置沙盒和示例数据管道。
注意
完成本指南后,您可以按照 删除沙盒部分的说明删除环境。
安装所需的软件包
下载并安装以下软件包:
沙箱使用 Docker 来实现便利性和一致性。 要了解有关 Apache Kafka 部署选项的更多信息,请参阅以下资源:
下载沙盒
我们创建了一个沙箱,其中包含本教程中构建样本数据管道所需的服务。
要下载沙盒,请将教程存储库克隆到您的开发环境中。然后导航到快速入门教程对应的目录。如果使用 bash 或类似的 Shell,请使用以下命令:
git clone https://github.com/mongodb-university/kafka-edu.git cd kafka-edu/docs-examples/mongodb-kafka-base/ 
启动沙盒
沙箱在 Docker 容器中启动以下服务:
- MongoDB,配置为副本集 
- Apache Kafka 
- 安装了 MongoDB Kafka Connector 的 Kafka Connect 
- 管理 Apache Kafka 配置的 Apache Zookeeper 
要启动沙盒,请从教程目录运行以下命令:
docker compose -p mongo-kafka up -d --force-recreate 
在您启动沙箱时,Docker 下载它运行所需的任何映像。
注意
下载需要多长时间?
总的来说,本教程的 Docker 映像需要大约 2.4 GB 空间。以下列表显示了以不同网速下载这些映像所需的时间:
- 每秒 40 兆比特:8 分钟 
- 20 兆位/秒:16 分钟 
- 10 兆位/秒:32 分钟 
在 Docker 下载并构建这些映像后,您应该会在开发环境中看到以下输出:
... Creating zookeeper ... done Creating broker    ... done Creating schema-registry ... done Creating connect         ... done Creating rest-proxy      ... done Creating mongo1          ... done Creating mongo1-setup    ... done 
注意
端口映射
沙箱会将以下服务映射到主机上的端口:
- 沙箱 MongoDB 服务器映射到主机上的 - 35001端口
- 沙箱 Kafka Connect JMX 服务器映射到主机上的端口 - 35000
这些端口必须空闲才能启动沙箱。
添加连接器
要完成样本数据管道,必须向 Kafka Connect 添加连接器,以在 Kafka Connect 和 MongoDB 之间传输数据。添加源连接器,将数据从 MongoDB 传输到 Apache Kafka。添加接收器连接器以将数据从 Apache Kafka 传输到 MongoDB。
要在沙箱中添加连接器,请首先使用以下命令在 Docker 容器中启动交互式 bash Shell:
docker exec -it mongo1 /bin/bash 
在 Shell 会话启动后,您应该会看到以下提示:
MongoDB Kafka Connector Sandbox $ 
添加源连接器
使用 Docker 容器中的 Shell 通过 Kafka Connect REST API 添加源连接器。
以下 API 请求会添加一个配置了以下属性的源连接器:
- Kafka Connect 用于实例化连接器的类 
- 连接器从中读取数据的连接 URI、数据库和 MongoDB 副本集集合 
- 聚合管道,将值为 - "MongoDB Kafka Connector"的字段- travel添加到 Connector 从 MongoDB 读取的插入文档
curl -X POST \      -H "Content-Type: application/json" \      --data '      {"name": "mongo-source",       "config": {          "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",          "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",          "database":"quickstart",          "collection":"sampleData",          "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"          }      }      ' \      http://connect:8083/connectors -w "\n" 
注意
为什么我会看到“Failed to connect”(无法连接)消息?
Kafka Connect REST API 启动最多需要三分钟。如果收到以下错误,请等待三分钟,然后再次运行上述命令:
... curl: (7) Failed to connect to connect port 8083: Connection refused 
要确认添加了源连接器,请运行以下命令:
curl -X GET http://connect:8083/connectors 
上述命令应输出正在运行的连接器的名称:
["mongo-source"] 
要学习;了解有关源Connector属性的更多信息,请参阅源连接器配置属性页面。
要了解有关聚合管道的更多信息,请参阅 MongoDB 手册中的聚合管道页面。
添加接收器连接器
使用 Docker container 中的 shell 通过 Kafka Connect REST API 添加 sink connector。
以下 API 请求添加一个配置了以下属性的接收器连接器:
- Kafka Connect 用于实例化连接器的类 
- 连接器写入数据的 MongoDB 副本集的连接 URI、数据库和集合 
- 该连接器从中读取数据的 Apache Kafka 主题 
- 用于 MongoDB 变更事件文档的变更数据捕获处理程序 
curl -X POST \      -H "Content-Type: application/json" \      --data '      {"name": "mongo-sink",       "config": {          "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",          "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",          "database":"quickstart",          "collection":"topicData",          "topics":"quickstart.sampleData",          "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"          }      }      ' \      http://connect:8083/connectors -w "\n" 
要确认已添加 Source Connector 和 Sink Connector,请运行以下命令:
curl -X GET http://connect:8083/connectors 
上述命令应输出正在运行的连接器的名称:
["mongo-source", "mongo-sink"] 
要了解有关接收器连接器属性的更多信息,请参阅接收器连接器配置属性页面。
要了解有关变更数据捕获事件的更多信息,请参阅变更数据捕获处理指南。
通过 Connector 发送文档内容
要通过连接器发送文档内容,请将文档插入到源连接器从中读取数据的 MongoDB 集合中。
要将新文档插入到集合中,请使用以下命令从 Docker 容器中的 Shell 进入 MongoDB Shell:
mongosh mongodb://mongo1:27017/?replicaSet=rs0 
运行前面的命令后,应该会看到以下提示:
rs0 [primary] test> 
在 MongoDB Shell 中,使用以下命令,将文档插入 quickstart 数据库的 sampleData 集合:
use quickstart db.sampleData.insertOne({"hello":"world"}) 
在将文档插入到 sampleData 集合后,请确认连接器已处理该更改。使用以下命令检查 topicData 集合内容:
db.topicData.find() 
您应该会看到如下所示的输出:
[     {       _id: ObjectId(...),       hello: 'world',       travel: 'MongoDB Kafka Connector'     } ] 
运行以下命令以退出 MongoDB Shell:
exit 
删除沙盒
要节省开发环境中的资源,请删除沙盒。
在删除沙箱之前,运行以下命令以退出 Docker 容器中的 Shell 会话:
exit 
您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们才能重新启动沙盒,其大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。
选择与要运行的删除任务对应的选项卡。
运行以下 shell 命令以从沙箱中删除 Docker 容器和映像:
docker-compose -p mongo-kafka down --rmi all 
运行以下 shell 命令以删除 Docker 容器,但保留沙盒的映像:
docker-compose -p mongo-kafka down 
后续步骤
要了解如何安装 MongoDB Kafka Connector,请参阅安装 MongoDB Kafka Connector 指南。
要了解有关如何处理数据并将数据从 Apache Kafka 转移到 MongoDB 的更多信息,请参阅 Sink Connector 指南。
要了解有关如何处理数据并将数据从 MongoDB 移动到 Apache Kafka 的更多信息,请参阅源连接器指南。