定義
$emit
ステージは、メッセージを出力する 接続レジストリで接続を指定します。接続は、 Apache Kafka プロバイダーまたは時系列コレクション のいずれかである必要があります。
構文
Apache Kafka ブロック
処理されたデータをApache Kafkaプロバイダーに書き込むには、次のプロトタイプ形式で $emit
パイプラインステージを使用します。
{ "$emit": { "connectionName": "<registered-connection>", "topic": "<target-topic>" | <expression>, "config": { "acks": <number-of-acknowledgements>, "compression_type": "<compression-type>", "dateFormat": "default" | "ISO8601", "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<serialization-type>", "outputFormat": "<json-format>", "tombstoneWhen": <expression> } } }
$emit
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 | |||||
---|---|---|---|---|---|---|---|---|
| string | 必須 | データを取り込む接続の名前(接続レジストリに表示されます)。 | |||||
| string |式 | 必須 | メッセージの送信先Apache Kafkaトピックの名前。 | |||||
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 | |||||
| 整数 | 任意 | 成功した デフォルト値は
| |||||
| string | 任意 | 制作者が生成するすべてのデータの圧縮形式。デフォルトは「なし」になっています(つまり、圧縮は行われません)。有効な値は次のとおりです。
圧縮はデータの完全なバッチに使用されるため、バッチ処理の効率は圧縮率に影響します。バッチ処理が多いほど、圧縮率が向上します。 | |||||
| string | 任意 | 日付値のための日付フォーマット。有効な値は次のとおりです。
以下に例を挙げます。 次の入力をご覧ください。
| |||||
| 式 | 任意 | 出力メッセージに追加する ヘッダー 。 式は、オブジェクトまたは配列のいずれかに評価される必要があります。 式がオブジェクトと評価される場合、Atlas Stream Processing は、そのオブジェクト内の各キーと値のペアからヘッダーを構築します。キーはヘッダー名、値はヘッダー値です。 式が配列と評価される場合は、キーと値のペア オブジェクトの配列の形式になる必要があります。 例:
Atlas Stream Processing は、配列内の各オブジェクトからヘッダーを構築します。キーはヘッダー名で、値はヘッダー値です。Atlas Stream Processing は次の種類のヘッダー値をサポートしています。
| |||||
| オブジェクト | string | 式 | 任意 | Apache Kafkaメッセージ キーとして評価される 式 。
| |||||
| string | 条件付き | Apache Kafkaキー データを直列化するために使用されるデータ型。次のいずれかの値である必要があります。
デフォルトは | |||||
| string | 任意 | Apache Kafkaにメッセージを発行するときに使用するJSON形式。次のいずれかの値である必要があります。
デフォルトは | |||||
| 式 | 任意 | Kafkaに 式がブール値値に評価できない場合、または評価できない場合、Atlas Stream Processing はドキュメントをDLQ に書込みます。 この設定を使用すると、 |
Atlas 時系列コレクション
処理されたデータを Atlas 時系列コレクションに書き込むには、次のプロトタイプ形式で$emit
パイプライン ステージを使用します。
{ "$emit": { "connectionName": "<registered-connection>", "db": "<target-db>" | <expression>, "coll": "<target-coll>" | <expression>, "timeseries": { <options> } } }
$emit
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 必須 | データを取り込む接続の名前(接続レジストリに表示されます)。 |
| string |式 | 必須 | ターゲット時系列コレクションを含む Atlas データベースに解決される、または式の名前。 |
| string |式 | 必須 | 書込み先の Atlas時系列コレクションに解決される、または式の名前。 |
| ドキュメント | 必須 | コレクションの時系列フィールドを定義するドキュメント。 |
注意
時系列コレクション内のドキュメントの最大サイズは4 MB です。 詳細については「時系列コレクションの制限 」を参照してください。
AWS S3
処理済みデータを AWS S3 バケットシンク接続に書き込むには、次のプロトタイプ形式で $emit
パイプラインステージを使用します:
{ "$emit": { "connectionName": "<registered-connection>", "bucket": "<target-bucket>", "region": "<target-region>", "path": "<key-prefix>" | <expression>, "config": { "writeOptions": { "count": <doc-count>, "bytes": <threshold>, "interval": { "size": <unit-count>, "unit": "<time-denomination>" } }, "delimiter": "<delimiter>", "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson", "dateFormat": "default" | "ISO8601", "compression": "gzip" | "snappy", "compressionLevel": <level> } } }
$emit
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 | |||
---|---|---|---|---|---|---|
| string | 必須 | 接続レジストリに表示される、データを書き込む先の接続の名前。 | |||
| string | 必須 | データを書き込む先の S3 バケットの名前。 | |||
| string | 任意 | 対象バケットが存在する AWS リージョンの名前です。ストリーム処理インスタンスを AWS リージョンでホストしている場合、このパラメータはそのリージョンがデフォルトになります。そうでない場合は、ストリーム処理インスタンスのホストリージョンに最も近い AWS リージョンがデフォルトになります。 | |||
| string |式 | 必須 | S3 バケットに書き込まれたオブジェクトのキーのプレフィックス。リテラルのプレフィックス string または、string を評価する式である必要があります。 | |||
| ドキュメント | 任意 | さまざまなデフォルト値を上書きする追加のパラメーターを含むドキュメント。 | |||
| ドキュメント | 任意 | 書き込み動作を制御する追加パラメーターを含むドキュメントです。これらのパラメーターは、どの閾値が最初に満たされるかに応じて書き込み動作をトリガーします。 例えば、取り込まれたドキュメントが | |||
| integer | 任意 | S3 に書き込む各ファイルにグループ化するドキュメントの数。 | |||
| integer | 任意 | S3 にファイルを書き込む前に蓄積される必要がある最小バイト数を指定します。バイト数は、最終出力ファイルのサイズではなく、パイプラインで取り込まれた BSON ドキュメントのサイズによって決まります。 | |||
| ドキュメント | 任意 | ドキュメントを一括書き込みするためのタイマーを デフォルトは 1 分です。いかなる | |||
| integer | 条件付き | ストリームプロセッサが S3 にドキュメントを一括書き込みするまでの、 デフォルトは | |||
| string | 条件付き | 一括書き込みタイマーをカウントする時間の単位です。このパラメーターは以下の値をサポートしています:
デフォルトは | |||
| string | 任意 | 出力されたファイル内の各エントリの間の区切り文字。 デフォルトは | |||
| string | 任意 | S3 に書き込まれる JSON の出力形式を指定します。次のいずれかの値でなければなりません:
デフォルトは「 詳しくは、Basic JSON をご覧ください。 | |||
| string | 任意 | 日付値のための日付フォーマット。有効な値は次のとおりです。
例として、次のレコードをパイプラインに追加した場合:
| |||
| string | 任意 | 使用する圧縮アルゴリズムの名称。次のいずれかの値でなければなりません:
| |||
| string | 条件付き | 出力されるメッセージに適用する圧縮レベルです。 デフォルトは このパラメーターは |
Basic JSON
メッセージの取り込みを容易にするため、Atlas Stream Processing は RelaxedJSON 形式を簡略化した Basic JSON をサポートしています。以下の表は、影響を受けるすべてのフィールドに対するこれらの簡略化の例を示しています。
フィールド型 | Relaxed JSON | basicJson |
---|---|---|
バイナリ |
|
|
日付 |
|
|
小数点 |
|
|
タイムスタンプ |
|
|
ObjectId |
|
|
Negative Infinity |
|
|
Positive Infinity |
|
|
正規表現 |
|
|
UUID |
|
|
動作
$emit
は、表示されるすべてのパイプラインの 最後のステージ である必要があります。 パイプラインごとに使用できる$emit
ステージは 1 つだけです。
ストリーム プロセッサごとに 1 つの Atlas 時系列コレクションにのみ書込み (write) ができます。 存在しないコレクションを指定した場合、Atlas は指定した時系列フィールドでコレクションを作成します。 既存のデータベースを指定する必要があります。
ストリーム topic
db
coll
プロセッサがメッセージごとに異なるターゲットに書き込むようにするために、 、 、 フィールドの値として動的式を使用できます。式はstring として評価される必要があります。
例
次の形式のメッセージを生成するトランザクション イベントのストリームがあります。
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
これらをそれぞれを個別のApache Kafkaトピックに並べ替えるには、次の $emit
ステージを記述します。
{ "$emit": { "connectionName": "kafka1", "topic": "$customerStatus" } }
この$emit
ステージ:
Very Important Industries
メッセージをVIP
という名前のトピックに書き込みます。N. E. Buddy
メッセージをemployee
という名前のトピックに書き込みます。Khan Traktor
メッセージをcontractor
という名前のトピックに書き込みます。
動的式の詳細については、「式演算子 」を参照してください。
まだ存在しないトピックを指定した場合、Apache Kafka は、それを対象とする最初のメッセージを受信したときに自動的にトピックを作成します。
動的な式でトピックを指定したものの、Atlas Stream Processing が特定のメッセージの式を評価できない場合、構成されている場合、Atlas Stream Processing はそのメッセージをデッドレターキュー(DLQ)に送信し、以降のメッセージを処理します。デッドレターキュー(DLQ)が構成されていない場合、Atlas Stream Processing はそのメッセージを完全にスキップし、以降のメッセージを処理します。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
$source
ステージでは、 という名前のトピックでこれらのレポートを収集するApachemy_weatherdata
Kafkaプロバイダーとの接続を確立し、各レコードが後続の集計ステージに取り込まれる際に公開します。このステージではまた、プロジェクションを実行するタイムスタンプフィールドの名前が上書きされ、ingestionTime
に設定されます。$match
ステージでは、airTemperature.value
が30.0
以上であるドキュメントを除外し、airTemperature.value
が30.0
未満のドキュメントを次のステージに渡します。$addFields
ステージでは、ストリームにメタデータが追加されます。$emit
ステージは、weatherStreamOutput
Kafkaブローカー接続を介してstream
というトピックに出力を書き込みます。
{ "$source": { "connectionName": "sample_weatherdata", "topic": "my_weatherdata", "tsFieldName": "ingestionTime" } }, { "$match": { "airTemperature.value": { "$lt": 30 } } }, { "$addFields": { "processorMetadata": { "$meta": "stream" } } }, { "$emit": { "connectionName": "weatherStreamOutput", "topic": "stream" } }
stream
トピックのドキュメントは以下の形式をとります:
{ "st": "x+34700+119500", "position": { "type": "Point", "coordinates": [122.8, 116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight": { "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime": { "$date": "2024-09-26T17:34:41.843Z" }, "_stream_meta": { "source": { "type": "kafka", "topic": "my_weatherdata", "partition": 0, "offset": 4285 } } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。