开始使用 MongoDB Kafka Sink Connector
跟随本教程操作,学习如何配置 MongoDB Kafka 接收器连接器,以从 Apache Kafka 主题读取数据并将其写入 MongoDB 集合。
开始使用 MongoDB Kafka Sink Connector
完成教程设置
完成 Kafka Connector 教程设置中的步骤,启动 Confluence Kafka Connect 和 MongoDB 环境。
配置 Sink Connector
使用以下命令在教程 Docker 容器上创建交互式 Shell 会话:
docker exec -it mongo1 /bin/bash
使用以下命令创建名为 simplesink.json
的源配置文件:
nano simplesink.json
将以下配置信息粘贴到文件中,然后保存更改:
{ "name": "mongo-tutorial-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "Tutorial2.pets", "connection.uri": "mongodb://mongo1", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": false, "database": "Tutorial2", "collection": "pets" } }
注意
配置属性中突出显示的行指定了转换器,用于指示连接器如何转换来自 Kafka 的数据。
在 shell 中运行以下命令,使用您创建的配置文件启动 Sink Connector:
cx simplesink.json
注意
cx
命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:
curl -X POST -H "Content-Type: application/json" -d @simplesink.json http://connect:8083/connectors -w "\n"
在 shell 中运行以下命令检查连接器的状态:
status
如果 Sink Connector 成功启动,应会看到以下输出:
Kafka topics: ... The status of the connectors: sink | mongo-tutorial-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector Currently configured connectors [ "mongo-tutorial-sink" ] ...
将数据写入 Kafka 主题
在同一 Shell 中,创建一个 Python 脚本以将数据写入到 Kafka 主题。
nano kafkawrite.py
将以下代码粘贴到文件中,然后保存更改:
from kafka import KafkaProducer import json from json import dumps p = KafkaProducer(bootstrap_servers = ['broker:29092'], value_serializer = lambda x:dumps(x).encode('utf-8')) data = {'name': 'roscoe'} p.send('Tutorial2.pets', value = data) p.flush()
运行 Python 脚本:
python3 kafkawrite.py
查看 MongoDB 集合中的数据
在同一 Shell 中,通过运行以下命令,使用 mongosh
(MongoDB Shell) 连接到 MongoDB:
mongosh "mongodb://mongo1"
连接成功后,您应看到以下 MongoDB shell 提示:
rs0 [direct: primary] test>
在提示符处,键入以下命令以检索 Tutorial2.pets
MongoDB 命名空间中的所有文档:
use Tutorial2 db.pets.find()
您应该会在结果中看到以下文档:
{ _id: ObjectId("62659..."), name: 'roscoe' }
输入 exit
命令以退出 MongoDB Shell。
(可选)停止 Docker 容器
完成本教程后,通过停止或删除 Docker 资产来释放计算机上的资源。您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们以重新启动 MongoDB Kafka Connector 开发环境,该环境大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。
提示
更多教程
如果您计划完成更多 MongoDB Kafka Connector 教程,请考虑仅删除容器。如果您不打算完成更多 MongoDB Kafka Connector 教程,请考虑删除容器和映像。
选择与要运行的删除任务对应的选项卡。
运行以下 shell 命令删除开发环境中的 Docker 容器和镜像:
docker-compose -p mongo-kafka down --rmi all
运行以下 shell 命令以删除 Docker 容器,但保留开发环境的映像:
docker-compose -p mongo-kafka down
要重新启动容器,请按照教程设置中启动容器所需的相同步骤进行操作。
总结
在本教程中,您配置了一个接收器连接器,以将数据从 Kafka 主题保存到 MongoDB 集群中的集合。
了解详情
阅读以下资源,详细了解本教程中提到的概念: