AI エージェント向け: ドキュメントインデックスは https://www.mongodb.com/ja-jp/docs/llms.txt で利用できます。すべてのページの markdown バージョンは、いずれかの URL パスに .md を追加することで利用できます。
Make the MongoDB docs better! We value your opinion. Share your feedback for a chance to win $100.
MongoDB Branding Shape
Click here >
Docs Menu

$meta 集計ステージ(ストリーム プロセシング)

$meta 式は、ドキュメントのすべてのストリーミング メタデータを含むオブジェクトを返します。このデータは、ストリーム全体、または以下の Atlas Stream Processing の集計ステージのいずれかに対して公開できます。

$meta 式には次のプロトタイプ形式があります。

{ "$meta": <string> }
"source": {
"type": "<source-type>",
"ts": {
"$date": "<datetime>"
},
"topic": "<string>",
"partition": <int>,
"offset": <int>,
"key": "<kafka-key>",
"headers": [
{
"k": "<header-key>",
"v": "<header-value>"
}
],
"operationType": "<db-operation>",
"ns": {
"db": "<namespace-db>",
"coll": "<namespace-coll>"
},
"documentKey": {
"_id": {
"$oid": "<object-id>"
}
},
"initialSync": {
"phase": "<sync-state>"
}
"kinesisStream": "<kinesis-name>",
"shardId": "<kinesis-shard-id>",
"sequenceNumber": "<doc-uuid>",
"partitionKey": "<partition-id>",
}
"window": {
"start": <ISODate>,
"end": <ISODate>,
"partition": "<session-partition>"
},
"https": {
"url": "<target-url>",
"method": "<request-method>",
"httpStatusCode": <http-code>,
"responseTimeMs": <response-time-ms>
}

$meta 式は、メタデータのソースの完全修飾ドット構文パスに対応する単一の文字列入力を受け取ります。このパスのルートは "stream" である必要があります。次のパスをクエリできます。

パス
タイプ
条件
説明

stream

オブジェクト

常に

$source ステージおよびすべてのウィンドウ ステージ、またはパイプラインに構成された $https ステージのすべてのメタデータ。

stream.source

ドキュメント

常に

$sourceステージのメタデータを含むドキュメント。

stream.source.type

string

常に

ソースとして使用される接続のタイプ。

stream.source.ts

ISODate

常に

取り込み時点でのレコードの日時。

stream.source.topic

string

条件付き

ストリームがレコードを取り込むKafkaトピック。 Kafkaソースにのみ適用されます。

stream.source.partition

integer

条件付き

ストリームがレコードを取り込むKafkaトピックのパーティション。 Kafkaソースにのみ適用されます。

stream.source.offset

integer

条件付き

Kafkaソース パーティション内のメッセージ順序とキューの位置をオフセットする。 Kafkaソースにのみ適用されます。

stream.source.key

string | int | long | double |オブジェクト| binData

条件付き

パーティショニングと負荷分散のためにKafkaメッセージに割り当てられたキー。 Kafkaソースにのみ適用されます。

stream.source.headers

配列

条件付き

Kafkaメッセージメタデータを記述するキーと値のペアのセット。 Kafkaソースにのみ適用されます。

stream.source.operationType

string

条件付き

指定されたドキュメントに対して Atlas Stream Processing が実行を試みたデータベース操作のタイプ。 Atlas変更ストリームソースにのみ適用されます。

stream.source.ns

ドキュメント

条件付き

Atlas Stream Processing のソース ドキュメントの名前空間を含むドキュメント。 Atlas変更ストリームソースにのみ適用されます。

stream.source.ns.db

string

条件付き

Atlas Stream Processing が操作を試行するデータベースの名前。 Atlas変更ストリームソースにのみ適用されます。

この値は、コレクション変更ストリームまたはデータベース変更ストリーム ソースのすべてのドキュメントで同じです。クラスターの変更ストリーム ソースによって異なります。

stream.source.ns.coll

string

条件付き

Atlas Stream Processing が操作を試行するコレクションの名前。 Atlas変更ストリームソースにのみ適用されます。

この値は、コレクション 変更ストリーム ソースのすべてのドキュメントで同じです。データベース変更ストリームまたはクラスター変更ストリーム ソースによって異なります。

stream.source.documentKey._id

ドキュメント

条件付き

ソースドキュメントのオブジェクトIDを含むドキュメント。 Atlas変更ストリームソースにのみ適用されます。

stream.source.initialSync.phase

string

条件付き

最初の同期操作の現在の状態。最初の同期中に Atlas変更ストリームソースにのみ適用されます。

stream.source.kinesisStream

string

条件付き

Atlas Stream Processing がドキュメントをソースとするKinesis Data Stream の名前。 AWS Kinesisソースにのみ適用されます。

stream.source.shardId

string

条件付き

Atlas Stream Processing がドキュメントをソースとするKinesis Data Stream 内のシャードのID 。 AWS Kinesisソースにのみ適用されます。

stream.source.sequenceNumber

string

条件付き

Kinesis Data Stream から提供されたドキュメントの一意の識別子です。 AWS Kinesisソースにのみ適用されます。

stream.source.partitionKey

string

条件付き

ソースドキュメントが属するパーティションの一意の識別子。 AWS Kinesisソースにのみ適用されます。

stream.window

ドキュメント

条件付き

ウィンドウメタデータを含むドキュメント。ドキュメントがウィンドウで処理された場合にのみ適用されます。

stream.window.start

ISODate

条件付き

ウィンドウのオープン時間。ドキュメントがウィンドウで処理された場合にのみ適用されます。

stream.window.end

ISODate

条件付き

ウィンドウを閉じる。ドキュメントがウィンドウで処理された場合にのみ適用されます。

stream.window.partition

string

条件付き

ドキュメントが属するセッションウィンドウパーティション。ドキュメントが セッションウィンドウで処理された場合にのみ適用されます。

stream.https

ドキュメント

条件付き

$ https ステージのメタデータを含むドキュメント。$https ステージで処理の失敗が発生した場合にのみ適用されます。

stream.https.url

string

条件付き

$https ステージのターゲットURL 。 $https ステージで処理の失敗が発生した場合にのみ適用されます。

stream.https.method

string

条件付き

$https ステージで使用されるHTTPリクエストメソッド。 $https ステージで処理の失敗が発生した場合にのみ適用されます。

stream.https.httpStatusCode

整数

条件付き

リクエストのHTTPレスポンス ステータス コード 。 $https ステージで処理の失敗が発生した場合にのみ適用されます。

stream.https.responseTimeMs

整数

条件付き

リクエストの応答時間(ミリ秒単位)。 $https ステージで処理の失敗が発生した場合にのみ適用されます。

Atlas Stream Processing$meta 式は、既存のMongoDB$meta 集計式のすべての機能を提供します。ただし、標準のMongoDB集計クエリでは、$meta の Atlas Stream Processing バージョンに固有の機能を使用することはできません。

次の例では、データが取り込まれた Kafka ソース トピックの配列を使用してストリームの出力を強化します。

{
$source: {
connectionName: "kafka",
topic: ["t1", "t2", "t3"]
}
},
{
$emit: {
connectionName: "kafka",
topic: {
$concat: [
{
$meta: "stream.source.topic"
},
"out"
]
}
}
}

次の例では、各ウィンドウの開始時刻を報告するストリームにフィールドを追加します。

{
$source: {
connectionName: "kafka",
topic: "t1"
}
},
{
$hoppingWindow: . . .
},
{
$addFields: {
start: { $meta: "stream.window.start" }
}
}