定義
$mergeステージは、メッセージを書き込む接続レジストリで接続を指定します。 接続は Atlas 接続である必要があります。
$mergeパイプライン ステージには次のプロトタイプ形式があります。
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>", "whenNotMatched": "insert | discard | expression", "parallelism": <integer> } }
構文
Atlas Stream Processing バージョンの $merge は、Atlas Data Federation バージョンと同じフィールドの大半を使用します。Atlas Stream Processing では、$merge の実装に固有のものか、それに合うように変更された次のフィールドも使用します。Atlas Data Federation $merge と共有されるフィールドの詳細については「$merge 構文」を参照してください。
フィールド | 必要性 | 説明 |
|---|---|---|
| 必須 | Atlas Stream Processing が 詳細については、Atlas Data Federation |
| 任意 | Atlas Data Federation の
動的な式の値を使用する場合、次のいずれかの文字列に解決されなければなりません。
|
| 任意 | 動的な式のサポートのある Atlas Data Federation の 動的な式の値を使用する場合、次のいずれかの文字列に解決されなければなりません。
|
| 条件付き | 書き込み操作を分散するスレッドの数。
|
動作
制限
$merge は、表示されるすべてのパイプラインの 最後のステージ である必要があります。 パイプラインごとに使用できる$mergeステージは 1 つだけです。
onフィールドには、シャーディングされたコレクションに対する$mergeに対する特別な要件があります。 詳細については、「 $merge 構文 」を参照してください。
into.coll または into.db に動的な式の値を使用する場合、parallelism 値を 1 より大きく設定することはできません。
$merge は時系列コレクションには書き込みできません。時系列コレクションにドキュメントを書き込むには、$emit ステージを使用してください。
$merge をシャーディングされたコレクションで使用するには、Atlas 管理者ロールが必要です。
ダイナミックな式
次のフィールドの値として動的式を使用できます。
into.dbinto.coll
これにより、ストリーム プロセッサは、メッセージごとに異なるターゲット Atlas コレクションにメッセージを書き込むことができます。
例
次の形式のメッセージを生成するトランザクション イベントのストリームがあります。
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
これらをそれぞれ個別の Atlas データベースとコレクションに並べ替えるには、次の$mergeステージを記述します。
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" } }
この$mergeステージ:
Very Important IndustriesメッセージをVIP.subscriptionという名前の Atlas コレクションに書き込みます。N. E. Buddyメッセージをemployee.requisitionという名前の Atlas コレクションに書き込みます。Khan Traktorメッセージをcontractor.billableHoursという名前の Atlas コレクションに書き込みます。
動的式は string として評価されるもののみを使用できます。 動的式の詳細については、「式演算子 」を参照してください。
動的な式を使用してデータベースまたはコレクションを指定した場合に、Atlas Stream Processing が特定のメッセージに対してその式を評価できないと、構成されていればそのメッセージはデッドレターキュー(DLQ)に送信され、以降のメッセージは処理されます。デッドレターキュー(DLQ)が構成されていない場合、Atlas Stream Processing はそのメッセージを完全にスキップし、以降のメッセージを処理します。
Kafka トピックからデータを保存
複数の Apache Kafka トピック からのストリーミングデータを Atlas クラスターのコレクションに保存するには、$merge ステージと $source ステージを使用します。$source ステージは、データを読み取るトピックを指定します。$merge ステージはデータをターゲットコレクションに書き込みます。
次の構文を使用します。
{ "$source": { "connectionName": "<registered-kafka-connection>", "topic": [ "<topic-name-1>", "<topic-name-2>", ... ] } }, { "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> } }, ... }
例
基本的な例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
$sourceステージでは、 という名前のトピックでこれらのレポートを収集するApachemy_weatherdataKafkaプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれる際に公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTimeに設定されます。$matchステージでは、dewPoint.value5.0が 以下のドキュメントを除外し、 がdewPoint.value5.0より大きいドキュメントを次のステージに渡します。$mergeステージは、sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。
変更ストリーム イベントを複製する
$merge.whenMatched パラメータと $merge.whenNotMatchedパラメータを使用すると、操作タイプに応じて変更ストリーム イベントの効果を複製できます。
次の集計には 4 つのステージがあります。
$sourceステージは、atlas1接続を介して Atlas クラスター上のdb1.coll1コレクションへの接続を確立します。$addFieldsステージは、取り込まれたドキュメントに、各ドキュメントの"$operationTypeの値と"delete"の値が等しいかどうかを判定した結果をfullDocument._isDeleteフィールドとして追加します。この等価性はブール値として評価されます。$replaceRootステージは、ドキュメントを、拡張された$fullDocumentフィールドの値と置き換えます。$mergeステージはatlas2接続を介してdb1.coll1に書き込み、各ドキュメントに対して 2 回のチェックを実行します。まず、
whenMatchedフィールドは、db1.coll1コレクション内の既存のドキュメントと_idで一致するかどうかを確認します。onが明示的に設定されていないため、デフォルトの一致フィールドです。それが一致し、かつfullDocument._isDeleteがtrueに設定されている場合、Atlas は一致するドキュメントを削除します。それが一致し、かつfullDocument._isDeleteがfalseに設定されている場合、Atlas は一致するドキュメントをストリーミング データソースからの新しいドキュメントに置き換えます。次に、Atlas Stream Processing がそのような一致するドキュメントを見つけられず、
fullDocument._isDeleteが true の場合、Atlas はそのドキュメントをコレクションに書き込む代わりに破棄します。そのような一致するドキュメントが存在せず、fullDocument._isDeleteが false の場合、Atlas はストリーミング データソースからドキュメントをコレクションに挿入します。
{ $source: { connectionName: “atlas1”, db: “db1”, coll: “coll1”, fullDocument: “required” } }, { $addFields: { “fullDocument._isDelete”: { $eq: [ “$operationType”, “delete” ] } } }, { $replaceRoot: { newRoot: “$fullDocument” } }, { $merge: { into: { connectionName: “atlas2”, db: “db1”, coll: “coll1” }, whenMatched: { $cond: { if: “$_isDelete”, then: “delete”, else: “replace” } }, whenNotMatched: { $cond: { if: “$_isDelete”, then: “discard”, else: “insert” } }, } }