このガイドを使用して、コミュニティが作成した Kafka Connect MongoDB Sink コネクタから 公式のMongoDB Kafka コネクタ にKafka配置を移行します。
次のセクションでは、MongoDB Kafka Sink Connector に移行するために Kafka Connect Sink Connector の構成設定とカスタム クラスに加える必要がある変更を一覧にしています。
注意
Atlas Stream Processing は、ストリーミングデータを継続的に処理し、スキーマを検証し、ビューを AtlasデータベースコレクションまたはApache Kafkaトピックに具体化するためのMongoDBネイティブ ツールを提供します。
Atlas Stream Processing の詳細については、「 Atlas Stream Processing 」を参照してください。
構成設定の更新
Kafka Connect 配置の構成設定を次のように変更してから、 MongoDB Kafka Connector配置で使用します。
パッケージ
at.grahsl.kafka.connect.mongodbを含む値をパッケージcom.mongodb.kafka.connectで置き換えます。connector.classの設定を MongoDB Kafka シンク コネクタ クラスに置き換えます。connector.class=com.mongodb.kafka.connect.MongoSinkConnector Kafka Connect プロパティ名から
mongodb.プレフィックスを削除します。 たとえば、mongodb.connection.uriをconnection.uriに変更します。document.id.strategies設定が存在する場合は、削除します。 この設定の値がカスタム戦略を参照する場合は、document.id.strategy設定に移動します。 カスタム クラスに加える必要がある変更については、「カスタム クラスのアップデート」セクションをお読みください。コネクタの Kafka トピック構成トピック プロパティで、
mongodb.collectionプレフィックスを含むトピックごとまたはコレクションごとのオーバーライドを指定するために使用するプロパティ名を同等のキーに置き換えます。
カスタム クラスの更新
Kafka Connect Sink Connector の配置でカスタム クラスを使用する場合は、MongoDB Kafka Connector の配置に追加する前に、それらに次の変更を加えてください。
at.grahsl.kafka.connect.mongodbを含むインポートをcom.mongodb.kafka.connectに置き換えます。MongoDbSinkConnectorクラスへの参照をMongoSinkConnectorクラスに置き換えます。カスタム シンク コネクタ戦略クラスを更新して
com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategyインターフェースを実装します。MongoDbSinkConnectorConfigクラスへの参照を更新します。 MongoDB Kafka Connectorでは、そのクラスのロジックは次のクラスに分割されます。
ポストプロセッサ サブクラスの更新
Kafka Connectコネクタの配置にポストプロセッサをサブクラスするクラスがある場合は、 Kafka ConnectPostProcessor クラス内のものをオーバーライドするメソッドを、 MongoDB Kafka コネクタ PostProcessorクラスのメソッド署名と一致するように更新します。