Overview
このページでは、MongoDB Kafka シンク コネクタでポストプロセッサを構成する方法を学習できます。 ポストプロセッサは、コネクタが MongoDB コレクションに保存する前に、コネクタが Kafka トピックから読み取った Sink レコードを変更します。 ポストプロセッサが行うことができるデータ変更のいくつかの例には、次のようなものがあります。
- ドキュメント - _idフィールドをカスタム値に設定
- メッセージのキー フィールドまたは値フィールドを含める、または除外する 
- フィールドの名前を変更する 
コネクターに含まれる事前に構築されたポストプロセッサを使用するか、独自の実装を使用できます。
ポストプロセッサの詳細については、次のセクションを参照してください。
ポストプロセッサによるデータの変更方法
ポストプロセッサは、 Kafka トピックから読み取られたデータを変更します。 connectorは、 Kafka SinkRecordのキーと値のフィールドの表現を含むSinkDocumentクラスにメッセージを保存します。 コネクタは、構成で指定されたすべてのポストプロセッサを順番に適用し、その結果を MongoDB コレクションに保存します。
ポストプロセッサは、ドキュメント_idフィールドの生成、メッセージ キーまたは値 フィールドのプロジェクション、フィールドの名前変更など、データ変更タスクを実行します。コネクタに含まれる事前構築済みのポストプロセッサを使用することも、PostProcessorクラスを拡張して独自の実装を実装することもできます。
重要
ポストプロセッサと変更データキャプチャ(CDC)ハンドラー
CDC ハンドラーイベント データに ポストプロセッサ を適用することはできません。 両方を指定すると、コネクタは警告をログに記録します。
ポストプロセッサの指定方法
post.processor.chain構成設定では、1 つ以上のポストプロセッサをカンマ区切りのリストとして指定できます。 複数を指定すると、コネクタはそれらを順番に適用し、各ポストプロセッサは前のもののデータ出力を変更します。
connector が MongoDB に書き込むドキュメントに一意の_idフィールドが含まれるようにするには、DocumentIdAdder ポストプロセッサを含めない場合、チェーンの最初の位置にポストプロセッサが自動的に追加されます。
次の設定例では、コネクタが最初にKafkaMetaAdderポストプロセッサを実行し、次にAllowListValueProjectorポストプロセッサを出力で実行することを指定します。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector 
事前に構築されたポストプロセッサ
次の表には、Sink Connector に含まれるすべてのポストプロセッサのリストが含まれています。
| プロセッサ名の変更 | 説明 | |
|---|---|---|
| DocumentIdAdder | Full Path: Inserts an  _idfield determined by the configured strategy.The default strategy is  BsonOidStrategy.For information on strategy options and configuration, see the
Configure the Document Id Adder Post Processor
section. | |
| BlockListKeyProjector | Full Path: Removes matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
| BlockListValueProjector | Full Path: Removes matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
| AllowListKeyProjector | Full Path: Includes only matching key fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
| AllowListValueProjector | Full Path: Includes only matching value fields from the sink record. For more information on configuration, see the
Allow List and Block List Examples. | |
| KafkaMetaAdder | Full Path: Adds a field named "topic-partition-offset" and sets the value
to the concatenation of Kafka topic, partition, and offset to the
document. | |
| RenameByMapping | Full Path: Renames fields that are an exact match to a specified field name in
the key or value document. For information on configuration, see the
Renaming by Mapping Example. | |
| RenameByRegex | Full Path: Renames fields that match a regular expression in the key or
value document. For information on configuration, see the
Renaming by Regular Expression Example. | |
| NullFieldValueRemover | Full Path: Removes all document fields that contain  nullvalues from the sink record. | 
ドキュメント ID 追加用ポストプロセッサの構成
DocumentIdAdderポストプロセッサは、戦略を使用して、MongoDB ドキュメントの_idフィールドの形式方法を決定します。 戦略は、ユースケースに合わせてカスタマイズできる事前設定された動作を定義します。
次の例に示すように、 document.id.strategy設定でこのポストプロセッサの戦略を指定できます。
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy 
次の表は、 DocumentIdAdderポストプロセッサを構成するために使用できる戦略の一覧を示しています。
| 戦略名 | 説明 | |
|---|---|---|
| BsonOidStrategy | Full Path: Generates a MongoDB BSON ObjectId. Default strategy for the  DocumentIdAdderpost processor. | |
| KafkaMetaDataStrategy | Full Path: Builds a string composed of the concatenation of Kafka topic,
partition, and offset. | |
| FullKeyStrategy | Full Path: Uses the complete key structure of the sink document to generate the
value for the  _idfield.Defaults to a blank document if no key exists. | |
| ProvidedInKeyStrategy | Full Path: Uses the  _idfield specified in the key structure of the sink
document.Throws an exception if the field is missing from the sink document. | |
| ProvidedInValueStrategy | Full Path: Uses the  _idfield specified in the value structure of the
sink document.Throws an exception if the field is missing from the sink document. | |
| PartialKeyStrategy | Full Path: Uses a block list or allow list projection of the sink document key
structure. Defaults to a blank document if no key exists. | |
| PartialValueStrategy | Full Path: Uses a block list or allow list projection of the sink document
value structure. Defaults to a blank document if no value exists. | |
| UuidProvidedInKeyStrategy | Full Path: Converts the  _idkey field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
| UuidProvidedInValueStrategy | Full Path: Converts the  _idvalue field to a UUID. The value must be either a
string or binary type and must conform to the
UUID format. | |
| UuidStrategy | Full Path: Uses a randomly generated UUID in string format. | 
カスタムドキュメント ID 戦略の作成
組み込みのドキュメント ID 追加戦略でユースケースをカバーしない場合は、以下の手順に従ってカスタムのドキュメント ID 戦略を定義できます。
- インターフェース IdStrategy を実装し、カスタム構成ロジックを含むJavaクラスを作成します。 
- クラスを JAR ファイルにコンパイルします。 
- コンパイルされた JAR を、すべてのKafkaワーカーのクラスパスまたはプラグインパスに追加します。プラグインパスの詳細については、 Confluent のドキュメントを参照してください。 
- すべての Kafka ワーカーのカスタム クラスの完全なクラス名に - document.id.strategy設定を更新します。
注意
選択した戦略は提供セマンティクスに影響を与える可能性があります
BSON ObjectId または UUID 戦略では、コネクタが再試行またはレコードを再度処理するときに新しい ID を生成するため、少なくとも 1 回の配信のみを保証できます。 他の戦略では、ドキュメント ID を構成するフィールドが一意であることを保証できる場合は、1 回限りの配信が許可されます。
IdStrategy インターフェースの実装例については、 コネクタにパッケージ化された ID 戦略の実装 を含むソースコードディレクトリを参照してください。
ポストプロセッサの例
このセクションでは、次のタイプのポストプロセッサの構成例とサンプル出力を示します。
許可リストとブロック リストの例
許可リストとブロックリストのプロジェクションのポストプロセッサは、出力に含めるフィールドと除外するフィールドを決定します。
許可リストプロジェクションを使用する場合、ポストプロセッサは指定されたフィールドからのデータのみを出力します。
ブロックリストプロジェクションを使用する場合、指定されたフィールドのデータのみが後処理では省略されます。
注意
を使用できます。 レコード内のネストされたフィールドを参照するための(ドット)表記。 表記を使用して、配列内のドキュメントのフィールドを参照することもできます。
ポストプロセッサ チェーンにプロジェクションを追加する場合は、プロジェクションのタイプと、Sink ドキュメントのキーまたは値の部分にそれを適用するかどうかを指定する必要があります。
プロジェクションの構成と出力の例については、次のセクションを参照してください。
許可リストプロジェクションの例
Kafka レコード値ドキュメントが次のユーザー プロファイル データに似ているとします。
{   "name": "Sally Kimball",   "age": 10,   "address": {     "city": "Idaville",     "country": "USA"   },   "hobbies": [     "reading",     "solving crime"   ] } 
次の設定を使用して、値ドキュメントの「name」、「address. Atlas」、「趣味」フィールドなどの選択データを保存するようにAllowList値プロジェクションを構成できます。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=name,address.city,hobbies 
ポストプロセッサがプロジェクションを適用した後、次のレコードが出力されます。
{   "name": "Sally Kimball",   "address": {     "city": "Idaville"   },   "hobbies": [     "reading",     "solving crime"   ] } 
ブロック リスト プロジェクションの例
Kafka レコード キー ドキュメントが次のユーザー識別データに似ているとします。
{   "username": "user5983",   "registration": {     "date": "2021-09-13",     "source": "mobile"   },   "authToken": {     "alg": "HS256",     "type": "JWT",     "payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"   } } 
"authToken" と "registration.source" を省略するようにBlockListキー プロジェクションを構成できます フィールド(次の設定でデータを保存する前に)。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector key.projection.type=BlockList key.projection.list=authToken,registration.source 
ポストプロセッサがプロジェクションを適用した後、次のレコードが出力されます。
{   "username": "user5983",   "registration": {     "date": "2021-09-13",   } } 
プロジェクション ワイルドカード パターン マッチングの例
このセクションでは、フィールド名と一致するワイルドカード パターンを一致するようにプロジェクションのポストプロセッサを構成する方法を示します。
| パターン | 説明 | 
| 
 | 現在のレベル内の任意の文字数と一致します。 | 
| 
 | 現在のレベルおよびネストされたすべてのレベル内の任意の文字と一致します。 | 
このセクションの許可リストとブロックリストのワイルドカード パターン マッチングの例については、気象測定値を含む次の値ドキュメントを参照してください。
{   "city": "Springfield",   "temperature": {     "high": 28,     "low": 24,     "units": "C"   },   "wind_speed_10m": {     "average": 3,     "units": "km/h"   },   "wind_speed_80m": {     "average": 8,     "units": "km/h"   },   "soil_conditions": {     "temperature": {       "high": 22,       "low": 17,       "units": "C"     },     "moisture": {       "average": 340,       "units": "mm"     }   } } 
許可リスト ワイルドカードの例
複数のフィールド名を一致させるには、 *ワイルドカードを使用します。 次のサンプル構成では、次のフィールドが一致しています。
- 最上位のフィールドには「City」 
- "wind_step" という名前で始まる最上位フィールドのサブドキュメントである "average" という名前のフィールド。 
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=city,wind_speed*.average 
ポストプロセッサが許可リストプロジェクションを適用した後、次のレコードを出力します。
{   "city": "Springfield",   "wind_speed_10m": {     "average": 3,   },   "wind_speed_80m": {     "average": 8,   } } 
ワイルドカードを指定したレベルから任意のレベルのオブジェクトに一致する**ワイルドカードを使用できます。 次のワイルドカードに一致する例では、「low」という名前のフィールドを含む任意のドキュメントをプロジェクションします。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector value.projection.type=AllowList value.projection.list=**.low 
プロジェクションを適用する ポストプロセッサ は、次のレコードを出力します。
{   "temperature": {     "high": 28,     "low": 24,     "units": "C"   },   "soil_conditions": {     "temperature": {       "high": 22,       "low": 17,       "units": "C"     }   } } 
ブロック リスト ワイルドカードの例
次のブロックリスト構成例に示すように、ワイルドカード パターンを使用して、特定のドキュメント レベルのフィールドを一致させることができます。
post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector value.projection.type=BlockList value.projection.list=*.*.temperature 
{   "city": "Springfield",   "temperature": {     "high": 28,     "low": 24,     "units": "C"   },   "wind_speed_10m": {     "average": 3,     "units": "km/h"   },   "wind_speed_80m": {     "average": 8,     "units": "km/h"   },   "soil_conditions": {     "moisture": {       "average": 340,       "units": "mm"     }   } } 
フィールドの名前変更の例
このセクションでは、 RenameByMappingとRenameByRegexフィールド リネーム ポストプロセッサを構成して、シンク レコードのフィールド名を更新する方法を説明します。 フィールドの名前変更設定では、以下を指定します。
- レコード内のキーまたは値のドキュメントを更新するかどうか 
- 更新するフィールド名 
- 新しいフィールド名 
JSON 配列でRenameByMappingとRenameByRegexの設定を指定する必要があります。 ドット表記 または パターン一致 のいずれかを使用して、ネストされたフィールドを指定できます。
フィールドリネームのポストプロセッサの例では、次の例のシンク レコードを使用します。
キー ドキュメント
{   "location": "Provence",   "date_month": "October",   "date_day": 17 } 
値ドキュメント
{   "flapjacks": {     "purchased": 598,     "size": "large"   } } 
マッピング例による名前の変更
RenameByMappingポストプロセッサ 設定では、string に一致するフィールドを新しい名前に割り当てる 1 つ以上の JSON オブジェクトを指定します。 各オブジェクトには、以下の表に示すように、 oldName要素に一致するテキストと、 newName要素内の置換テキストが含まれています。
| キー名 | 説明 | 
|---|---|
| oldName | キーまたは値のドキュメントと、置き換えるフィールド名を一致させるかどうかを指定します。 設定では「」が使用されます。 文字を使用して、2 つの値を区切ります。 | 
| newName | フィールドのすべての一致の置換フィールド名を指定します。 | 
次のサンプル プロパティは、キー ドキュメントの「location」フィールドを照合し、その名前を「country」に変更します。
field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}] 
この設定は、 RenameByMappingポストプロセッサ に元のキー ドキュメントを次のドキュメントに変換するように指示します。
{   "country": "Provence",   "date_month": "October",   "date_day": 17 } 
次のように、 oldNameフィールドにフィールド名が追加された値のドキュメントを指定することで、値のドキュメントでも同様のフィールド名の割り当てを実行できます。
field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}] 
この設定は、 RenameByMappingポストプロセッサ に、元の値ドキュメントを次のドキュメントに変換するように指示します。
{   "crepes": {     "purchased": 598,     "size": "large"   } } 
次の設定に示すように、string 形式の JSON 配列を使用して、 field.renamer.mappingプロパティで 1 つ以上のマッピングを指定することもできます。
field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }] 
正規表現による名前の変更
RenameByRegexポストプロセッサ 設定では、一致する必要があるフィールド名とテキスト パターンと、一致したテキストの置換値を指定します。 次の表で説明されているフィールドを含む JSON オブジェクトに、1 つ以上の名前変更式を指定できます。
| キー名 | 説明 | 
|---|---|
| regexp | 置換を実行するフィールドに一致する正規表現が含まれます。 | 
| パターン | 置き換えるテキストに一致する正規表現を含みます。 | 
| replace | 
 | 
次の例の設定では、ポストプロセッサに次の処理を実行するように指示します。
- キー ドキュメント内の「date」で始まる任意のフィールド名と一致します。 一致するフィールドのセットで、パターン - _に一致するすべてのテキストを- -文字に置き換えます。
- crepesのサブドキュメントである、値ドキュメント内の任意のフィールド名と一致します。 一致するフィールドのセットで、パターン- purchasedに一致するすべてのテキストを- quantityに置き換えます。
field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}] 
コネクタがサンプル キー ドキュメントとサンプル 値ドキュメントに ポストプロセッサ を適用すると、次の内容を出力します。
キー ドキュメント
{   "location": "Provence",   "date-month": "October",   "date-day": 17 } 
値ドキュメント
{   "crepes": {     "quantity": 598,     "size": "large"   } } 
警告
リネーム後のプロセッサは既存のフィールド名を上書きしません
リネーム ポストプロセッサで に設定するターゲット フィールド名により、同じドキュメント内に重複するフィールド名が発生する可能性があります。 これを回避するために、ポストプロセッサは、ドキュメントの同じレベルで既存のフィールド名が重複する場合、名前の変更をスキップします。
カスタムポストプロセッサの作成方法
組み込みのポストプロセッサがユースケースをカバーしない場合は、次の手順を使用してカスタムのポストプロセッサクラスを作成できます。
- PostProcessor 抽象クラスを拡張するJavaクラスを作成します。 
- クラス内の - process()メソッドをオーバーライドします。 シンク レコードのキー フィールドと値フィールドの BSON 表現である- SinkDocumentを更新し、 メソッドで元の Kafka- SinkRecordにアクセスできます。
- クラスを JAR ファイルにコンパイルします。 
- コンパイルされた JAR を、すべてのKafkaワーカーのクラスパスまたはプラグインパスに追加します。プラグインパスの詳細については、Confluent のドキュメント「 Community Connector の手動インストール 」を参照してください。 
- ポストプロセッサの完全なクラス名を書き込みプロセッサ チェーン構成に追加します。 
ポストプロセッサの例については、組み込みのポストプロセッサ クラスのソースコードを参照できます。