从 Kafka Connect 迁移 MongoDB
使用本指南从社区创建的Kafka KafkaConnect 迁移您的MongoDB 部署 Sinkconnector 到 官方MongoDBKafka Connector 。
以下部分列出了必须对 Kafka Connect connector 配置设置和自定义类进行的更改,以转换到 MongoDB Kafka connector。
更新配置设置
对 Kafka Connect 部署的配置设置进行以下更改,然后再将其用于 MongoDB Kafka Connector 部署:
将包含包
at.grahsl.kafka.connect.mongodb
的值替换为com.mongodb.kafka.connect
包。将
connector.class
设置替换为 MongoDB Kafka connector 类。connector.class=com.mongodb.kafka.connect.MongoSinkConnector 从 Kafka Connect 属性名称中删除
mongodb.
前缀。 例如,将mongodb.connection.uri
更改为connection.uri
。删除
document.id.strategies
设置(如果存在)。 如果此设置的值引用自定义策略,请将其移至document.id.strategy
设置。 阅读“更新自定义类”部分,了解必须对自定义类进行哪些更改。将用于指定按主题或集合覆盖的任何包含
mongodb.collection
前缀的属性名称替换为接收器连接器 Kafka 主题配置主题属性中的等效键。
更新自定义类
如果在 Kafka Connect Sink Connector 部署中使用任何自定义类,请在将它们添加到 MongoDB Kafka Connector 部署之前对它们进行以下更改:
将包含
at.grahsl.kafka.connect.mongodb
的导入替换为com.mongodb.kafka.connect
。将对
MongoDbSinkConnector
类的引用替换为MongoSinkConnector
类。更新自定义sink connector策略类以实施
com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy
接口。更新对
MongoDbSinkConnectorConfig
类的引用。 在 MongoDB Kafka Connector 中,该类的逻辑被分割为以下类:
更新帖子处理器子类
如果您的类在Kafka Connectconnector 部署中对后处理器进行了子类化,请更新覆盖Kafka ConnectPostProcessor
类中方法的方法,以匹配MongoDBKafka Connector PostProcessor 类的方法签名。