复制现有数据
这些用法示例演示了如何使用MongoDB Kafka源Connector将数据从MongoDB复制到Apache Kafka主题。
示例
以下示例展示如何配置源Connector以从单个集合或多个集合复制现有数据。
复制和筛选集合数据
假设您要将MongoDB集合复制到Apache Kafka并过滤某些数据。
要求和解决方案如下:
要求 | 解决方案 |
---|---|
将 MongoDB 部署中的 | See the Copy Data section of this guide. |
仅复制 | See the Filter Data section of this guide. |
customers
集合包含以下文档:
{ "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } } { "_id": 2, "country": "Iceland", "purchases": 8, "last_viewed": { "$date": "2015-07-20T10:00:00.135Z" } }
复制数据
通过在 Source 连接器中指定以下配置选项,复制 shopping
数据库的 customers
集合的内容:
database=shopping collection=customers startup.mode=copy_existing
源连接器通过创建用于描述将每个文档插入集合中的变更事件文档来复制集合。
注意
数据复制可以生成重复事件
如果任何系统在 Source 连接器从数据库转换现有数据时更改数据库中的数据,MongoDB 可能会生成重复的变更流事件以反映最新更改。由于数据复制所依赖的变更流事件是幂等的,因此复制的数据最终是一致的。
要了解有关变更事件文档的更多信息,请参阅变更流指南。
要了解有关 startup.mode
选项的更多信息,请参阅启动属性。
过滤数据
您可以通过在源连接器配置的 startup.mode.copy.existing.pipeline
选项中指定聚合管道来过滤数据。以下配置指定一个聚合管道,与 country
字段中包含“Mexico”的所有文档相匹配:
startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
要了解有关 startup.mode.copy.existing.pipeline
选项的更多信息,请参阅启动属性。
要了解有关聚合管道的更多信息,请参阅以下资源:
自定义管道以筛选更改事件使用示例
MongoDB 手册中的聚合。
指定配置
复制 customers
集合的最终源连接器配置应该如下所示:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping collection=customers startup.mode=copy_existing startup.mode.copy.existing.pipeline=[{ "$match": { "country": "Mexico" } }]
连接器复制数据后,您会看到与 shopping.customers
Apache Kafka 主题中的上述示例集合相对应的以下变更事件文档:
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "country": "Mexico", "purchases": 2, "last_viewed": { "$date": "2021-10-31T20:30:00.245Z" } }, "ns": { "db": "shopping", "coll": "customers" } }
从多个源复制数据
假设您要从 shopping
数据库中名为 products
的另一个集合复制数据,其中包含以下文档:
{ "_id": 1, "item_name": "lipstick", "department": "cosmetics", "quantity": 45 }
您可以使用 startup.mode.copy.existing.namespace.regex
配置设置从 customers
和 products
集合中复制,如以下代码所示:
connector.class=com.mongodb.kafka.connect.MongoSourceConnector connection.uri=<your production MongoDB connection uri> database=shopping startup.mode=copy_existing startup.mode.copy.existing.namespace.regex=^shopping\.(customers|products)$
shopping.customers
除了上一节所述的 Apache Kafka主题中的变更事件文档外,您还可以在shopping.products
主题中看到以下文档:
{ "_id": { "_id": 1, "copyingData": true }, "operationType": "insert", "documentKey": { "_id": 1 }, "fullDocument": { "_id": 1, "item_name": "lipstick", "department": "cosmetics", "quantity": 45 }, "ns": { "db": "shopping", "coll": "products" } }
提示
要学习;了解有关 startup.mode.copy.existing.namespace.regex
设置的更多信息,请参阅“启动属性”指南中的“设置”表。