Docs 菜单
Docs 主页
/
MongoDB Kafka Connector

Kafka Connector 快速入门

在此页面上

  • 概述
  • 安装所需的软件包
  • 下载沙盒
  • 启动沙盒
  • 添加连接器
  • 添加源连接器
  • 添加接收器连接器
  • 通过 Connector 发送文档内容
  • 删除沙盒
  • 后续步骤

本指南介绍了如何配置 MongoDB Kafka Connector,以在 MongoDB 和 Apache Kafka 之间发送数据。

完成本指南后,您应该了解如何使用 Kafka Connect REST API 来配置 MongoDB Kafka Connector,以便从 MongoDB 读取数据并将其写入 Kafka 主题,以及从 Kafca 主题读取数据并将其写入 MongoDB。

要完成本指南中的步骤,您必须下载沙盒并在其中工作,沙盒是一种容器化开发环境,其中包含构建示例数据管道所需的服务。

阅读以下部分,设置沙盒和示例数据管道。

注意

完成本指南后,您可以按照 删除沙盒部分的说明删除环境。

下载并安装以下软件包:

提示

阅读 Docker 文档

本指南使用以下 Docker 特定的术语:

从 Docker 官方 入门指南了解有关 Docker 的更多信息。

沙箱使用 Docker 来实现便利性和一致性。 要了解有关 Apache Kafka 部署选项的更多信息,请参阅以下资源:

我们创建了一个沙箱,其中包含本教程中构建样本数据管道所需的服务。

要下载沙盒,请将教程存储库克隆到您的开发环境中。然后导航到快速入门教程对应的目录。如果使用 bash 或类似的 Shell,请使用以下命令:

git clone https://github.com/mongodb-university/kafka-edu.git
cd kafka-edu/docs-examples/mongodb-kafka-base/

沙箱在 Docker 容器中启动以下服务:

  • MongoDB,配置为副本集

  • Apache Kafka

  • 安装了 MongoDB Kafka Connector 的 Kafka Connect

  • 管理 Apache Kafka 配置的 Apache Zookeeper

要启动沙盒,请从教程目录运行以下命令:

docker-compose -p mongo-kafka up -d --force-recreate

在您启动沙箱时,Docker 下载它运行所需的任何映像。

注意

下载需要多长时间?

总的来说,本教程的 Docker 映像需要大约 2.4 GB 空间。以下列表显示了以不同网速下载这些映像所需的时间:

  • 每秒 40 兆比特:8 分钟

  • 20 兆位/秒:16 分钟

  • 10 兆位/秒:32 分钟

在 Docker 下载并构建这些映像后,您应该会在开发环境中看到以下输出:

...
Creating zookeeper ... done
Creating broker ... done
Creating schema-registry ... done
Creating connect ... done
Creating rest-proxy ... done
Creating mongo1 ... done
Creating mongo1-setup ... done

注意

端口映射

沙箱会将以下服务映射到主机上的端口:

  • 沙箱 MongoDB 服务器映射到主机上的 35001 端口

  • 沙箱 Kafka Connect JMX 服务器映射到主机上的端口 35000

这些端口必须空闲才能启动沙箱。

要完成样本数据管道,必须向 Kafka Connect 添加连接器,以在 Kafka Connect 和 MongoDB 之间传输数据。添加源连接器,将数据从 MongoDB 传输到 Apache Kafka。添加接收器连接器以将数据从 Apache Kafka 传输到 MongoDB。

要在沙箱中添加连接器,请首先使用以下命令在 Docker 容器中启动交互式 bash Shell:

docker exec -it mongo1 /bin/bash

在 Shell 会话启动后,您应该会看到以下提示:

MongoDB Kafka Connector Sandbox $

使用 Docker 容器中的 Shell 通过 Kafka Connect REST API 添加源连接器。

以下 API 请求会添加一个配置了以下属性的源连接器:

  • Kafka Connect 用于实例化连接器的类

  • 连接器从中读取数据的连接 URI、数据库和 MongoDB 副本集集合

  • 聚合管道,将值为 "MongoDB Kafka Connector" 的字段 travel 添加到 Connector 从 MongoDB 读取的插入文档

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-source",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"sampleData",
"pipeline":"[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
}
}
' \
http://connect:8083/connectors -w "\n"

注意

为什么我会看到“Failed to connect”(无法连接)消息?

Kafka Connect REST API 启动最多需要三分钟。如果收到以下错误,请等待三分钟,然后再次运行上述命令:

...
curl: (7) Failed to connect to connect port 8083: Connection refused

要确认添加了源连接器,请运行以下命令:

curl -X GET http://connect:8083/connectors

上述命令应输出正在运行的连接器的名称:

["mongo-source"]

要了解有关源连接器属性的更多信息,请参阅源连接器配置属性页面。

要了解有关聚合管道的更多信息,请参阅 MongoDB 手册中的聚合管道页面。

使用 Docker container 中的 shell 通过 Kafka Connect REST API 添加 sink connector。

以下 API 请求添加一个配置了以下属性的接收器连接器:

  • Kafka Connect 用于实例化连接器的类

  • 连接器写入数据的 MongoDB 副本集的连接 URI、数据库和集合

  • 该连接器从中读取数据的 Apache Kafka 主题

  • 用于 MongoDB 变更事件文档的变更数据捕获处理程序

curl -X POST \
-H "Content-Type: application/json" \
--data '
{"name": "mongo-sink",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongo1:27017/?replicaSet=rs0",
"database":"quickstart",
"collection":"topicData",
"topics":"quickstart.sampleData",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
}
}
' \
http://connect:8083/connectors -w "\n"

要确认已添加 Source Connector 和 Sink Connector,请运行以下命令:

curl -X GET http://connect:8083/connectors

上述命令应输出正在运行的连接器的名称:

["mongo-source", "mongo-sink"]

要了解有关接收器连接器属性的更多信息,请参阅接收器连接器配置属性页面。

要了解有关变更数据捕获事件的更多信息,请参阅变更数据捕获处理指南。

要通过连接器发送文档内容,请将文档插入到源连接器从中读取数据的 MongoDB 集合中。

要将新文档插入到集合中,请使用以下命令从 Docker 容器中的 Shell 进入 MongoDB Shell:

mongosh mongodb://mongo1:27017/?replicaSet=rs0

运行前面的命令后,应该会看到以下提示:

rs0 [primary] test>

在 MongoDB Shell 中,使用以下命令,将文档插入 quickstart 数据库的 sampleData 集合:

use quickstart
db.sampleData.insertOne({"hello":"world"})

在将文档插入到 sampleData 集合后,请确认连接器已处理该更改。使用以下命令检查 topicData 集合内容:

db.topicData.find()

您应该会看到如下所示的输出:

[
{
_id: ObjectId(...),
hello: 'world',
travel: 'MongoDB Kafka Connector'
}
]

运行以下命令以退出 MongoDB Shell:

exit

要节省开发环境中的资源,请删除沙盒。

在删除沙箱之前,运行以下命令以退出 Docker 容器中的 Shell 会话:

exit

您可以选择同时删除 Docker 容器和映像,或仅删除容器。如果删除容器和映像,则必须重新下载它们才能重新启动沙盒,其大小约为 2.4 GB。如果您仅删除容器,则可以重复使用图像,并避免下载样本数据管道中的大多数大文件。

选择与要运行的删除任务对应的选项卡。

运行以下 shell 命令以从沙箱中删除 Docker 容器和映像:

docker-compose -p mongo-kafka down --rmi all

运行以下 shell 命令以删除 Docker 容器,但保留沙盒的映像:

docker-compose -p mongo-kafka down

要了解如何安装 MongoDB Kafka Connector,请参阅安装 MongoDB Kafka Connector 指南。

要了解有关如何处理数据并将数据从 Apache Kafka 转移到 MongoDB 的更多信息,请参阅 Sink Connector 指南。

要了解有关如何处理数据并将数据从 MongoDB 移动到 Apache Kafka 的更多信息,请参阅源连接器指南。

后退

新增功能

来年

简介