Docs 菜单
Docs 主页
/
Kafka Connector
/ /

复制现有数据

这些用法示例演示了如何使用MongoDB Kafka源Connector将数据从MongoDB复制到Apache Kafka主题。

以下示例展示如何配置源Connector以从单个集合或多个集合复制现有数据。

假设您要将MongoDB集合复制到Apache Kafka并过滤某些数据。

要求和解决方案如下:

要求
解决方案

将 MongoDB 部署中的 shopping 数据库的 customers 集合复制到 Apache Kafka 主题上。

See the Copy Data section of this guide.

仅复制 country 字段中值为“Mexico”的文档。

See the Filter Data section of this guide.

customers 集合包含以下文档:

{
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
}
{
"_id": 2,
"country": "Iceland",
"purchases": 8,
"last_viewed": { "$date": "2015-07-20T10:00:00.135Z" }
}

通过在 Source 连接器中指定以下配置选项,复制 shopping 数据库的 customers 集合的内容:

database=shopping
collection=customers
startup.mode=copy_existing

源连接器通过创建用于描述将每个文档插入集合中的变更事件文档来复制集合。

注意

数据复制可以生成重复事件

如果任何系统在 Source 连接器从数据库转换现有数据时更改数据库中的数据,MongoDB 可能会生成重复的变更流事件以反映最新更改。由于数据复制所依赖的变更流事件是幂等的,因此复制的数据最终是一致的。

要了解有关变更事件文档的更多信息,请参阅变更流指南。

要了解有关 startup.mode 选项的更多信息,请参阅启动属性

您可以通过在源连接器配置的 startup.mode.copy.existing.pipeline 选项中指定聚合管道来过滤数据。以下配置指定一个聚合管道,与 country 字段中包含“Mexico”的所有文档相匹配:

startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

要了解有关 startup.mode.copy.existing.pipeline 选项的更多信息,请参阅启动属性

要了解有关聚合管道的更多信息,请参阅以下资源:

  • 自定义管道以筛选更改事件使用示例

  • MongoDB 手册中的聚合

复制 customers 集合的最终源连接器配置应该如下所示:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
collection=customers
startup.mode=copy_existing
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]

连接器复制数据后,您会看到与 shopping.customers Apache Kafka 主题中的上述示例集合相对应的以下变更事件文档:

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"country": "Mexico",
"purchases": 2,
"last_viewed": { "$date": "2021-10-31T20:30:00.245Z" }
},
"ns": { "db": "shopping", "coll": "customers" }
}

注意

将主题中的数据写入集合

使用变更数据捕获处理程序将 Apache Kafka 主题中的变更事件文档转换为 MongoDB 写入操作。要了解更多信息,请参阅变更数据捕获处理程序指南。

假设您要从 shopping数据库中名为 products 的另一个集合复制数据,其中包含以下文档:

{
"_id": 1,
"item_name": "lipstick",
"department": "cosmetics",
"quantity": 45
}

您可以使用 startup.mode.copy.existing.namespace.regex 配置设置从 customersproducts 集合中复制,如以下代码所示:

connector.class=com.mongodb.kafka.connect.MongoSourceConnector
connection.uri=<your production MongoDB connection uri>
database=shopping
startup.mode=copy_existing
startup.mode.copy.existing.namespace.regex=^shopping\.(customers|products)$

shopping.customers除了上一节所述的 Apache Kafka主题中的变更事件文档外,您还可以在shopping.products 主题中看到以下文档:

{
"_id": { "_id": 1, "copyingData": true },
"operationType": "insert",
"documentKey": { "_id": 1 },
"fullDocument": {
"_id": 1,
"item_name": "lipstick",
"department": "cosmetics",
"quantity": 45
},
"ns": { "db": "shopping", "coll": "products" }
}

提示

要学习;了解有关 startup.mode.copy.existing.namespace.regex 设置的更多信息,请参阅“启动属性”指南中的“设置”表

后退

主题命名

在此页面上