Kafka Connector 快速入门
概述
本指南介绍了如何配置 MongoDB Kafka Connector,以在 MongoDB 和 Apache Kafka 之间发送数据。
完成本指南后,您应该了解如何使用 Kafka Connect REST API 来配置 MongoDB Kafka Connector,以便从 MongoDB 读取数据并将其写入 Kafka 主题,以及从 Kafca 主题读取数据并将其写入 MongoDB。
要完成本指南中的步骤,您必须下载沙盒并在其中工作,沙盒是一种容器化开发环境,其中包含构建示例数据管道所需的服务。
阅读以下部分,设置沙盒和示例数据管道。
注意
完成本指南后,您可以按照 删除沙盒部分的说明删除环境。
安装所需的软件包
下载并安装以下软件包:
沙箱使用 Docker 来实现便利性和一致性。 要了解有关 Apache Kafka 部署选项的更多信息,请参阅以下资源:
下载沙盒
我们创建了一个沙箱,其中包含本教程中构建样本数据管道所需的服务。
要下载沙盒,请将教程存储库克隆到您的开发环境中。然后导航到快速入门教程对应的目录。如果使用 bash 或类似的 Shell,请使用以下命令:
git clone https://github.com/mongodb-university/kafka-edu.git cd kafka-edu/docs-examples/mongodb-kafka-base/
启动沙盒
沙箱在 Docker 容器中启动以下服务:
MongoDB,配置为副本集
Apache Kafka
安装了 MongoDB Kafka Connector 的 Kafka Connect
管理 Apache Kafka 配置的 Apache Zookeeper
要启动沙盒,请从教程目录运行以下命令:
docker-compose -p mongo-kafka up -d --force-recreate
在您启动沙箱时,Docker 下载它运行所需的任何映像。
注意
下载需要多长时间?
总的来说,本教程的 Docker 映像需要大约 2.4 GB 空间。以下列表显示了以不同网速下载这些映像所需的时间:
每秒 40 兆比特:8 分钟
20 兆位/秒:16 分钟
10 兆位/秒:32 分钟
在 Docker 下载并构建这些映像后,您应该会在开发环境中看到以下输出:
... Creating zookeeper ... done Creating broker ... done Creating schema-registry ... done Creating connect ... done Creating rest-proxy ... done Creating mongo1 ... done Creating mongo1-setup ... done
注意
端口映射
沙箱会将以下服务映射到主机上的端口:
沙箱 MongoDB 服务器映射到主机上的
35001
端口沙箱 Kafka Connect JMX 服务器映射到主机上的端口
35000
这些端口必须空闲才能启动沙箱。
添加连接器
要完成样本数据管道,必须向 Kafka Connect 添加连接器,以在 Kafka Connect 和 MongoDB 之间传输数据。添加源连接器,将数据从 MongoDB 传输到 Apache Kafka。添加接收器连接器以将数据从 Apache Kafka 传输到 MongoDB。
要在沙箱中添加连接器,请首先使用以下命令在 Docker 容器中启动交互式 bash Shell:
docker exec -it mongo1 /bin/bash
在 Shell 会话启动后,您应该会看到以下提示:
MongoDB Kafka Connector Sandbox $
添加源连接器
使用 Docker 容器中的 Shell 通过 Kafka Connect REST API 添加源连接器。
以下 API 请求会添加一个配置了以下属性的源连接器:
Kafka Connect 用于实例化连接器的类
连接器从中读取数据的连接 URI、数据库和 MongoDB 副本集集合
聚合管道,将值为
"MongoDB Kafka Connector"
的字段travel
添加到 Connector 从 MongoDB 读取的插入文档
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-source", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"sampleData", "pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]" } } ' \ http://connect:8083/connectors -w "\n"
注意
为什么我会看到“Failed to connect”(无法连接)消息?
Kafka Connect REST API 启动最多需要三分钟。如果收到以下错误,请等待三分钟,然后再次运行上述命令:
... curl: (7) Failed to connect to connect port 8083: Connection refused
要确认添加了源连接器,请运行以下命令:
curl -X GET http://connect:8083/connectors
上述命令应输出正在运行的连接器的名称:
["mongo-source"]
要了解有关源连接器属性的更多信息,请参阅源连接器配置属性页面。
要了解有关聚合管道的更多信息,请参阅 MongoDB 手册中的聚合管道页面。
添加接收器连接器
使用 Docker container 中的 shell 通过 Kafka Connect REST API 添加 sink connector。
以下 API 请求添加一个配置了以下属性的接收器连接器:
Kafka Connect 用于实例化连接器的类
连接器写入数据的 MongoDB 副本集的连接 URI、数据库和集合
该连接器从中读取数据的 Apache Kafka 主题
用于 MongoDB 变更事件文档的变更数据捕获处理程序
curl -X POST \ -H "Content-Type: application/json" \ --data ' {"name": "mongo-sink", "config": { "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector", "connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0", "database":"quickstart", "collection":"topicData", "topics":"quickstart.sampleData", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler" } } ' \ http://connect:8083/connectors -w "\n"
要确认已添加 Source Connector 和 Sink Connector,请运行以下命令:
curl -X GET http://connect:8083/connectors
上述命令应输出正在运行的连接器的名称:
["mongo-source", "mongo-sink"]
要了解有关接收器连接器属性的更多信息,请参阅接收器连接器配置属性页面。
要了解有关变更数据捕获事件的更多信息,请参阅变更数据捕获处理指南。
通过 Connector 发送文档内容
要通过连接器发送文档内容,请将文档插入到源连接器从中读取数据的 MongoDB 集合中。
要将新文档插入到集合中,请使用以下命令从 Docker 容器中的 Shell 进入 MongoDB Shell:
mongosh mongodb://mongo1:27017/?replicaSet=rs0
运行前面的命令后,应该会看到以下提示:
rs0 [primary] test>
在 MongoDB Shell 中,使用以下命令,将文档插入 quickstart
数据库的 sampleData
集合:
use quickstart db.sampleData.insertOne({"hello":"world"})
在将文档插入到 sampleData
集合后,请确认连接器已处理该更改。使用以下命令检查 topicData
集合内容:
db.topicData.find()
您应该会看到如下所示的输出:
[ { _id: ObjectId(...), hello: 'world', travel: 'MongoDB Kafka Connector' } ]
运行以下命令以退出 MongoDB Shell:
exit
删除沙盒
要节省开发环境中的资源,请删除沙盒。
在删除沙箱之前,运行以下命令以退出 Docker 容器中的 Shell 会话:
exit
您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们才能重新启动沙盒,其大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。
选择与要运行的删除任务对应的选项卡。
运行以下 shell 命令以从沙箱中删除 Docker 容器和映像:
docker-compose -p mongo-kafka down --rmi all
运行以下 shell 命令以删除 Docker 容器,但保留沙盒的映像:
docker-compose -p mongo-kafka down
后续步骤
要了解如何安装 MongoDB Kafka Connector,请参阅安装 MongoDB Kafka Connector 指南。
要了解有关如何处理数据并将数据从 Apache Kafka 转移到 MongoDB 的更多信息,请参阅 Sink Connector 指南。
要了解有关如何处理数据并将数据从 MongoDB 移动到 Apache Kafka 的更多信息,请参阅源连接器指南。