Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/

使用变更数据捕获处理程序复制数据

在此页面上

  • Overview
  • 使用 CDC 处理器复制数据
  • 完成教程设置
  • 启动交互式 shell
  • 配置 Source Connector
  • 配置 Sink Connector
  • 监控“Kafka 主题”
  • 将数据写入源数据库并观察数据流
  • (可选)生成其他更改
  • 总结
  • 了解详情

跟随本教程学习如何使用变更数据捕获 (CDC) 处理程序通过 MongoDB Kafka Connector 复制数据。 CDC 处理程序是将 CDC 事件转换为 MongoDB 写入操作的应用程序。 当必须将一个数据存储中的更改复制到另一个数据存储中时,请使用 CDC 处理程序。

本教程中,您将配置并运行 MongoDB Kafka 源连接器和接收连接器,以便使用 CDC 使两个 MongoDB 集合包含相同的文档。源连接器将原始集合中的变更流数据写入 Kafka 主题,接收连接器将 Kafka 主题数据写入目标 MongoDB 集合。

如果您想详细了解 CDC 处理程序的工作原理,请参阅变更数据捕获处理程序指南。

1

完成 Kafka Connector 教程设置中的步骤,以启动 Confluent Kafka Connect 和 MongoDB 环境。

2

在不同窗口中的 Docker 容器上启动两个交互式 Shell。在本教程中,您可以使用 Shell 来运行和观察不同任务。

从终端运行以下命令,启动交互式 Shell。

docker exec -it mongo1 /bin/bash

在本教程中,我们将此交互式 shell 称为 CDCShell1

在第二个终端运行以下命令,启动交互式 Shell:

docker exec -it mongo1 /bin/bash

在本教程中,我们将把这个交互式 Shell 称为 CDCShell2

请将屏幕上的两个窗口排列好,以便同时查看实时更新。

使用 CDCShell1 配置连接器并监控 Kafka 主题。使用 CDCShell2 在 MongoDB 中执行写操作。

3

CDCShell1 中,配置一个源连接器,从 CDCTutorial.Source MongoDB 命名空间读取数据并写入 CDCTutorial.Source Kafka 话题。

使用以下命令创建名为 cdc-source.json 的配置文件:

nano cdc-source.json

将以下配置信息粘贴到文件中,然后保存更改:

{
"name": "mongo-cdc-source",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Source"
}
}

CDCShell1 中运行以下命令,使用您创建的配置文件启动源连接器:

cx cdc-source.json

注意

cx 命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:

curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"

在 shell 中运行以下命令检查连接器的状态:

status

如果 Source 连接器成功启动,应会看到以下输出:

Kafka topics:
...
The status of the connectors:
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-source"
]
...
4

CDCShell1 中,配置接收器连接器以将数据从 CDCTutorial.Source Kafka 主题复制到 CDCTutorial.Destination MongoDB 命名空间。

使用以下命令创建名为 cdc-sink.json 的配置文件:

nano cdc-sink.json

将以下配置信息粘贴到文件中,然后保存更改:

{
"name": "mongo-cdc-sink",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "CDCTutorial.Source",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
"connection.uri": "mongodb://mongo1",
"database": "CDCTutorial",
"collection": "Destination"
}
}

在 shell 中运行以下命令,使用您创建的配置文件启动 Sink Connector:

cx cdc-sink.json

在 shell 中运行以下命令检查连接器的状态:

status

如果 Sink Connector 成功启动,应会看到以下输出:

Kafka topics:
...
The status of the connectors:
sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-cdc-sink"
"mongo-cdc-source"
]
...
5

CDCShell1 中,监控 Kafka 主题是否有传入事件。运行以下命令启动 kafkacat 应用程序,将输出发布到主题的数据:

kc CDCTutorial.Source

注意

kc 命令是教程开发环境中包含的自定义脚本,调用 kafkacat 应用程序的选项来连接到 Kafka 并格式化指定主题的输出。

启动后,您将看到以下输出,显示当前没有数据可读取:

% Reached end of topic CDCTutorial.Source [0] at offset 0
6

CDCShell2 中,通过运行以下命令,使用 mongosh (MongoDB Shell) 连接到 MongoDB:

mongosh "mongodb://mongo1"

连接成功后,您应看到以下 MongoDB shell 提示:

rs0 [direct: primary] test>

在提示符处,键入以下命令以将新文档插入到CDCTutorial.Source MongoDB 命名空间中:

use CDCTutorial
db.Source.insertOne({ proclaim: "Hello World!" });

MongoDB 完成插入命令后,您应该会收到类似于以下文本的确认:

{
acknowledged: true,
insertedId: ObjectId("600b38ad...")
}

源连接器接收更改并将其发布到 Kafka 主题。您应该会在 CDCShell1 窗口中看到以下主题信息:

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8260..." },
"operationType": "insert",
"clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
"wallTime": { "$date": "..." },
"fullDocument": {
"_id": { "$oid": "600b38ad..." },
"proclaim": "Hello World!"
},
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "600b38a..." } }
}
}

Sink 连接器接收 Kafka 消息并将数据汇入 MongoDB。您可以通过在 CDCShell2 中启动的 MongoDB shell 中运行以下命令,从 MongoDB 中的 CDCTutorial.Destination 命名空间检索文档:

db.Destination.find()

您应该会在结果中看到以下文档:

[
{
_id: ObjectId("600b38a..."),
proclaim: 'Hello World'
}
]
7

尝试通过从 MongoDB Shell 运行以下命令来从 CDCTutorial.Source 命名空间中删除文档:

db.Source.deleteMany({})

您应该会在 CDCShell1 窗口中看到以下主题信息:

{
"schema": { "type": "string", "optional": false },
"payload": {
"_id": { "_data": "8261...." },
...
"operationType": "delete",
"clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
"ns": { "db": "CDCTutorial", "coll": "Source" },
"documentKey": { "_id": { "$oid": "6138..." } }
}
}

运行以下命令以检索集合中的当前文档数:

db.Destination.count()

此操作会返回以下输出,表明集合为空:

0

运行以下命令退出 MongoDB shell:

exit

在本教程中,您将设置一个源连接器来捕获对 MongoDB 集合的更改并将其发送到 Apache Kafka。您还使用 MongoDB CDC 处理程序配置了接收器连接器,以将数据从 Apache Kafka 移动到 MongoDB 集合。

阅读以下资源,详细了解本教程中提到的概念:

后退

开始使用 MongoDB Kafka Sink Connector