AI エージェント向け: ドキュメントインデックスは https://www.mongodb.com/ja-jp/docs/llms.txt で利用できます。すべてのページの markdown バージョンは、いずれかの URL パスに .md を追加することで利用できます。
Docs Menu

Sink Connector ポストプロセッサ

このページでは、MongoDB Kafka シンク コネクタでポストプロセッサを構成する方法を学習できます。 ポストプロセッサは、コネクタが MongoDB コレクションに保存する前に、コネクタが Kafka トピックから読み取った Sink レコードを変更します。 ポストプロセッサが行うことができるデータ変更のいくつかの例には、次のようなものがあります。

  • ドキュメント_idフィールドをカスタム値に設定

  • メッセージのキー フィールドまたは値フィールドを含める、または除外する

  • フィールドの名前を変更する

コネクターに含まれる事前に構築されたポストプロセッサを使用するか、独自の実装を使用できます。

ポストプロセッサの詳細については、次のセクションを参照してください。

ポストプロセッサは、 Kafka トピックから読み取られたデータを変更します。 connectorは、 Kafka SinkRecordのキーと値のフィールドの表現を含むSinkDocumentクラスにメッセージを保存します。 コネクタは、構成で指定されたすべてのポストプロセッサを順番に適用し、その結果を MongoDB コレクションに保存します。

ポストプロセッサは、ドキュメント_idフィールドの生成、メッセージ キーまたは値 フィールドのプロジェクション、フィールドの名前変更など、データ変更タスクを実行します。コネクタに含まれる事前構築済みのポストプロセッサを使用することも、PostProcessorクラスを拡張して独自の を実装することもできます。

重要

ポストプロセッサと変更データキャプチャ(CDC)ハンドラー

CDC ハンドラーのイベント データに ポストプロセッサ を適用することはできません。 両方を指定すると、コネクタは警告をログに記録します。

post.processor.chain構成設定では、1 つ以上のポストプロセッサをカンマ区切りのリストとして指定できます。 複数を指定すると、コネクタはそれらを順番に適用し、各ポストプロセッサは前のもののデータ出力を変更します。

connector が MongoDB に書き込むドキュメントに一意の_idフィールドが含まれるようにするには、DocumentIdAdder ポストプロセッサを含めない場合、チェーンの最初の位置にポストプロセッサが自動的に追加されます。

次の設定例では、コネクタが最初にKafkaMetaAdderポストプロセッサを実行し、次にAllowListValueProjectorポストプロセッサを出力で実行することを指定します。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder,com.mongodb.kafka.connect.sink.processor.AllowListValueProjector

次の表には、Sink Connector に含まれるすべてのポストプロセッサのリストが含まれています。

プロセッサ名の変更
説明

DocumentIdAdder

フルパス:

com.mongodb.kafka.connect.sink.processor.DocumentIdAdder

_id構成された戦略によって決定された フィールドを挿入します。デフォルトの戦略は です。
BsonOidStrategy

戦略オプションと構成の詳細については、「 ドキュメントID追加 ポストプロセッサの構成 」セクションを参照してください。

BlockListKeyProjector

フルパス:

com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector

シンク レコードから一致するキー フィールドを除く。
構成の詳細については、「許可リストとブロック リストの例」を参照してください。

BlockListValueProjector

フルパス:

com.mongodb.kafka.connect.sink.processor.BlockListValueProjector

シンク レコードから一致する値フィールドを削除します。
構成の詳細については、「許可リストとブロック リストの例」を参照してください。

AllowListKeyProjector

フルパス:

com.mongodb.kafka.connect.sink.processor.AllowListKeyProjector

シンク レコードから一致するキー フィールドのみを含めます。
構成の詳細については、「許可リストとブロック リストの例」を参照してください。

AllowListValueProjector

フルパス:

com.mongodb.kafka.connect.sink.processor.AllowListValueProjector``

シンク レコードからの一致する値フィールドのみを含めます。
構成の詳細については、「許可リストとブロック リストの例」を参照してください。

KafkaMetaAdder

フルパス:

com.mongodb.kafka.connect.sink.processor.KafkaMetaAdder

「topi- partition-opset」という名前のフィールドを追加し、 の値を Kafka トピック、パーティション、オフセットをドキュメントに連結したものに設定します。

RenameByMapping

フルパス:

com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByMapping

キーまたは値ドキュメント内の指定されたフィールド名と完全に一致するフィールドの名前を変更します。
構成については、「マッピングによる名前変更の例」を参照してください。

RenameByRegex

フルパス:

com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex

キーまたは値ドキュメント内の正規表現に一致するフィールドの名前を変更します。
設定については、正規式による名前変更の例を参照してください。

NullFieldValueRemover

フルパス:

com.mongodb.kafka.connect.sink.processor.NullFieldValueRemover``

null 値を含むすべてのドキュメントフィールドを Sinkレコードから削除します。

DocumentIdAdderポストプロセッサは、戦略を使用して、MongoDB ドキュメントの_idフィールドの形式方法を決定します。 戦略は、ユースケースに合わせてカスタマイズできる事前設定された動作を定義します。

次の例に示すように、 document.id.strategy設定でこのポストプロセッサの戦略を指定できます。

document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy

次の表は、 DocumentIdAdderポストプロセッサを構成するために使用できる戦略の一覧を示しています。

戦略名
説明

BsonOidStrategy

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy

MongoDB BSON ObjectId を生成します。
DocumentIdAdderポストプロセッサのデフォルト戦略。

KafkaMetaDataStrategy

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.KafkaMetaDataStrategy

stringKafkaトピック、パーティション、オフセットを連結した を構築します。

フルキー戦略

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy

Sinkドキュメントの完全なキー構造を使用して、_id
フィールドの値を生成します。キーが存在しない場合は、デフォルトは空白のドキュメントになります。

ProvidedInKeyStratey

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy

_idSinkドキュメントのキー構造で指定された フィールドを使用します。
Sinkドキュメントにフィールドがない場合は、例外がスローされます。

ProvidedInValueStratey

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy

_idSinkドキュメントの値構造で指定された フィールドを使用します。
Sinkドキュメントにフィールドがない場合は、例外がスローされます。

PartialKeyStratey

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.PartialKeyStrategy


Sinkドキュメントキー構造のブロックリストまたは許可リストプロジェクションを使用します。キーが存在しない場合は、デフォルトは空白のドキュメントになります。

PartialValueStratey

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy


Sinkドキュメント値構造のブロックリストまたは許可リストプロジェクションを使用します。値が存在しない場合は、デフォルトは空白のドキュメントになります。

UuidProvidedInKeyStratey

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInKeyStrategy

_idキーフィールドを UUID に変換します。値は string またはバイナリ タイプのいずれかで、 UUID形式に準拠している必要があります。

UuidProvidedInValueStratey

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.UuidInValueStrategy

_id値フィールドを UUID に変換します。値は string またはバイナリ タイプのいずれかで、 UUID形式に準拠している必要があります。

UuidStrategy

フルパス:

com.mongodb.kafka.connect.sink.processor.id.strategy.UuidStrategy``

ランダムに生成された UUID を string 形式で使用します。

組み込みのドキュメント ID 追加戦略でユースケースをカバーしない場合は、以下の手順に従ってカスタムのドキュメント ID 戦略を定義できます。

  1. インターフェース IdStrategy を実装し、カスタム構成ロジックを含むJavaクラスを作成します。

  2. クラスを JAR ファイルにコンパイルします。

  3. コンパイルされた JAR を、すべてのKafkaワーカーのクラスパスまたはプラグインパスに追加します。プラグインパスの詳細については、 Confluent のドキュメントを参照してください。

  4. すべての Kafka ワーカーのカスタム クラスの完全なクラス名にdocument.id.strategy設定を更新します。

注意

選択した戦略は提供セマンティクスに影響を与える可能性があります

BSON ObjectId または UUID 戦略では、コネクタが再試行またはレコードを再度処理するときに新しい ID を生成するため、少なくとも 1 回の配信のみを保証できます。 他の戦略では、ドキュメント ID を構成するフィールドが一意であることを保証できる場合は、1 回限りの配信が許可されます。

IdStrategy インターフェースの実装例については、 コネクタにパッケージ化された ID 戦略の実装 を含むソースコードディレクトリを参照してください。

このセクションでは、次のタイプのポストプロセッサの構成例とサンプル出力を示します。

許可リストブロックリストのプロジェクションのポストプロセッサは、出力に含めるフィールドと除外するフィールドを決定します。

許可リストプロジェクションを使用する場合、ポストプロセッサは指定されたフィールドからのデータのみを出力します。

ブロックリストプロジェクションを使用する場合、指定されたフィールドのデータのみが後処理では省略されます。

注意

を使用できます。 レコード内のネストされたフィールドを参照するための(ドット)表記。 表記を使用して、配列内のドキュメントのフィールドを参照することもできます。

ポストプロセッサ チェーンにプロジェクションを追加する場合は、プロジェクションのタイプと、Sink ドキュメントのキーまたは値の部分にそれを適用するかどうかを指定する必要があります。

プロジェクションの構成と出力の例については、次のセクションを参照してください。

Kafka レコード値ドキュメントが次のユーザー プロファイル データに似ているとします。

{
"name": "Sally Kimball",
"age": 10,
"address": {
"city": "Idaville",
"country": "USA"
},
"hobbies": [
"reading",
"solving crime"
]
}

次の設定を使用して、値ドキュメントの「name」、「address. Atlas」、「趣味」フィールドなどの選択データを保存するようにAllowList値プロジェクションを構成できます。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=name,address.city,hobbies

ポストプロセッサがプロジェクションを適用した後、次のレコードが出力されます。

{
"name": "Sally Kimball",
"address": {
"city": "Idaville"
},
"hobbies": [
"reading",
"solving crime"
]
}

Kafka レコード キー ドキュメントが次のユーザー識別データに似ているとします。

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
"source": "mobile"
},
"authToken": {
"alg": "HS256",
"type": "JWT",
"payload": "zI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODk"
}
}

"authToken" と "registration.source" を省略するようにBlockListキー プロジェクションを構成できます フィールド(次の設定でデータを保存する前に)。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListKeyProjector
key.projection.type=BlockList
key.projection.list=authToken,registration.source

ポストプロセッサがプロジェクションを適用した後、次のレコードが出力されます。

{
"username": "user5983",
"registration": {
"date": "2021-09-13",
}
}

このセクションでは、フィールド名と一致するワイルドカード パターンを一致するようにプロジェクションのポストプロセッサを構成する方法を示します。

パターン

説明

*

現在のレベル内の任意の文字数と一致します。

**

現在のレベルおよびネストされたすべてのレベル内の任意の文字と一致します。

このセクションの許可リストとブロックリストのワイルドカード パターン マッチングの例については、気象測定値を含む次の値ドキュメントを参照してください。

{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
},
"moisture": {
"average": 340,
"units": "mm"
}
}
}

複数のフィールド名を一致させるには、 *ワイルドカードを使用します。 次のサンプル構成では、次のフィールドが一致しています。

  • 最上位のフィールドには「City」

  • "wind_step" という名前で始まる最上位フィールドのサブドキュメントである "average" という名前のフィールド。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=city,wind_speed*.average

ポストプロセッサが許可リストプロジェクションを適用した後、次のレコードを出力します。

{
"city": "Springfield",
"wind_speed_10m": {
"average": 3,
},
"wind_speed_80m": {
"average": 8,
}
}

ワイルドカードを指定したレベルから任意のレベルのオブジェクトに一致する**ワイルドカードを使用できます。 次のワイルドカードに一致する例では、「low」という名前のフィールドを含む任意のドキュメントをプロジェクションします。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=**.low

プロジェクションを適用する ポストプロセッサ は、次のレコードを出力します。

{
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"soil_conditions": {
"temperature": {
"high": 22,
"low": 17,
"units": "C"
}
}
}

次のブロックリスト構成例に示すように、ワイルドカード パターンを使用して、特定のドキュメント レベルのフィールドを一致させることができます。

post.processor.chain=com.mongodb.kafka.connect.sink.processor.BlockListValueProjector
value.projection.type=BlockList
value.projection.list=*.*.temperature
{
"city": "Springfield",
"temperature": {
"high": 28,
"low": 24,
"units": "C"
},
"wind_speed_10m": {
"average": 3,
"units": "km/h"
},
"wind_speed_80m": {
"average": 8,
"units": "km/h"
},
"soil_conditions": {
"moisture": {
"average": 340,
"units": "mm"
}
}
}

このセクションでは、 RenameByMappingRenameByRegexフィールド リネーム ポストプロセッサを構成して、シンク レコードのフィールド名を更新する方法を説明します。 フィールドの名前変更設定では、以下を指定します。

  • レコード内のキーまたは値のドキュメントを更新するかどうか

  • 更新するフィールド名

  • 新しいフィールド名

JSON 配列でRenameByMappingRenameByRegexの設定を指定する必要があります。 ドット表記 または パターン一致 のいずれかを使用して、ネストされたフィールドを指定できます。

フィールドリネームのポストプロセッサの例では、次の例のシンク レコードを使用します。

キー ドキュメント

{
"location": "Provence",
"date_month": "October",
"date_day": 17
}

値ドキュメント

{
"flapjacks": {
"purchased": 598,
"size": "large"
}
}

RenameByMappingポストプロセッサ 設定では、string に一致するフィールドを新しい名前に割り当てる 1 つ以上の JSON オブジェクトを指定します。 各オブジェクトには、以下の表に示すように、 oldName要素に一致するテキストと、 newName要素内の置換テキストが含まれています。

キー名
説明

oldName

キーまたは値のドキュメントと、置き換えるフィールド名を一致させるかどうかを指定します。 設定では「」が使用されます。 文字を使用して、2 つの値を区切ります。

newName

フィールドのすべての一致の置換フィールド名を指定します。

次のサンプル プロパティは、キー ドキュメントの「location」フィールドを照合し、その名前を「country」に変更します。

field.renamer.mapping=[{"oldName":"key.location", "newName":"country"}]

この設定は、 RenameByMappingポストプロセッサ に元のキー ドキュメントを次のドキュメントに変換するように指示します。

{
"country": "Provence",
"date_month": "October",
"date_day": 17
}

次のように、 oldNameフィールドにフィールド名が追加された値のドキュメントを指定することで、値のドキュメントでも同様のフィールド名の割り当てを実行できます。

field.renamer.mapping=[{"oldName":"value.flapjacks", "newName":"crepes"}]

この設定は、 RenameByMappingポストプロセッサ に、元の値ドキュメントを次のドキュメントに変換するように指示します。

{
"crepes": {
"purchased": 598,
"size": "large"
}
}

次の設定に示すように、string 形式の JSON 配列を使用して、 field.renamer.mappingプロパティで 1 つ以上のマッピングを指定することもできます。

field.renamer.mapping=[{ "oldName":"key.location", "newName":"city" }, { "oldName":"value.crepes", "newName":"flapjacks" }]

RenameByRegexポストプロセッサ 設定では、一致する必要があるフィールド名とテキスト パターンと、一致したテキストの置換値を指定します。 次の表で説明されているフィールドを含む JSON オブジェクトに、1 つ以上の名前変更式を指定できます。

キー名
説明

regexp

置換を実行するフィールドに一致する正規表現が含まれます。

パターン

置き換えるテキストに一致する正規表現を含みます。

replace

patternフィールドで定義した正規表現のすべての一致の置換テキストが含まれます。

次の例の設定では、ポストプロセッサに次の処理を実行するように指示します。

  • キー ドキュメント内の「date」で始まる任意のフィールド名と一致します。 一致するフィールドのセットで、パターン_に一致するすべてのテキストを-文字に置き換えます。

  • crepesのサブドキュメントである、値ドキュメント内の任意のフィールド名と一致します。 一致するフィールドのセットで、パターンpurchasedに一致するすべてのテキストをquantityに置き換えます。

field.renamer.regexp=[{"regexp":"^key\\.date.*$","pattern":"_","replace":"-"}, {"regexp":"^value\\.crepes\\..*","pattern":"purchased","replace":"quantity"}]

コネクタがサンプル キー ドキュメントサンプル 値ドキュメントに ポストプロセッサ を適用すると、次の内容を出力します。

キー ドキュメント

{
"location": "Provence",
"date-month": "October",
"date-day": 17
}

値ドキュメント

{
"crepes": {
"quantity": 598,
"size": "large"
}
}

警告

リネーム後のプロセッサは既存のフィールド名を上書きしません

リネーム ポストプロセッサで に設定するターゲット フィールド名により、同じドキュメント内に重複するフィールド名が発生する可能性があります。 これを回避するために、ポストプロセッサは、ドキュメントの同じレベルで既存のフィールド名が重複する場合、名前の変更をスキップします。

組み込みのポストプロセッサがユースケースをカバーしない場合は、次の手順を使用してカスタムのポストプロセッサクラスを作成できます。

  1. PostProcessor 抽象クラスを拡張するJavaクラスを作成します。

  2. クラス内のprocess()メソッドをオーバーライドします。 シンク レコードのキー フィールドと値フィールドの BSON 表現であるSinkDocumentを更新し、 メソッドで元の Kafka SinkRecordにアクセスできます。

  3. クラスを JAR ファイルにコンパイルします。

  4. コンパイルされた JAR を、すべてのKafkaワーカーのクラスパスまたはプラグインパスに追加します。プラグインパスの詳細については、Confluent のドキュメント「 Community Connector の手動インストール 」を参照してください。

  5. ポストプロセッサの完全なクラス名を書き込みプロセッサ チェーン構成に追加します。

ポストプロセッサの例については、組み込みのポストプロセッサ クラスのソースコードを参照できます。