定義
$meta 式は、ドキュメントのすべてのストリーミング メタデータを含むオブジェクトを返します。このデータは、ストリーム全体、または以下の Atlas Stream Processing の集計ステージのいずれかに対して公開できます。
$meta 式には次のプロトタイプ形式があります。
{ "$meta": <string> }
"source": { "type": "<source-type>", "ts": { "$date": "<datetime>" }, "topic": "<string>", "partition": <int>, "offset": <int>, "key": "<kafka-key>", "headers": [ { "k": "<header-key>", "v": "<header-value>" } ], "operationType": "<db-operation>", "ns": { "db": "<namespace-db>", "coll": "<namespace-coll>" }, "documentKey": { "_id": { "$oid": "<object-id>" } }, "initialSync": { "phase": "<sync-state>" } "kinesisStream": "<kinesis-name>", "shardId": "<kinesis-shard-id>", "sequenceNumber": "<doc-uuid>", "partitionKey": "<partition-id>", } "window": { "start": <ISODate>, "end": <ISODate>, "partition": "<session-partition>" }, "https": { "url": "<target-url>", "method": "<request-method>", "httpStatusCode": <http-code>, "responseTimeMs": <response-time-ms> }
構文
$meta 式は、メタデータのソースの完全修飾ドット構文パスに対応する単一の文字列入力を受け取ります。このパスのルートは "stream" である必要があります。次のパスをクエリできます。
パス | タイプ | 条件 | 説明 |
|---|---|---|---|
| オブジェクト | 常に | |
| ドキュメント | 常に |
|
| string | 常に | ソースとして使用される接続のタイプ。 |
| ISODate | 常に | 取り込み時点でのレコードの日時。 |
| string | 条件付き | ストリームがレコードを取り込むKafkaトピック。 Kafkaソースにのみ適用されます。 |
| integer | 条件付き | ストリームがレコードを取り込むKafkaトピックのパーティション。 Kafkaソースにのみ適用されます。 |
| integer | 条件付き | Kafkaソース パーティション内のメッセージ順序とキューの位置をオフセットする。 Kafkaソースにのみ適用されます。 |
| string | int | long | double |オブジェクト| binData | 条件付き | パーティショニングと負荷分散のためにKafkaメッセージに割り当てられたキー。 Kafkaソースにのみ適用されます。 |
| 配列 | 条件付き | Kafkaメッセージメタデータを記述するキーと値のペアのセット。 Kafkaソースにのみ適用されます。 |
| string | 条件付き | 指定されたドキュメントに対して Atlas Stream Processing が実行を試みたデータベース操作のタイプ。 Atlas変更ストリームソースにのみ適用されます。 |
| ドキュメント | 条件付き | Atlas Stream Processing のソース ドキュメントの名前空間を含むドキュメント。 Atlas変更ストリームソースにのみ適用されます。 |
| string | 条件付き | |
| string | 条件付き | Atlas Stream Processing が操作を試行するコレクションの名前。 Atlas変更ストリームソースにのみ適用されます。 この値は、コレクション 変更ストリーム ソースのすべてのドキュメントで同じです。データベース変更ストリームまたはクラスター変更ストリーム ソースによって異なります。 |
| ドキュメント | 条件付き | ソースドキュメントのオブジェクトIDを含むドキュメント。 Atlas変更ストリームソースにのみ適用されます。 |
| string | 条件付き | 最初の同期操作の現在の状態。最初の同期中に Atlas変更ストリームソースにのみ適用されます。 |
| string | 条件付き | Atlas Stream Processing がドキュメントをソースとするKinesis Data Stream の名前。 AWS Kinesisソースにのみ適用されます。 |
| string | 条件付き | Atlas Stream Processing がドキュメントをソースとするKinesis Data Stream 内のシャードのID 。 AWS Kinesisソースにのみ適用されます。 |
| string | 条件付き | Kinesis Data Stream から提供されたドキュメントの一意の識別子です。 AWS Kinesisソースにのみ適用されます。 |
| string | 条件付き | ソースドキュメントが属するパーティションの一意の識別子。 AWS Kinesisソースにのみ適用されます。 |
| ドキュメント | 条件付き | ウィンドウメタデータを含むドキュメント。ドキュメントがウィンドウで処理された場合にのみ適用されます。 |
| ISODate | 条件付き | ウィンドウのオープン時間。ドキュメントがウィンドウで処理された場合にのみ適用されます。 |
| ISODate | 条件付き | ウィンドウを閉じる。ドキュメントがウィンドウで処理された場合にのみ適用されます。 |
| string | 条件付き | ドキュメントが属するセッションウィンドウパーティション。ドキュメントが セッションウィンドウで処理された場合にのみ適用されます。 |
| ドキュメント | 条件付き | $ https ステージのメタデータを含むドキュメント。 |
| string | 条件付き |
|
| string | 条件付き |
|
| 整数 | 条件付き | リクエストのHTTPレスポンス ステータス コード 。 |
| 整数 | 条件付き | リクエストの応答時間(ミリ秒単位)。 |
動作
Atlas Stream Processing$meta 式は、既存のMongoDB$meta 集計式のすべての機能を提供します。ただし、標準のMongoDB集計クエリでは、$meta の Atlas Stream Processing バージョンに固有の機能を使用することはできません。
例
次の例では、データが取り込まれた Kafka ソース トピックの配列を使用してストリームの出力を強化します。
{ $source: { connectionName: "kafka", topic: ["t1", "t2", "t3"] } }, { $emit: { connectionName: "kafka", topic: { $concat: [ { $meta: "stream.source.topic" }, "out" ] } } }
次の例では、各ウィンドウの開始時刻を報告するストリームにフィールドを追加します。
{ $source: { connectionName: "kafka", topic: "t1" } }, { $hoppingWindow: . . . }, { $addFields: { start: { $meta: "stream.window.start" } } }