Docs Menu
Docs Home
/
Atlas
/ /

$emit ステージ(Stream Processing)

$emitステージは、メッセージを出力する接続レジストリで接続を指定します。 接続は Apache Kafka のいずれかである必要があります。 プロバイダーまたは 時系列コレクション。

処理されたデータを Apache Kafka に書き込む$emit プロバイダーは、次のプロトタイプ形式を持つ パイプライン ステージを使用します。

{
"$emit": {
"connectionName": "<registered-connection>",
"topic": "<target-topic>" | <expression>,
"config": {
"acks": <number-of-acknowledgements>,
"compression_type": "<compression-type>",
"dateFormat": "default" | "ISO8601",
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

$emitステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続の名前(接続レジストリに表示されます)。

topic

string |式

必須

Apache Kafka の名前 メッセージを送信するトピック。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.acks

整数

任意

成功した $emit 操作に対して、Apache Kafka クラスターから求められる確認応答の数。

デフォルト値は all です。Atlas Stream Processing は以下の値をサポートしています。

  • -1

  • 0

  • 1

  • all

config.compression_type

string

任意

制作者が生成するすべてのデータの圧縮形式。デフォルトは「なし」になっています(つまり、圧縮は行われません)。有効な値は次のとおりです。

  • none

  • gzip

  • snappy

  • lz4

  • zstd

圧縮はデータの完全なバッチに使用されるため、バッチ処理の効率は圧縮率に影響します。バッチ処理が多いほど、圧縮率が向上します。

config.dateFormat

string

任意

日付値のための日付フォーマット。有効な値は次のとおりです。

  • default outputFormat のデフォルトを使用する。

  • ISO8601 - ミリ秒単位の精度(YYYY-MM-DDTHH:mm:ss.sssZ)を含む ISO8601 形式で、日付を文字列に変換する。

以下に例を挙げます。

次の入力をご覧ください。

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

$emit.config.defaultFormatdefault に設定された場合、出力は次のようになります。

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

$emit.config.defaultFormatISO8601 に設定された場合、出力は次のようになります。

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.headers

任意

出力メッセージに追加する ヘッダー 。 式は、オブジェクトまたは配列のいずれかに評価される必要があります。

式がオブジェクトと評価される場合、Atlas Stream Processing は、そのオブジェクト内の各キーと値のペアからヘッダーを構築します。キーはヘッダー名、値はヘッダー値です。

式が配列と評価される場合は、キーと値のペア オブジェクトの配列の形式になる必要があります。 例:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing は、配列内の各オブジェクトからヘッダーを構築します。キーはヘッダー名で、値はヘッダー値です。Atlas Stream Processing は次の種類のヘッダー値をサポートしています。

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

オブジェクト | string | 式

任意

Apache Kafka として評価される 式 メッセージ キー。

config.keyを指定する場合は、 config.keyFormatを指定する必要があります。

config.keyFormat

string

条件付き

Apache Kafka を逆直列化するために使用されるデータ型 キー データ。次のいずれかの値である必要があります。

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

デフォルトはbinDataです。 config.keyを指定する場合は、 config.keyFormatを指定する必要があります。 ドキュメントのconfig.keyが指定されたデータ型に正常に逆シリアル化されない場合、Atlas Stream Processing はそれをデッド レター キューに送信します。

config.outputFormat

string

任意

Apache Kafka にメッセージを送信するときに使用する JSON 形式 。次のいずれかの値である必要があります。

  • "relaxedJson"

  • "canonicalJson"

デフォルトは "relaxedJson" です。

処理されたデータを Atlas 時系列コレクションに書き込むには、次のプロトタイプ形式で$emitパイプライン ステージを使用します。

{
"$emit": {
"connectionName": "<registered-connection>",
"db": "<target-db>",
"coll": "<target-coll>",
"timeseries": {
<options>
}
}
}

$emitステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続の名前(接続レジストリに表示されます)。

db

string

必須

ターゲット 時系列コレクションを含む Atlas database の名前。

coll

string

必須

書き込み先の Atlas 時系列コレクションの名前。

timeseries

ドキュメント

必須

コレクションの時系列フィールドを定義するドキュメント。

注意

時系列コレクション内のドキュメントの最大サイズは4 MB です。 詳細については「時系列コレクションの制限 」を参照してください。

処理済みデータを AWS S3 バケットシンク接続に書き込むには、次のプロトタイプ形式で $emit パイプラインステージを使用します:

{
"$emit": {
"connectionName": "<registered-connection>",
"bucket": "<target-bucket>",
"region": "<target-region>",
"path": "<key-prefix>" | <expression>,
"config": {
"writeOptions": {
"count": <doc-count>,
"bytes": <threshold>,
"interval": {
"size": <unit-count>,
"unit": "<time-denomination>"
}
},
"delimiter": "<delimiter>",
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"dateFormat": "default" | "ISO8601",
"compression": "gzip" | "snappy",
"compressionLevel": <level>
}
}
}

$emitステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

接続レジストリに表示される、データを書き込む先の接続の名前。

bucket

string

必須

データを書き込む先の S3 バケットの名前。

region

string

任意

対象バケットが存在する AWS リージョンの名前です。ストリーム処理インスタンスを AWS リージョンでホストしている場合、このパラメータはそのリージョンがデフォルトになります。そうでない場合は、ストリーム処理インスタンスのホストリージョンに最も近い AWS リージョンがデフォルトになります。

path

string |式

必須

S3 バケットに書き込まれるオブジェクトのキーのプレフィックスです。リテラルのプレフィックス文字列、または文字列に評価される式でなければなりません。

config

ドキュメント

任意

さまざまなデフォルト値を上書きする追加のパラメーターを含むドキュメント。

config.writeOptions

ドキュメント

任意

書き込み動作を制御する追加パラメーターを含むドキュメントです。これらのパラメーターは、どの閾値が最初に満たされるかに応じて書き込み動作をトリガーします。

例えば、取り込まれたドキュメントが config.writeOptions.interval の閾値に達していなくても config.writeOptions.count の閾値に達した場合、ストリームプロセッサは config.writeOptions.count の閾値に従ってこれらのドキュメントを S3 に送出します。

config.writeOptions.count

integer

任意

S3 に書き込む各ファイルにグループ化するドキュメントの数。

config.writeOptions.bytes

integer

任意

S3 にファイルを書き込む前に蓄積される必要がある最小バイト数を指定します。バイト数は、最終出力ファイルのサイズではなく、パイプラインで取り込まれた BSON ドキュメントのサイズによって決まります。

config.writeOptions.interval

ドキュメント

任意

ドキュメントを一括書き込みするためのタイマーをsizeunitsの組み合わせとして指定します。

デフォルトは 1 分です。いかなるunitに対してもsizeを0に設定することはできません。最大間隔は7日です。

config.writeOptions.interval.size

integer

条件付き

ストリームプロセッサが S3 にドキュメントを一括書き込みするまでの、writeOptions.interval.units で指定された単位数。

デフォルトは1です。0 の size は設定できません。writeOptions.interval を定義する場合は、このパラメーターも必ず定義する必要があります。

config.writeOptions.interval.units

string

条件付き

一括書き込みタイマーをカウントする時間の単位です。このパラメーターは以下の値をサポートしています:

  • ms

  • second

  • minute

  • hour

  • day

デフォルトはminuteです。writeOptions.intervalを定義する場合は、このパラメータも定義しなければなりません。

config.delimiter

string

任意

出力されたファイル内の各エントリの間の区切り文字。

デフォルトは \n です。

config.outputFormat

string

任意

S3 に書き込まれる JSON の出力形式を指定します。次のいずれかの値でなければなりません:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

デフォルトは「relaxedJson」です。

詳しくは、Basic JSON をご覧ください。

config.dateFormat

string

任意

日付値のための日付フォーマット。有効な値は次のとおりです。

  • default outputFormat のデフォルトを使用する。

  • ISO8601 - ミリ秒単位の精度(YYYY-MM-DDTHH:mm:ss.sssZ)を含む ISO8601 形式で、日付を文字列に変換する。

例として、次のレコードをパイプラインに追加した場合:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

$emit.config.defaultFormatdefault に設定されている場合、出力は次のようになります:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

$emit.config.defaultFormatISO8601 に設定された場合、出力は次のようになります。

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.compression

string

任意

使用する圧縮アルゴリズムの名称。次のいずれかの値でなければなりません:

  • "gzip"

  • "snappy"

config.compressionLevel

string

条件付き

出力されるメッセージに適用する圧縮レベルです。1-9 を含む値をサポートしており、値が大きいほど圧縮率が高くなります。

デフォルトは 6 です。

このパラメーターは gzip に対して必須であり、かつそれに限定されます。config.compressionsnappy に設定した場合、このパラメーターを設定しても効果はありません。

メッセージの取り込みを容易にするため、Atlas Stream Processing は RelaxedJSON 形式を簡略化した Basic JSON をサポートしています。以下の表は、影響を受けるすべてのフィールドに対するこれらの簡略化の例を示しています。

フィールド型
Relaxed JSON
basicJson

バイナリ

{ "binary": { "$binary": { "base64": "gf1UcxdHTJ2HQ/EGQrO7mQ==", "subType": "00" }}}

{ "binary": "gf1UcxdHTJ2HQ/EGQrO7mQ=="}

日付

{ "date": { "$date": "2024-10-24T18:07:29.636Z"}}

{ "date": 1729625275856}

小数点

{ "decimal": { "$numberDecimal": "9.9" }}

{ "decimal": "9.9" }

タイムスタンプ

{ "timestamp": { "$timestamp": { "t": 1729793249, "i": 1 }}}

{ "timestamp": 1729793249000}

ObjectId

{ "_id": { "$oid": "671a8ce1497407eff0e17cba" }}

{ "_id": "6717fcbba18c8a8f74b6d977" }

Negative Infinity

{ "negInf": { "$numberDouble": "-Infinity" }}

{ "negInf": "-Infinity" }

Positive Infinity

{ "posInf": { "$numberDouble": "Infinity" }}

{ "posInf": "Infinity" }

正規表現

{ "regex": { "$regularExpression": { "pattern": "ab+c", "options": "i" }}}

{ "regex": { "pattern": "ab+c", "options": "i" }}

UUID

{ "uuid": { "$binary": { "base64": "Kat+fHk6RkuAmotUmsU7gA==", "subType": "04" }}}

{ "uuid": "420b7ade-811a-4698-aa64-c8347c719cf1"}

$emit は、表示されるすべてのパイプラインの 最後のステージ である必要があります。 パイプラインごとに使用できる$emitステージは 1 つだけです。

ストリーム プロセッサごとに 1 つの Atlas 時系列コレクションにのみ書込み (write) ができます。 存在しないコレクションを指定した場合、Atlas は指定した時系列フィールドでコレクションを作成します。 既存のデータベースを指定する必要があります。

ストリーム プロセッサが別のターゲット Apache Kafka に書き込むようにするために、 フィールドの値として 動的式topic を使用できます。 メッセージごとにトピックを作成します。式は string として評価される必要があります。

次の形式のメッセージを生成するトランザクション イベントのストリームがあります。

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

これらをそれぞれ個別の Apache Kafka に並べ替えるには、 トピックには、次の$emit ステージを記述できます。

{
"$emit": {
"connectionName": "kafka1",
"topic": "$customerStatus"
}
}

この$emitステージ:

  • Very Important IndustriesメッセージをVIPという名前のトピックに書き込みます。

  • N. E. Buddyメッセージをemployeeという名前のトピックに書き込みます。

  • Khan Traktorメッセージをcontractorという名前のトピックに書き込みます。

動的式の詳細については、「式演算子 」を参照してください。

まだ存在しないトピックを指定した場合、 Apache Kafka は、それを対象とする最初のメッセージを受信したときにトピックを自動的に作成します。

動的な式でトピックを指定したものの、Atlas Stream Processing が特定のメッセージの式を評価できない場合、構成されている場合、Atlas Stream Processing はそのメッセージをデッドレターキュー(DLQ)に送信し、以降のメッセージを処理します。デッドレターキュー(DLQ)が構成されていない場合、Atlas Stream Processing はそのメッセージを完全にスキップし、以降のメッセージを処理します。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. ステージは Apache$source Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、 ingestionTimeに設定されます。

  2. $match ステージでは、airTemperature.value30.0 以上であるドキュメントを除外し、airTemperature.value30.0 未満のドキュメントを次のステージに渡します。

  3. $addFields ステージでは、ストリームにメタデータが追加されます。

  4. $emit ステージは、weatherStreamOutput Kafkaブローカー接続を介してstream というトピックに出力を書き込みます。

{
"$source": {
"connectionName": "sample_weatherdata",
"topic": "my_weatherdata",
"tsFieldName": "ingestionTime"
}
},
{
"$match": {
"airTemperature.value": {
"$lt": 30
}
}
},
{
"$addFields": {
"processorMetadata": {
"$meta": "stream"
}
}
},
{
"$emit": {
"connectionName": "weatherStreamOutput",
"topic": "stream"
}
}

stream トピックのドキュメントは以下の形式をとります:

{
"st": "x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8, 116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1", "AG1", "UG1", "SA1", "MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight": {
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime": {
"$date": "2024-09-26T17:34:41.843Z"
},
"_stream_meta": {
"source": {
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$tumblingWindow

項目一覧