此用法示例演示了如何配置管道以自定义 MongoDB Kafka connector 使用的数据。管道是 MongoDB 聚合管道,由发送给数据库的用于过滤或转换数据的指令组成。
MongoDB会通知Connector与变更流上的聚合管道相匹配的数据变更。 变更流是描述客户端实时MongoDB 部署所做的数据更改的一系列事件。 有关更多信息,请参阅MongoDB服务器手册中有关 Change Streams的条目。
例子
假设您正在协调一项事件,并希望收集参加特定事件的每位来宾的姓名和到达时间。 每当访客签入事件时,应用程序都会插入一份包含以下详细信息的新文档:
{   "_id": ObjectId(...),   "eventId": 321,   "name": "Dorothy Gale",   "arrivalTime": 2021-10-31T20:30:00.245Z } 
您可以定义您的connector pipeline设置,以指示change stream筛选事件信息,如下所示:
- 为插入操作创建变更事件,并为所有其他类型的操作创建省略事件。 
- 仅为与 - fullDocument.eventId值“321”匹配的文档创建更改事件,并忽略所有其他文档。
- 使用投影省略 - fullDocument对象中的- _id和- eventId字段。
要应用这些转换,请将以下聚合管道分配给您的pipeline设置:
pipeline=[{"$match": { "$and": [{"operationType": "insert"}, { "fullDocument.eventId": 321 }] } }, {"$project": { "fullDocument._id": 0, "fullDocument.eventId": 0 } } ] 
重要
确保管道结果包含payload对象的顶级_id字段,MongoDB 将其用作恢复令牌的值。
当应用程序插入样本文档时,配置的connector会将以下记录发布到你的Kafka主题:
{   ...   "payload": {     _id: { _data: ... },     "operationType": "insert",     "fullDocument": {       "name": "Dorothy Gale",       "arrivalTime": "2021-10-31T20:30:00.245Z",     },     "ns": { ... },     "documentKey": {       _id: {"$oid": ... }     }   } } 
有关使用源Connector管理变更流的更多信息,请参阅有关Change Streams 的Connector文档。