使用变更数据捕获处理程序复制数据
在此页面上
Overview
跟随本教程学习如何使用变更数据捕获 (CDC) 处理程序通过 MongoDB Kafka Connector 复制数据。 CDC 处理程序是将 CDC 事件转换为 MongoDB 写入操作的应用程序。 当必须将一个数据存储中的更改复制到另一个数据存储中时,请使用 CDC 处理程序。
本教程中,您将配置并运行 MongoDB Kafka 源连接器和接收连接器,以便使用 CDC 使两个 MongoDB 集合包含相同的文档。源连接器将原始集合中的变更流数据写入 Kafka 主题,接收连接器将 Kafka 主题数据写入目标 MongoDB 集合。
如果您想详细了解 CDC 处理程序的工作原理,请参阅变更数据捕获处理程序指南。
使用 CDC 处理器复制数据
完成教程设置
完成 Kafka Connector 教程设置中的步骤,以启动 Confluent Kafka Connect 和 MongoDB 环境。
启动交互式 shell
在不同窗口中的 Docker 容器上启动两个交互式 Shell。在本教程中,您可以使用 Shell 来运行和观察不同任务。
从终端运行以下命令,启动交互式 Shell。
docker exec -it mongo1 /bin/bash
在本教程中,我们将此交互式 shell 称为 CDCShell1。
在第二个终端运行以下命令,启动交互式 Shell:
docker exec -it mongo1 /bin/bash
在本教程中,我们将把这个交互式 Shell 称为 CDCShell2。
请将屏幕上的两个窗口排列好,以便同时查看实时更新。
使用 CDCShell1 配置连接器并监控 Kafka 主题。使用 CDCShell2 在 MongoDB 中执行写操作。
配置 Source Connector
在 CDCShell1 中,配置一个源连接器,从 CDCTutorial.Source
MongoDB 命名空间读取数据并写入 CDCTutorial.Source
Kafka 话题。
使用以下命令创建名为 cdc-source.json
的配置文件:
nano cdc-source.json
将以下配置信息粘贴到文件中,然后保存更改:
{ "name": "mongo-cdc-source", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Source" } }
在 CDCShell1 中运行以下命令,使用您创建的配置文件启动源连接器:
cx cdc-source.json
注意
cx
命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:
curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"
在 shell 中运行以下命令检查连接器的状态:
status
如果 Source 连接器成功启动,应会看到以下输出:
Kafka topics: ... The status of the connectors: source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-source" ] ...
配置 Sink Connector
在 CDCShell1 中,配置接收器连接器以将数据从 CDCTutorial.Source
Kafka 主题复制到 CDCTutorial.Destination
MongoDB 命名空间。
使用以下命令创建名为 cdc-sink.json
的配置文件:
nano cdc-sink.json
将以下配置信息粘贴到文件中,然后保存更改:
{ "name": "mongo-cdc-sink", "config": { "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", "topics": "CDCTutorial.Source", "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler", "connection.uri": "mongodb://mongo1", "database": "CDCTutorial", "collection": "Destination" } }
在 shell 中运行以下命令,使用您创建的配置文件启动 Sink Connector:
cx cdc-sink.json
在 shell 中运行以下命令检查连接器的状态:
status
如果 Sink Connector 成功启动,应会看到以下输出:
Kafka topics: ... The status of the connectors: sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector Currently configured connectors [ "mongo-cdc-sink" "mongo-cdc-source" ] ...
将数据写入源数据库并观察数据流
在 CDCShell2 中,通过运行以下命令,使用 mongosh
(MongoDB Shell) 连接到 MongoDB:
mongosh "mongodb://mongo1"
连接成功后,您应看到以下 MongoDB shell 提示:
rs0 [direct: primary] test>
在提示符处,键入以下命令以将新文档插入到CDCTutorial.Source
MongoDB 命名空间中:
use CDCTutorial db.Source.insertOne({ proclaim: "Hello World!" });
MongoDB 完成插入命令后,您应该会收到类似于以下文本的确认:
{ acknowledged: true, insertedId: ObjectId("600b38ad...") }
源连接器接收更改并将其发布到 Kafka 主题。您应该会在 CDCShell1 窗口中看到以下主题信息:
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8260..." }, "operationType": "insert", "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } }, "wallTime": { "$date": "..." }, "fullDocument": { "_id": { "$oid": "600b38ad..." }, "proclaim": "Hello World!" }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "600b38a..." } } } }
Sink 连接器接收 Kafka 消息并将数据汇入 MongoDB。您可以通过在 CDCShell2 中启动的 MongoDB shell 中运行以下命令,从 MongoDB 中的 CDCTutorial.Destination
命名空间检索文档:
db.Destination.find()
您应该会在结果中看到以下文档:
[ { _id: ObjectId("600b38a..."), proclaim: 'Hello World' } ]
(可选)生成其他更改
尝试通过从 MongoDB Shell 运行以下命令来从 CDCTutorial.Source
命名空间中删除文档:
db.Source.deleteMany({})
您应该会在 CDCShell1 窗口中看到以下主题信息:
{ "schema": { "type": "string", "optional": false }, "payload": { "_id": { "_data": "8261...." }, ... "operationType": "delete", "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } }, "ns": { "db": "CDCTutorial", "coll": "Source" }, "documentKey": { "_id": { "$oid": "6138..." } } } }
运行以下命令以检索集合中的当前文档数:
db.Destination.count()
此操作会返回以下输出,表明集合为空:
0
运行以下命令退出 MongoDB shell:
exit
总结
在本教程中,您将设置一个源连接器来捕获对 MongoDB 集合的更改并将其发送到 Apache Kafka。您还使用 MongoDB CDC 处理程序配置了接收器连接器,以将数据从 Apache Kafka 移动到 MongoDB 集合。
了解详情
阅读以下资源,详细了解本教程中提到的概念: