开始使用 MongoDB Kafka Source Connector
按照本教程学习如何配置 MongoDB Kafka 源连接器以读取变更流中的数据并将其发布到 Apache Kafka 主题。
MongoDB Kafka 源连接器入门
完成教程设置
完成 Kafka Connector 教程设置中的步骤,启动 Confluence Kafka Connect 和 MongoDB 环境。
配置 Source Connector
使用以下命令,在为教程设置下载的教程 Docker 容器上创建交互式 Shell 会话:
docker exec -it mongo1 /bin/bash
使用以下命令创建名为 simplesource.json
的源配置文件:
nano simplesource.json
将以下配置信息粘贴到文件中,然后保存更改:
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "Tutorial1", "collection": "orders" } }
在 shell 中运行以下命令,使用您创建的配置文件启动源连接器:
cx simplesource.json
注意
cx
命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:
curl -X POST -H "Content-Type: application/json" -d @simplesource.json http://connect:8083/connectors -w "\n"
在 shell 中运行以下命令检查连接器的状态:
status
如果 Source 连接器成功启动,应会看到以下输出:
Kafka topics: ... The status of the connectors: source | mongo-simple-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-simple-source" ] ...
创建变更事件
在同一 Shell 中,通过运行以下命令,使用 mongosh
(MongoDB Shell) 连接到 MongoDB:
mongosh "mongodb://mongo1"
连接成功后,您应看到以下 MongoDB shell 提示:
rs0 [direct: primary] test>
在提示符处,键入以下命令以插入新文档:
use Tutorial1 db.orders.insertOne( { 'order_id' : 1, 'item' : 'coffee' } )
MongoDB 完成插入命令后,您应该会收到类似于以下文本的确认:
{ acknowledged: true, insertedId: ObjectId("627e7e...") }
输入 exit
命令以退出 MongoDB Shell。
使用以下命令检查 Kafka 环境的状态:
status
在上述命令的输出中,您应该看到源连接器在收到变更事件后创建的新主题:
... "topic": "Tutorial1.orders", ...
通过运行以下命令,确认新 Kafka 主题上的数据内容:
kc Tutorial1.orders
注意
kc
命令是一个帮助程序脚本,用于输出 Kafka 主题的内容。
运行上述命令时,您应该会看到以下 Kafka 主题数据,这些数据是按“Key”和“Value”部分组织的:
在输出的“Value”部分,您可以在以下格式化的 JSON 文档中找到包含突出显示的 fullDocument
数据的 payload
部分:
{ "_id": { "_data": "8262655A..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1650809557, "i": 2 } }, "wallTime": { "$date": "2022-10-13T17:06:23.409Z" }, "fullDocument": { "_id": { "$oid": "62655a..." }, "order_id": 1, "item": "coffee" }, "ns": { "db": "Tutorial1", "coll": "orders" }, "documentKey": { "_id": { "$oid": "62655a..." } } }
重新配置变更流
您可以将变更流配置为仅返回 fullDocument
字段,忽略变更流创建的事件中的元数据。
使用以下命令停止连接器:
del mongo-simple-source
注意
del
命令是一个帮助脚本,用于调用 Kafka Connect REST API 来停止连接器,相当于以下命令:
curl -X DELETE connect:8083/connectors/<parameter>
使用以下命令创建名为 simplesource.json
的源配置文件:
nano simplesource.json
删除现有配置,添加以下配置并保存文件:
{ "name": "mongo-simple-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "publish.full.document.only": true, "database": "Tutorial1", "collection": "orders" } }
在 shell 中运行以下命令,使用您更新的配置文件启动源连接器:
cx simplesource.json
使用以下命令通过 mongosh
连接 MongoDB:
mongosh "mongodb://mongo1"
在提示符处,键入以下命令以插入新文档:
use Tutorial1 db.orders.insertOne( { 'order_id' : 2, 'item' : 'oatmeal' } )
通过运行以下命令退出 mongosh
:
exit
通过运行以下命令,确认新 Kafka 主题上的数据内容:
kc Tutorial1.orders
“Value”文档的 payload
字段应仅包含以下文档数据:
{ "_id": { "$oid": "<your _id value>" }, "order_id": 2, "item": "oatmeal" }
(可选)停止 Docker 容器
完成本教程后,通过停止或删除 Docker 资产来释放计算机上的资源。您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们以重新启动 MongoDB Kafka Connector 开发环境,该环境大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。
提示
更多教程
如果您计划完成更多 MongoDB Kafka Connector 教程,请考虑仅删除容器。如果您不打算完成更多 MongoDB Kafka Connector 教程,请考虑删除容器和映像。
选择与要运行的删除任务对应的选项卡。
运行以下 shell 命令删除开发环境中的 Docker 容器和镜像:
docker-compose -p mongo-kafka down --rmi all
运行以下 shell 命令以删除 Docker 容器,但保留开发环境的映像:
docker-compose -p mongo-kafka down
要重新启动容器,请按照教程设置中启动容器所需的相同步骤进行操作。
总结
在本教程中,您使用不同的配置启动源连接器,更改发布到 Kafka 主题的变更流事件数据。
了解详情
阅读以下资源,详细了解本教程中提到的概念: