Docs Menu
Docs Home
/
Atlas
/ /

$emit 集計ステージ(ストリーム プロセシング)

$emit ステージは、メッセージを出力する 接続レジストリで接続を指定します。接続は、 Apache Kafka プロバイダーまたは時系列コレクション のいずれかである必要があります。

処理されたデータをApache Kafkaプロバイダーに書き込むには、次のプロトタイプ形式で $emitパイプラインステージを使用します。

{
"$emit": {
"connectionName": "<registered-connection>",
"topic": "<target-topic>" | <expression>,
"config": {
"acks": <number-of-acknowledgements>,
"compression_type": "<compression-type>",
"dateFormat": "default" | "ISO8601",
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<serialization-type>",
"outputFormat": "<json-format>",
"tombstoneWhen": <expression>
}
}
}

$emitステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続の名前(接続レジストリに表示されます)。

topic

string |式

必須

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.acks

整数

任意

成功した $emit 操作に対して、Apache Kafka クラスターから求められる確認応答の数。

デフォルト値は all です。Atlas Stream Processing は以下の値をサポートしています。

  • -1

  • 0

  • 1

  • all

config.compression_type

string

任意

制作者が生成するすべてのデータの圧縮形式。デフォルトは「なし」になっています(つまり、圧縮は行われません)。有効な値は次のとおりです。

  • none

  • gzip

  • snappy

  • lz4

  • zstd

圧縮はデータの完全なバッチに使用されるため、バッチ処理の効率は圧縮率に影響します。バッチ処理が多いほど、圧縮率が向上します。

config.dateFormat

string

任意

日付値のための日付フォーマット。有効な値は次のとおりです。

  • default outputFormat のデフォルトを使用する。

  • ISO8601 - ミリ秒単位の精度(YYYY-MM-DDTHH:mm:ss.sssZ)を含む ISO8601 形式で、日付を文字列に変換する。

以下に例を挙げます。

次の入力をご覧ください。

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

$emit.config.dateFormatdefault に設定された場合、出力は次のようになります。

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

$emit.config.dateFormatISO8601 に設定された場合、出力は次のようになります。

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.headers

任意

出力メッセージに追加する ヘッダー 。 式は、オブジェクトまたは配列のいずれかに評価される必要があります。

式がオブジェクトと評価される場合、Atlas Stream Processing は、そのオブジェクト内の各キーと値のペアからヘッダーを構築します。キーはヘッダー名、値はヘッダー値です。

式が配列と評価される場合は、キーと値のペア オブジェクトの配列の形式になる必要があります。 例:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing は、配列内の各オブジェクトからヘッダーを構築します。キーはヘッダー名で、値はヘッダー値です。Atlas Stream Processing は次の種類のヘッダー値をサポートしています。

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

オブジェクト | string | 式

任意

Apache Kafkaメッセージ キーとして評価される 式 。

config.keyを指定する場合は、 config.keyFormatを指定する必要があります。

config.keyFormat

string

条件付き

Apache Kafkaキー データを直列化するために使用されるデータ型。次のいずれかの値である必要があります。

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

デフォルトは binData です。config.key を指定する場合は、config.keyFormat を指定する必要があります。ドキュメントの config.key が指定されたデータ型に正常に直列化されない場合、Atlas Stream Processing ではデッドレターキュー (DLQ) に送信されます。

config.outputFormat

string

任意

Apache Kafkaにメッセージを発行するときに使用するJSON形式。次のいずれかの値である必要があります。

  • "relaxedJson"

  • "canonicalJson"

デフォルトは "relaxedJson" です。

config.tombstoneWhen

任意

Kafkaに null を発行するタイミングを決定する 式 。式はブール値true または false のいずれかに評価する必要があります。式が特定のドキュメントに対して true と評価されると、Atlas Stream Processing はその場所で null をKafkaシンクに出力します。式がfalse と評価された場合、Atlas Stream Processing は $emit ステージに達したときにドキュメントが存在するように出力します。

式がブール値値に評価できない場合、または評価できない場合、Atlas Stream Processing はドキュメントをDLQ に書込みます。

この設定を使用すると、$emit.config.key$emit.config.keyFormat の値を指定する場合にトピック圧縮を有効にできます。これらの値を指定しない場合、この式がtrue と評価されたときに Atlas Stream Processing は引き続き null を発行しますが、これらによってKafkaトピックの圧縮はトリガーされません。

処理されたデータを Atlas 時系列コレクションに書き込むには、次のプロトタイプ形式で$emitパイプライン ステージを使用します。

{
"$emit": {
"connectionName": "<registered-connection>",
"db": "<target-db>" | <expression>,
"coll": "<target-coll>" | <expression>,
"timeseries": {
<options>
}
}
}

$emitステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続の名前(接続レジストリに表示されます)。

db

string |式

必須

ターゲット時系列コレクションを含む Atlas データベースに解決される、または式の名前。

coll

string |式

必須

書込み先の Atlas時系列コレクションに解決される、または式の名前。

timeseries

ドキュメント

必須

コレクションの時系列フィールドを定義するドキュメント。

注意

時系列コレクション内のドキュメントの最大サイズは4 MB です。 詳細については「時系列コレクションの制限 」を参照してください。

処理済みデータを AWS S3 バケットシンク接続に書き込むには、次のプロトタイプ形式で $emit パイプラインステージを使用します:

{
"$emit": {
"connectionName": "<registered-connection>",
"bucket": "<target-bucket>",
"region": "<target-region>",
"path": "<key-prefix>" | <expression>,
"config": {
"writeOptions": {
"count": <doc-count>,
"bytes": <threshold>,
"interval": {
"size": <unit-count>,
"unit": "<time-denomination>"
}
},
"delimiter": "<delimiter>",
"outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
"dateFormat": "default" | "ISO8601",
"compression": "gzip" | "snappy",
"compressionLevel": <level>
}
}
}

$emitステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

接続レジストリに表示される、データを書き込む先の接続の名前。

bucket

string

必須

データを書き込む先の S3 バケットの名前。

region

string

任意

対象バケットが存在する AWS リージョンの名前です。ストリーム処理インスタンスを AWS リージョンでホストしている場合、このパラメータはそのリージョンがデフォルトになります。そうでない場合は、ストリーム処理インスタンスのホストリージョンに最も近い AWS リージョンがデフォルトになります。

path

string |式

必須

S3 バケットに書き込まれたオブジェクトのキーのプレフィックス。リテラルのプレフィックス string または、string を評価する式である必要があります。

config

ドキュメント

任意

さまざまなデフォルト値を上書きする追加のパラメーターを含むドキュメント。

config.writeOptions

ドキュメント

任意

書き込み動作を制御する追加パラメーターを含むドキュメントです。これらのパラメーターは、どの閾値が最初に満たされるかに応じて書き込み動作をトリガーします。

例えば、取り込まれたドキュメントが config.writeOptions.interval の閾値に達していなくても config.writeOptions.count の閾値に達した場合、ストリームプロセッサは config.writeOptions.count の閾値に従ってこれらのドキュメントを S3 に送出します。

config.writeOptions.count

integer

任意

S3 に書き込む各ファイルにグループ化するドキュメントの数。

config.writeOptions.bytes

integer

任意

S3 にファイルを書き込む前に蓄積される必要がある最小バイト数を指定します。バイト数は、最終出力ファイルのサイズではなく、パイプラインで取り込まれた BSON ドキュメントのサイズによって決まります。

config.writeOptions.interval

ドキュメント

任意

ドキュメントを一括書き込みするためのタイマーをsizeunitsの組み合わせとして指定します。

デフォルトは 1 分です。いかなるunitに対してもsizeを0に設定することはできません。最大間隔は7日です。

config.writeOptions.interval.size

integer

条件付き

ストリームプロセッサが S3 にドキュメントを一括書き込みするまでの、writeOptions.interval.units で指定された単位数。

デフォルトは1です。0 の size は設定できません。writeOptions.interval を定義する場合は、このパラメーターも必ず定義する必要があります。

config.writeOptions.interval.units

string

条件付き

一括書き込みタイマーをカウントする時間の単位です。このパラメーターは以下の値をサポートしています:

  • ms

  • second

  • minute

  • hour

  • day

デフォルトはminuteです。writeOptions.intervalを定義する場合は、このパラメータも定義しなければなりません。

config.delimiter

string

任意

出力されたファイル内の各エントリの間の区切り文字。

デフォルトは \n です。

config.outputFormat

string

任意

S3 に書き込まれる JSON の出力形式を指定します。次のいずれかの値でなければなりません:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

デフォルトは「relaxedJson」です。

詳しくは、Basic JSON をご覧ください。

config.dateFormat

string

任意

日付値のための日付フォーマット。有効な値は次のとおりです。

  • default outputFormat のデフォルトを使用する。

  • ISO8601 - ミリ秒単位の精度(YYYY-MM-DDTHH:mm:ss.sssZ)を含む ISO8601 形式で、日付を文字列に変換する。

例として、次のレコードをパイプラインに追加した場合:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

$emit.config.dateFormatdefault に設定されている場合、出力は次のようになります:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

$emit.config.dateFormatISO8601 に設定された場合、出力は次のようになります。

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.compression

string

任意

使用する圧縮アルゴリズムの名称。次のいずれかの値でなければなりません:

  • "gzip"

  • "snappy"

config.compressionLevel

string

条件付き

出力されるメッセージに適用する圧縮レベルです。1-9 を含む値をサポートしており、値が大きいほど圧縮率が高くなります。

デフォルトは 6 です。

このパラメーターは gzip に対して必須であり、かつそれに限定されます。config.compressionsnappy に設定した場合、このパラメーターを設定しても効果はありません。

メッセージの取り込みを容易にするため、Atlas Stream Processing は RelaxedJSON 形式を簡略化した Basic JSON をサポートしています。以下の表は、影響を受けるすべてのフィールドに対するこれらの簡略化の例を示しています。

フィールド型
Relaxed JSON
basicJson

バイナリ

{ "binary": { "$binary": { "base64": "gf1UcxdHTJ2HQ/EGQrO7mQ==", "subType": "00" }}}

{ "binary": "gf1UcxdHTJ2HQ/EGQrO7mQ=="}

日付

{ "date": { "$date": "2024-10-24T18:07:29.636Z"}}

{ "date": 1729625275856}

小数点

{ "decimal": { "$numberDecimal": "9.9" }}

{ "decimal": "9.9" }

タイムスタンプ

{ "timestamp": { "$timestamp": { "t": 1729793249, "i": 1 }}}

{ "timestamp": 1729793249000}

ObjectId

{ "_id": { "$oid": "671a8ce1497407eff0e17cba" }}

{ "_id": "6717fcbba18c8a8f74b6d977" }

Negative Infinity

{ "negInf": { "$numberDouble": "-Infinity" }}

{ "negInf": "-Infinity" }

Positive Infinity

{ "posInf": { "$numberDouble": "Infinity" }}

{ "posInf": "Infinity" }

正規表現

{ "regex": { "$regularExpression": { "pattern": "ab+c", "options": "i" }}}

{ "regex": { "pattern": "ab+c", "options": "i" }}

UUID

{ "uuid": { "$binary": { "base64": "Kat+fHk6RkuAmotUmsU7gA==", "subType": "04" }}}

{ "uuid": "420b7ade-811a-4698-aa64-c8347c719cf1"}

$emit は、表示されるすべてのパイプラインの 最後のステージ である必要があります。 パイプラインごとに使用できる$emitステージは 1 つだけです。

ストリーム プロセッサごとに 1 つの Atlas 時系列コレクションにのみ書込み (write) ができます。 存在しないコレクションを指定した場合、Atlas は指定した時系列フィールドでコレクションを作成します。 既存のデータベースを指定する必要があります。

ストリーム topicdbcollプロセッサがメッセージごとに異なるターゲットに書き込むようにするために、 、 、 フィールドの値として動的式を使用できます。式はstring として評価される必要があります。

次の形式のメッセージを生成するトランザクション イベントのストリームがあります。

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

これらをそれぞれを個別のApache Kafkaトピックに並べ替えるには、次の $emit ステージを記述します。

{
"$emit": {
"connectionName": "kafka1",
"topic": "$customerStatus"
}
}

この$emitステージ:

  • Very Important IndustriesメッセージをVIPという名前のトピックに書き込みます。

  • N. E. Buddyメッセージをemployeeという名前のトピックに書き込みます。

  • Khan Traktorメッセージをcontractorという名前のトピックに書き込みます。

動的式の詳細については、「式演算子 」を参照してください。

まだ存在しないトピックを指定した場合、Apache Kafka は、それを対象とする最初のメッセージを受信したときに自動的にトピックを作成します。

動的な式でトピックを指定したものの、Atlas Stream Processing が特定のメッセージの式を評価できない場合、構成されている場合、Atlas Stream Processing はそのメッセージをデッドレターキュー(DLQ)に送信し、以降のメッセージを処理します。デッドレターキュー(DLQ)が構成されていない場合、Atlas Stream Processing はそのメッセージを完全にスキップし、以降のメッセージを処理します。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. $sourceステージでは、 という名前のトピックでこれらのレポートを収集するApachemy_weatherdata Kafkaプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれる際に公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTime に設定されます。

  2. $match ステージでは、airTemperature.value30.0 以上であるドキュメントを除外し、airTemperature.value30.0 未満のドキュメントを次のステージに渡します。

  3. $addFields ステージでは、ストリームにメタデータが追加されます。

  4. $emit ステージは、weatherStreamOutput Kafkaブローカー接続を介してstream というトピックに出力を書き込みます。

{
"$source": {
"connectionName": "sample_weatherdata",
"topic": "my_weatherdata",
"tsFieldName": "ingestionTime"
}
},
{
"$match": {
"airTemperature.value": {
"$lt": 30
}
}
},
{
"$addFields": {
"processorMetadata": {
"$meta": "stream"
}
}
},
{
"$emit": {
"connectionName": "weatherStreamOutput",
"topic": "stream"
}
}

stream トピックのドキュメントは以下の形式をとります:

{
"st": "x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8, 116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1", "AG1", "UG1", "SA1", "MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight": {
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime": {
"$date": "2024-09-26T17:34:41.843Z"
},
"_stream_meta": {
"source": {
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$tumblingWindow

項目一覧