Docs 菜单
Docs 主页
/
MongoDB Kafka Connector
/

将现有集合迁移到时间序列集合

在此页面上

  • 将集合迁移到时间序列集合
  • 总结
  • 了解详情

按照本教程学习如何使用 MongoDB Kafka Connector 将现有 MongoDB 集合转换为时间序列集合

时间序列集合可有效存储时间序列数据。 时间序列数据由按时间间隔进行的测量、描述测量的元数据以及测量时间组成。

要使用 将数据从MongoDB connector集合转换为时间序列集合,您必须执行以下任务:

  1. 标识collection中所有文档共有的时间字段。

  2. 配置connector以将现有collection数据复制到 Kafka 主题。

  3. 配置connector以将 Kafka 主题数据复制到time-series collection。

在本教程中,您将执行上述任务,将股票数据从集合迁移到时间序列集合。 time-series collection 更有效地存储和索引数据,并保留使用聚合操作符分析一段时间内股票表现的能力。

1

完成 Kafka Connector 教程设置中的步骤,启动 Confluence Kafka Connect 和 MongoDB 环境。

2

运行以下命令,在 Docker 环境中启动脚本,生成样本collection,其中包含教程 MongoDB 副本集中的虚假股票代码及其价格:

docker exec -ti mongo1 /bin/bash -c "cd /stockgenmongo/ && python3 stockgen.py -db Stocks -col PriceData"

数据生成器开始运行后,您应该会看到生成的数据如下所示:

...
1 _id=528e9... MSP MASSIVE SUBMARINE PARTNERS traded at 31.08 2022-05-25 21:15:15
2 _id=528e9... RWH RESPONSIVE_WHOLESALER HOLDINGS traded at 18.42 2022-05-25 21:15:15
3 _id=528e9... FAV FUZZY ATTACK VENTURES traded at 31.08 2022-05-25 21:15:15
...
3

在另一个窗口中,使用以下命令在为教程设置下载的教程 Docker container 上创建交互式 shell 会话:

docker exec -it mongo1 /bin/bash

使用以下命令创建名为 stock-source.json 的源配置文件:

nano stock-source.json

将以下配置信息粘贴到文件中,然后保存更改:

{
"name": "mongo-source-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"publish.full.document.only": "true",
"connection.uri": "mongodb://mongo1",
"topic.prefix": "marketdata",
"database": "Stocks",
"collection": "PriceData",
"copy.existing": "true"
}
}

此配置指示connector将现有数据从PriceData MongoDB collection复制到marketdata.Stocks.PriceData Kafka 主题,并在完成后将任何未来数据插入该collection中。

在 shell 中运行以下命令,使用您创建的配置文件启动源连接器:

cx stock-source.json

注意

cx 命令是教程开发环境中包含的自定义脚本。此脚本对 Kafka Connect REST API 运行以下等效请求以创建新连接器:

curl -X POST -H "Content-Type: application/json" -d @stock-source.json http://connect:8083/connectors -w "\n"

在 shell 中运行以下命令检查连接器的状态:

status

如果 Source 连接器成功启动,应会看到以下输出:

Kafka topics:
...
The status of the connectors:
source | mongo-source-marketdata | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector
Currently configured connectors
[
"mongo-source-marketdata"
]
...

源连接器启动后,运行以下命令以确认 Kafka 主题已收到集合数据:

kafkacat -b broker:29092 -C -t marketdata.Stocks.PriceData

输出应显示源连接器发布的主题数据,如下所示:

{"schema":{ ... }, "payload": "{ "_id": { "$oid": "628e9..."}, "company_symbol": "MSP", "Company_name": "MASSIVE SUBMARINE PARTNERS", "price": 309.98, "tx_time": { "$date": 16535..." }"}

您可以通过键入 CTRL+C退出kafkacat }。

4

配置connector以从Kafka主题读取数据,并将其写入名为Stocks的数据库中名为StockDataMigrate的时间序列集合。

使用以下命令创建名为stock-sink.json的接收器配置文件:

nano stock-sink.json

将以下配置信息粘贴到文件中,然后保存更改:

{
"name": "mongo-sink-marketdata",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "marketdata.Stocks.PriceData",
"connection.uri": "mongodb://mongo1",
"database": "Stocks",
"collection": "StockDataMigrate",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"timeseries.timefield": "tx_time",
"timeseries.timefield.auto.convert": "true",
"timeseries.timefield.auto.convert.date.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
}
}

提示

上面的connector配置使用时间字段日期格式转换器。或者,您可以使用TimestampConverter单一消息转换 (SMT) 将tx_time字段从String转换为ISODate 。 使用TimestampConverter SMT 时,您必须为 Kafka 主题中的数据定义模式。

有关如何使用TimestampConverter SMT 的信息,请参阅 TimestampConverter Confluent 文档。

在 shell 中运行以下命令,使用更新的配置文件启动 sink connector:

cx stock-sink.json

接收器connector完成对主题数据的处理后,StockDataMigrate time-series collection中的文档将包含具有tx_time ISODate类型值的field。

5

接收器连接器完成对主题数据的处理后, StockDataMigrate时间序列集合应包含PriceData集合中的所有市场数据。

要查看 MongoDB 中的数据,请运行以下命令以使用mongosh连接到副本集:

mongosh "mongodb://mongo1"

在提示符处,键入以下命令以检索 Stocks.StockDataMigrate MongoDB 命名空间中的所有文档:

use Stocks
db.StockDataMigrate.find()

您应该会看到该命令返回的文档列表,类似于以下文档:

{
tx_time: ISODate("2022-05-25T21:16:35.983Z"),
_id: ObjectId("628e9..."),
symbol: 'FAV',
price: 18.43,
company_name: 'FUZZY ATTACK VENTURES'
}

在本教程中,您创建了一个股票报价机数据生成器,用于定期将数据写入 MongoDB collection。您配置了源连接器以将数据复制到 Kafka 主题中,并配置了接收器连接器以将该数据写入新的 MongoDB 时间序列集合。

阅读以下资源,详细了解本教程中提到的概念:

后退

使用变更数据捕获处理程序复制数据