Definition
The $iceberg stage specifies a connection in the Connection
Registry to an AWS S3 bucket where you can write data to an Apache Iceberg table.
Placement
$iceberg must be the last stage of any pipeline in which it appears. You can use only one $iceberg stage per pipeline.
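The placement rule can be checked before you submit a pipeline. The following is a minimal sketch in plain JavaScript. The helper name `checkIcebergPlacement` is hypothetical and is not part of any Atlas API; it assumes the pipeline is an array of single-key stage documents.

```javascript
// Hypothetical helper: checks the placement rule described above.
// `pipeline` is an array of stage documents, for example
// [{ $source: {...} }, { $iceberg: {...} }].
function checkIcebergPlacement(pipeline) {
  // Collect the index of every stage whose operator is $iceberg.
  const positions = pipeline
    .map((stage, i) => (Object.keys(stage)[0] === "$iceberg" ? i : -1))
    .filter((i) => i !== -1);

  if (positions.length === 0) {
    return { ok: true, reason: "no $iceberg stage" };
  }
  if (positions.length > 1) {
    return { ok: false, reason: "more than one $iceberg stage" };
  }
  if (positions[0] !== pipeline.length - 1) {
    return { ok: false, reason: "$iceberg is not the last stage" };
  }
  return { ok: true, reason: "$iceberg is the single, final stage" };
}
```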
Syntax
The $iceberg pipeline stage has the following prototype form:
{
  "$iceberg": {
    "connectionName": "<registered-connection>",
    "bucket": "<target-bucket>",
    "databaseName": "<database>",
    "tableName": "<string>" | <expression>,
    "path": "<key-prefix>",
    "region": "<target-region>",
    "mode": "cdc" | "insert",
    "idFieldName": "<field-name>",
    "partitionedBy": {
      "<column-name>": "<partition-transform>",
      . . .
    },
    "catalog": {
      "type": "hadoop" | "glue"
    }
  }
}
The $iceberg stage takes a document with the following fields:
| Field | Type | Necessity | Description |
|---|---|---|---|
| connectionName | string | Required | Name of the AWS S3 connection to use for reads and writes. This must match the name of the connection in the Connection Registry. |
| bucket | string | Required | Name of the S3 bucket containing the target Apache Iceberg database. |
| databaseName | string | Required | Name of the Apache Iceberg database containing the target table. |
| tableName | string or expression | Required | Name of the target Apache Iceberg table. Must be either a string or an expression that evaluates to a string. Use expressions for dynamic routing per document. |
| path | string | Required | Prefix key of the path to your Apache Iceberg database. |
| region | string | Conditional | AWS region of the bucket. Required for stream processors not running on AWS. |
| mode | string | Optional | Strategy for determining which operation to perform per input document. Defaults to |
| idFieldName | string | Optional | Field and column name used as the row key in Defaults to |
| partitionedBy | document | Optional | Partitioning specification. If you don't set this field, Must be a document containing one or more key-value pairs. Each key must be the name of a column against which to perform a partition transform, and each value must be the partition transform to use. The first partition transform must be against The partition transform value of a given field must be one of the following: For more information about Apache Iceberg partition transforms, see the Apache Iceberg documentation. |
| catalog | document | Optional | Document defining the Iceberg catalog to use. Must be a document containing the |
Behavior
If you use the $iceberg stage, it must be the last stage in your stream processor.
Atlas Stream Processing supports the $iceberg stage for only SP10, SP30, and SP50 stream processors. Your processor tier determines the maximum number of tables supported for dynamic routing:
| Tier | Max Tables |
|---|---|
| SP10 | 5 |
| SP30 | 10 |
| SP50 | 50 |
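If you know the set of table names a dynamic expression can produce, you can compare it against the tier limit ahead of time. The following is a minimal sketch; the constant `MAX_ICEBERG_TABLES` and the helper `fitsTier` are hypothetical names, and the limits are copied from the table above.

```javascript
// Limits copied from the tier table above.
const MAX_ICEBERG_TABLES = { SP10: 5, SP30: 10, SP50: 50 };

// Hypothetical helper: returns true if the distinct names in `tableNames`
// fit within the limit for `tier`. Throws for tiers not listed above.
function fitsTier(tier, tableNames) {
  const limit = MAX_ICEBERG_TABLES[tier];
  if (limit === undefined) {
    throw new Error(`$iceberg is not supported on tier ${tier}`);
  }
  // Duplicates route to the same table, so count distinct names only.
  return new Set(tableNames).size <= limit;
}
```

For example, `fitsTier("SP10", ["a", "b", "c"])` returns `true`, while six distinct names on `SP10` return `false`.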
The $iceberg stage infers the schema of the resulting Apache Iceberg table from the schema of the stream processor's output data. As Atlas Stream Processing observes new fields in the stream, the table schema evolves accordingly.
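As a rough mental model of this schema evolution, the sketch below tracks only column names and adds a column whenever a document carries a previously unseen top-level field. It is an illustration under that simplifying assumption, not how Atlas Stream Processing implements inference; `evolveSchema` is a hypothetical name, and type inference is deliberately left out.

```javascript
// Illustration only: models a table schema as an ordered list of column
// names that grows as documents with new top-level fields arrive.
function evolveSchema(columns, doc) {
  const next = [...columns];
  for (const field of Object.keys(doc)) {
    if (!next.includes(field)) {
      next.push(field); // a newly observed field becomes a new column
    }
  }
  return next;
}

let columns = [];
columns = evolveSchema(columns, { _id: 1, item: "pen" });
columns = evolveSchema(columns, { _id: 2, item: "ink", qty: 3 });
console.log(columns.join(",")); // _id,item,qty
```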
If you specify a table that doesn't already exist, Apache Iceberg creates the table when it receives the first message that targets it.
Atlas Stream Processing guarantees at-least-once processing for output to Apache Iceberg tables.
You can use a dynamic expression as the value of the tableName field. By using a dynamic expression to capture document-specific values, you can route input documents to different tables according to these values. The expression must evaluate to a string. For an example, see Dynamic Routing. To learn more, see expression operators.
If you specify a table with a dynamic expression, but Atlas Stream Processing cannot evaluate the expression for a given message, Atlas Stream Processing sends that message to the dead letter queue, if one is configured, and continues processing subsequent messages. If no dead letter queue is configured, Atlas Stream Processing skips the message and continues processing subsequent messages.
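The fallback behavior described above can be modeled in a few lines. This is an illustration in plain JavaScript, not the Atlas Stream Processing implementation: `routeMessages` is a hypothetical name, and `resolveTable` is a hypothetical callback that stands in for evaluating the tableName expression against one message.

```javascript
// Illustration only: simulates per-message routing with the dead letter
// queue (DLQ) fallback described above.
function routeMessages(messages, resolveTable, dlqConfigured) {
  const tables = {}; // table name -> messages written to that table
  const dlq = [];    // messages sent to the dead letter queue
  let skipped = 0;   // messages dropped because no DLQ is configured

  for (const msg of messages) {
    let name;
    try {
      name = resolveTable(msg);
    } catch (err) {
      name = undefined; // treat an evaluation error as "cannot evaluate"
    }
    if (typeof name !== "string") {
      if (dlqConfigured) dlq.push(msg);
      else skipped += 1;
      continue; // subsequent messages are still processed
    }
    if (!tables[name]) tables[name] = [];
    tables[name].push(msg);
  }
  return { tables, dlq, skipped };
}

// Usage: the second message has no `region`, so it cannot be routed.
const result = routeMessages(
  [{ region: "emea", v: 1 }, { v: 2 }, { region: "apac", v: 3 }],
  (msg) => msg.region,
  true
);
console.log(Object.keys(result.tables).join(","), result.dlq.length); // emea,apac 1
```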
Type Conversion
Atlas Stream Processing performs type conversion from BSON to Iceberg primitive types when writing to tables in the $iceberg stage.
| BSON | Apache Iceberg Primitive | Details |
|---|---|---|
| | | |
| | | |
| | | |
| | | |
| | | |
| | | Hexadecimal-encoded |
| | | Stringified UUID |
| | | Doesn't apply to UUID |
| | | UTC time, measured to the microsecond |
| | | UTC time, measured to the microsecond |
| | | Serialized as a Basic JSON string |
| | | Serialized as a Basic JSON string |
Other BSON types are unsupported. Atlas Stream Processing sends documents with unsupported BSON types to the DLQ.
Examples
The following examples illustrate various applications of the $iceberg stage.
Archiving a Changestream
The following example demonstrates how to write the initial contents and changestream of an Atlas database to an Apache Iceberg table in an append-only fashion, creating a durable archive of that database's operational history. This aggregation has two stages:
- The $source stage establishes a connection with the Atlas database, specifically targeting the orders collection in the db database. It enables initial sync to capture documents in the database at the time of processor activation, and ensures that the full document is captured by each changestream event.
- The $iceberg stage establishes a connection to the AWS S3 bucket, writing to a table named myTable in the iceberg-warehouse/ path. By specifying only insert operations, it ensures an append-only, log-style write flow.
{
  "$source": {
    "connectionName": "atlas1",
    "db": "db",
    "coll": "orders",
    "initialSync": { "enable": true },
    "config": { "fullDocument": "required" }
  }
},
{
  "$iceberg": {
    "connectionName": "myS3Connection",
    "bucket": "myData",
    "path": "iceberg-warehouse/",
    "tableName": "myTable",
    "mode": "insert"
  }
}
Mirroring a Collection
The following example demonstrates how to mirror an Atlas collection in its entirety to an Apache Iceberg table.
Before defining the aggregation, set the following variable:
const isDeleteExpr = {$eq: [{$meta: "stream.source.operationType"}, "delete"]};
The following aggregation adds, updates, and deletes Apache Iceberg table entries in sync with changes to the Atlas source collection. It has four stages:
- The $source stage establishes a connection with the Atlas database, specifically targeting the orders collection in the db database. It enables initial sync to capture documents in the database at the time of processor activation, and ensures that the full document is captured by each changestream event.
- The $match stage filters on operationType so that only documents with valid operation type declarations are processed.
- The $replaceRoot stage changes the document root depending on the type of operation.
  - For delete operations, it changes the document root to the document's key. This results in a record that the document was deleted but excludes its content from further processing.
  - For all other operations, it changes the document root to the fullDocument, passing the content of the document along for further processing while excluding changestream metadata.
- The $iceberg stage establishes a connection to the AWS S3 bucket, writing to an Apache Iceberg table named myTable in the iceberg-warehouse/ path. In cdc mode, this stage determines the operation to perform against the Apache Iceberg table by reading from the stream.source.operationType metadata field of each document.
{
  "$source": {
    "connectionName": "atlas1",
    "db": "db",
    "coll": "orders",
    "initialSync": { "enable": true },
    "config": { "fullDocument": "required" }
  }
},
{
  "$match": {
    "operationType": { "$in": ["insert", "update", "delete", "replace"] }
  }
},
{
  "$replaceRoot": {
    "newRoot": {
      "$cond": { "if": isDeleteExpr, "then": "$documentKey", "else": "$fullDocument" }
    }
  }
},
{
  "$iceberg": {
    "connectionName": "myS3Connection",
    "bucket": "myData",
    "path": "iceberg-warehouse/",
    "tableName": "myTable",
    "mode": "cdc"
  }
}
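To make the $replaceRoot step concrete, the sketch below mirrors its conditional in plain JavaScript. It is an illustration only: `newRootFor` is a hypothetical name, and the sketch assumes a change event shaped like `{ operationType, documentKey, fullDocument }`, reading the operation type from the event itself rather than from stream metadata.

```javascript
// Illustration only: picks the new document root the same way the $cond
// expression in the $replaceRoot stage does.
function newRootFor(event) {
  // Deletes keep only the key; all other operations keep the full document.
  return event.operationType === "delete"
    ? event.documentKey
    : event.fullDocument;
}

const deleted = newRootFor({ operationType: "delete", documentKey: { _id: 7 } });
const updated = newRootFor({
  operationType: "update",
  documentKey: { _id: 8 },
  fullDocument: { _id: 8, status: "shipped" },
});
console.log(JSON.stringify(deleted)); // {"_id":7}
console.log(JSON.stringify(updated)); // {"_id":8,"status":"shipped"}
```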
From Multi-Collection Source to Multi-Table Apache Iceberg Target
In the following example, Atlas Stream Processing uses a dynamic expression to route documents to different Apache Iceberg tables. The aggregation has four stages:
- The $source stage establishes a connection with the Atlas database, specifically targeting the collections a, b, and c in the db database. It enables initial sync to capture documents in the database at the time of processor activation, and ensures that the full document is captured by each changestream event.
- The $match stage filters for documents whose operationType is one of "insert", "update", "delete", or "replace".
- The $replaceRoot stage changes the document root depending on the type of operation.
  - For delete operations, it changes the document root to the document's key. This results in a record that the document was deleted but excludes its content from further processing.
  - For all other operations, it changes the document root to the fullDocument, passing the content of the document along for further processing while excluding changestream metadata.
- The $iceberg stage establishes a connection to the AWS S3 bucket named myData, writing to an Apache Iceberg table in the iceberg-warehouse/ path. It determines the name of the table according to the name of the source collection as retrieved from document metadata. It also determines the operation to perform according to document metadata.
{
  "$source": {
    "connectionName": "atlas1",
    "db": "db",
    "coll": ["a", "b", "c"],
    "initialSync": { "enable": true },
    "config": { "fullDocument": "required" }
  }
},
{
  "$match": {
    "operationType": { "$in": ["insert", "update", "delete", "replace"] }
  }
},
{
  "$replaceRoot": {
    "newRoot": {
      "$cond": {
        "if": { "$eq": [{ "$meta": "stream.source.operationType" }, "delete"] },
        "then": "$documentKey",
        "else": "$fullDocument"
      }
    }
  }
},
{
  "$iceberg": {
    "connectionName": "myS3Connection",
    "databaseName": "iceberg-db",
    "bucket": "myData",
    "path": "iceberg-warehouse/",
    "tableName": { "$meta": "stream.source.ns.coll" },
    "mode": "cdc"
  }
}