
$iceberg Aggregation Stage

The $iceberg stage specifies a connection in the Connection Registry to an AWS S3 bucket where you can write data to an Apache Iceberg table.

$iceberg must be the last stage of any pipeline in which it appears. You can use only one $iceberg stage per pipeline.

The $iceberg pipeline stage has the following prototype form:

{
"$iceberg": {
"connectionName": "<registered-connection>",
"bucket": "<target-bucket>",
"databaseName": "<database>",
"tableName": "<string>" | <expression>,
"path": "<key-prefix>",
"region": "<target-region>",
"mode": "cdc" | "insert",
"idFieldName": "<field-name>",
"partitionedBy": {
"<column-name>": "<partition-transform>",
. . .
},
"catalog": {
"type": "hadoop" | "glue"
}
}
}

The $iceberg stage takes a document with the following fields:

Field
Type
Necessity
Description

connectionName

string

Required

Name of the AWS S3 connection to use for reads and writes. This must match the name of a connection in the Connection Registry.

bucket

string

Required

Name of the S3 bucket containing the target Apache Iceberg database.

databaseName

string

Required

Name of the Apache Iceberg database containing the target table.

tableName

string | expression

Required

Name of the target Apache Iceberg table. Must be either a string or an expression that evaluates to a string. Use an expression to route each document to a table dynamically.

path

string

Required

Key prefix of the path to your Apache Iceberg database.

region

string

Conditional

AWS region of the bucket. Required for stream processors not running on AWS.

mode

string

Optional

Strategy for determining which operation to perform per input document.

  • "cdc" causes Atlas Stream Processing to determine operation type by reading the stream.source.operationType metadata field.

  • "insert" causes Atlas Stream Processing to append each document to the target table as a new row, ignoring operation type declarations in the stream.source.operationType metadata field.

Defaults to "cdc".

idFieldName

string

Optional

Field and column name used as the row key in cdc mode.

Defaults to "_id".

partitionedBy

document

Optional

Partitioning specification. If you don't set this field, $iceberg sets a default partition transform for the idFieldName column.

Must be a document containing one or more key-value pairs. Each key must be the name of a column on which to perform a partition transform, and each value must be the partition transform to use. The first partition transform must be on the idFieldName column.

The partition transform value of a given field must be one of the following:

  • "identity"

  • "year"

  • "month"

  • "day"

  • "hour"

  • { truncate: int }

  • { bucket: int }

For more information about Apache Iceberg partition transforms, see the Apache Iceberg documentation.

catalog

document

Optional

Document defining the Iceberg catalog to use. Must be a document containing the type field with a value of either "hadoop" or "glue".
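To catch mistakes before you create a processor, you can check a stage specification against the field rules above on the client side. The following JavaScript sketch is a hypothetical helper, validateIcebergStage, and is not part of any MongoDB API. It mirrors only the rules stated in this table, models an expression as any non-null object, and does not check the conditional region field, so the server's own validation remains authoritative.

```javascript
// Hypothetical client-side check, not part of any MongoDB API. It mirrors only
// the field rules listed in the table above and returns a list of problems.
const STRING_TRANSFORMS = ["identity", "year", "month", "day", "hour"];

function isValidTransform(t) {
  if (typeof t === "string") return STRING_TRANSFORMS.includes(t);
  if (t !== null && typeof t === "object") {
    const keys = Object.keys(t);
    // Accept { truncate: int } or { bucket: int }.
    return (
      keys.length === 1 &&
      (keys[0] === "truncate" || keys[0] === "bucket") &&
      Number.isInteger(t[keys[0]])
    );
  }
  return false;
}

function validateIcebergStage(spec) {
  const errors = [];
  for (const field of ["connectionName", "bucket", "databaseName", "path"]) {
    if (typeof spec[field] !== "string") errors.push(`${field} must be a string`);
  }
  // tableName: a string, or an expression (modeled here as any non-null object).
  const t = spec.tableName;
  if (typeof t !== "string" && (t === null || typeof t !== "object")) {
    errors.push("tableName must be a string or an expression");
  }
  const mode = spec.mode ?? "cdc"; // documented default
  if (mode !== "cdc" && mode !== "insert") {
    errors.push('mode must be "cdc" or "insert"');
  }
  const idFieldName = spec.idFieldName ?? "_id"; // documented default
  if (spec.partitionedBy !== undefined) {
    // Object.keys preserves insertion order for non-integer keys.
    const columns = Object.keys(spec.partitionedBy);
    if (columns.length === 0) {
      errors.push("partitionedBy must contain at least one column");
    } else if (columns[0] !== idFieldName) {
      errors.push(`first partition column must be ${idFieldName}`);
    }
    for (const column of columns) {
      if (!isValidTransform(spec.partitionedBy[column])) {
        errors.push(`invalid transform for ${column}`);
      }
    }
  }
  if (spec.catalog !== undefined && !["hadoop", "glue"].includes(spec.catalog?.type)) {
    errors.push('catalog.type must be "hadoop" or "glue"');
  }
  return errors;
}
```

An empty result means only that the specification matches the documented rules. It does not guarantee that Atlas accepts the stage.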


Atlas Stream Processing supports the $iceberg stage only for SP10, SP30, and SP50 stream processors. Your processor tier determines the maximum number of tables supported for dynamic routing:

Tier    Max Tables
SP10    5
SP30    10
SP50    50
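To check ahead of time whether a routing scheme fits your tier, you can compare the number of distinct table names it produces with the limits in the tier table. The following JavaScript sketch hardcodes that table; fitsTierLimit is a hypothetical helper, and because tier limits can change, confirm the values against the current documentation before relying on it.

```javascript
// Maximum tables for dynamic routing per processor tier, copied from the table above.
const MAX_TABLES_BY_TIER = { SP10: 5, SP30: 10, SP50: 50 };

// Hypothetical helper: returns true if the distinct table names fit the tier's limit.
function fitsTierLimit(tier, tableNames) {
  const max = MAX_TABLES_BY_TIER[tier];
  if (max === undefined) {
    throw new Error(`$iceberg is not supported on tier ${tier}`);
  }
  return new Set(tableNames).size <= max; // duplicates count once
}
```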

The $iceberg stage infers the schema of the resulting Apache Iceberg table from the schema of the stream processor's output data. As Atlas Stream Processing observes new fields in the stream, the table schema evolves accordingly.

If you specify a table that doesn't already exist, Apache Iceberg creates the table when it receives the first message that targets it.
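The following JavaScript sketch is a toy model of the two behaviors above: a table is created by the first message that targets it, and its column set grows as new fields appear. ToyCatalog is a hypothetical name, and the model is not how Atlas Stream Processing is implemented. It tracks only top-level column names and ignores column types, nested fields, and how type conflicts are resolved.

```javascript
// Toy model, not Atlas internals: a catalog that creates a table on its first
// write and adds a column the first time a new top-level field appears.
class ToyCatalog {
  constructor() {
    this.tables = new Map(); // tableName -> column names, in the order first seen
  }

  write(tableName, doc) {
    if (!this.tables.has(tableName)) {
      this.tables.set(tableName, []); // created by the first message that targets it
    }
    const columns = this.tables.get(tableName);
    for (const field of Object.keys(doc)) {
      if (!columns.includes(field)) columns.push(field); // schema evolves
    }
  }
}
```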

Atlas Stream Processing guarantees at-least-once processing for output to Apache Iceberg tables.

You can use a dynamic expression as the value of the tableName field. By using a dynamic expression to capture document-specific values, you can route input documents to different tables according to those values. The expression must evaluate to a string. For an example, see Dynamic Routing. To learn more, see expression operators.

If you specify tableName as a dynamic expression, but Atlas Stream Processing cannot evaluate the expression for a given message, Atlas Stream Processing sends that message to the dead letter queue, if one is configured, and processes subsequent messages. If no dead letter queue is configured, Atlas Stream Processing skips the message completely and processes subsequent messages.
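The routing rule above can be modeled in plain JavaScript. In this sketch, routeDocuments and resolveTableName are hypothetical names: resolveTableName stands in for evaluating the tableName expression against one document, and the sketch assumes that a thrown error or a non-string result both count as "cannot evaluate".

```javascript
// Hypothetical model of per-document routing with an optional dead letter queue.
// `resolveTableName` stands in for evaluating the tableName expression.
function routeDocuments(docs, resolveTableName, dlq) {
  const routed = new Map(); // tableName -> documents written to that table
  for (const doc of docs) {
    let name;
    try {
      name = resolveTableName(doc);
    } catch {
      name = undefined; // assumption: an evaluation error is handled like a non-string result
    }
    if (typeof name !== "string") {
      if (dlq) dlq.push(doc); // dead letter queue configured: send the message there
      continue;               // either way, move on to the next message
    }
    if (!routed.has(name)) routed.set(name, []);
    routed.get(name).push(doc);
  }
  return routed;
}
```

Passing null for dlq models a processor without a dead letter queue, where unroutable messages are skipped.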

Atlas Stream Processing performs type conversion from BSON to Iceberg primitive types when writing to tables in the $iceberg stage.

BSON        Apache Iceberg Primitive   Details
string      string
int         int
long        long
double      double
bool        boolean
ObjectId    string                     Hexadecimal-encoded
UUID        string                     Stringified UUID
BinData     binary                     Doesn't apply to UUID
date        timestamptz                UTC time, measured to the microsecond
timestamp   timestamptz                UTC time, measured to the microsecond
object      string                     Serialized as a Basic JSON string
array       string                     Serialized as a Basic JSON string

Other BSON types are unsupported. Atlas Stream Processing sends documents that contain unsupported BSON types to the dead letter queue (DLQ).
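The conversion table can be expressed as a lookup. In the following JavaScript sketch, icebergTypeFor is a hypothetical helper. Its keys are the BSON type names returned by the $type aggregation operator, which reports UUIDs as "binData". The sketch therefore assumes that a UUID is binary subtype 4 (the standard UUID subtype) and does not model the legacy subtype 3.

```javascript
// Lookup mirroring the BSON-to-Iceberg conversions listed above.
// Keys are BSON type names as reported by the $type aggregation operator.
const BSON_TO_ICEBERG = {
  string: "string",
  int: "int",
  long: "long",
  double: "double",
  bool: "boolean",
  objectId: "string",       // hexadecimal-encoded
  binData: "binary",        // UUIDs are handled separately below
  date: "timestamptz",      // UTC, microsecond precision
  timestamp: "timestamptz", // UTC, microsecond precision
  object: "string",         // serialized as a JSON string
  array: "string",          // serialized as a JSON string
};

const UUID_SUBTYPE = 4; // assumption: standard BSON UUID binary subtype

// Returns the Iceberg primitive for a BSON type, or null if the type is unsupported
// (documents containing unsupported types go to the dead letter queue).
function icebergTypeFor(bsonType, binarySubtype) {
  if (bsonType === "binData" && binarySubtype === UUID_SUBTYPE) return "string";
  return Object.prototype.hasOwnProperty.call(BSON_TO_ICEBERG, bsonType)
    ? BSON_TO_ICEBERG[bsonType]
    : null;
}
```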

The following examples illustrate various applications of the $iceberg stage.

The following example demonstrates how to write the initial contents and change stream of an Atlas collection to an Apache Iceberg table in an append-only fashion, creating a durable archive of that collection's operational history. This aggregation has two stages:

  1. The $source stage establishes a connection with the Atlas database, specifically targeting the orders collection in the db database. It enables initial sync to capture the documents that already exist in the collection when the processor starts, and requires each change stream event to include the full document.

  2. The $iceberg stage establishes a connection to the AWS S3 bucket, writing to a table named myTable in the iceberg-warehouse/ path. By setting mode to "insert", it appends every event to the table as a new row, producing an append-only, log-style write flow.

[
  {
    "$source": {
      "connectionName": "atlas1",
      "db": "db",
      "coll": "orders",
      "initialSync": {
        "enable": true
      },
      "config": {
        "fullDocument": "required"
      }
    }
  },
  {
    "$iceberg": {
      "connectionName": "myS3Connection",
      "databaseName": "iceberg-db",
      "bucket": "myData",
      "path": "iceberg-warehouse/",
      "tableName": "myTable",
      "mode": "insert"
    }
  }
]
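The following JavaScript sketch is a toy model of what mode "insert" produces in this example, not how Atlas Stream Processing writes to Iceberg. Because the pipeline has no $replaceRoot stage, each row is modeled as a whole change event. appendEvents and historyFor are hypothetical names, and the sketch assumes change-stream-shaped events; it does not model the shape of documents emitted by initial sync.

```javascript
// Toy model of mode "insert": the table is an append-only array of rows, and
// each row is the whole change event (this example has no $replaceRoot stage).
function appendEvents(table, events) {
  for (const event of events) {
    table.push(event); // rows are never updated or deleted in place
  }
  return table;
}

// Example query over the archive: every recorded operation for one document key.
function historyFor(table, id) {
  return table
    .filter((row) => row.documentKey && row.documentKey._id === id)
    .map((row) => row.operationType);
}
```

Because deletes are also appended as rows, the archive keeps a document's full history even after the document is removed from the source collection.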

The following example demonstrates how to mirror an Atlas collection in its entirety to an Apache Iceberg table.

Before defining the aggregation, set the following variable:

const isDeleteExpr = {$eq: [{$meta: "stream.source.operationType"}, "delete"]};

The following aggregation adds, updates, and deletes Apache Iceberg table entries in sync with changes to the Atlas source collection. It has four stages:

  1. The $source stage establishes a connection with the Atlas database, specifically targeting the orders collection in the db database. It enables initial sync to capture the documents that already exist in the collection when the processor starts, and requires each change stream event to include the full document.

  2. The $match stage keeps only change events whose operationType is "insert", "update", "delete", or "replace", so the $iceberg stage receives only operation types it can apply.

  3. The $replaceRoot stage changes the document root depending on the type of operation.

    • For delete operations, it changes the document root to the document's key. This results in a record that the document was deleted but excludes its content from further processing.

    • For all other operations, it changes the document root to fullDocument, passing the content of the document along for further processing while excluding change stream metadata.

  4. The $iceberg stage establishes a connection to the AWS S3 bucket, writing to an Apache Iceberg table named myTable in the iceberg-warehouse/ path. In cdc mode, this stage determines the operation to perform against the Apache Iceberg table by reading from the stream.source.operationType metadata field of each document.

[
  {
    "$source": {
      "connectionName": "atlas1",
      "db": "db",
      "coll": "orders",
      "initialSync": {
        "enable": true
      },
      "config": {
        "fullDocument": "required"
      }
    }
  },
  {
    "$match": {
      "operationType": {
        "$in": ["insert", "update", "delete", "replace"]
      }
    }
  },
  {
    "$replaceRoot": {
      "newRoot": {
        "$cond": {
          "if": isDeleteExpr,
          "then": "$documentKey",
          "else": "$fullDocument"
        }
      }
    }
  },
  {
    "$iceberg": {
      "connectionName": "myS3Connection",
      "databaseName": "iceberg-db",
      "bucket": "myData",
      "path": "iceberg-warehouse/",
      "tableName": "myTable",
      "mode": "cdc"
    }
  }
]
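The combined effect of the $match, $replaceRoot, and cdc-mode $iceberg stages in the mirror example can be modeled with an in-memory Map keyed by _id, the default idFieldName. The following JavaScript sketch is a toy model with a hypothetical applyCdc function, not the stage's implementation. It also simplifies one detail: it reads operationType from the event itself, whereas the real pipeline reads the stream.source.operationType metadata field.

```javascript
// Toy model of the mirror example: $match + $replaceRoot + mode "cdc",
// applied to an in-memory Map keyed by _id (the default idFieldName).
const SUPPORTED_OPS = ["insert", "update", "delete", "replace"];
const isDelete = (event) => event.operationType === "delete";

function applyCdc(table, events) {
  for (const event of events) {
    // $match: ignore any other operation type.
    if (!SUPPORTED_OPS.includes(event.operationType)) continue;
    // $replaceRoot: documentKey for deletes, fullDocument otherwise.
    const row = isDelete(event) ? event.documentKey : event.fullDocument;
    // $iceberg in cdc mode: the operation type decides what happens to the row.
    if (isDelete(event)) table.delete(row._id);
    else table.set(row._id, row); // insert, update, and replace all upsert the row
  }
  return table;
}
```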

In the following example, Atlas Stream Processing uses a dynamic expression to route documents to different tables.

  1. The $source stage establishes a connection with the Atlas database, specifically targeting the collections a, b, and c in the db database. It enables initial sync to capture the documents that already exist in those collections when the processor starts, and requires each change stream event to include the full document.

  2. The $match stage filters for documents whose operationType is one of "insert", "update", "delete", or "replace".

  3. The $replaceRoot stage changes the document root depending on the type of operation.

    • For delete operations, it changes the document root to the document's key. This results in a record that the document was deleted but excludes its content from further processing.

    • For all other operations, it changes the document root to fullDocument, passing the content of the document along for further processing while excluding change stream metadata.

  4. The $iceberg stage establishes a connection to the AWS S3 bucket named myData, writing to Apache Iceberg tables in the iceberg-db database under the iceberg-warehouse/ path. It names each target table after the document's source collection, read from the stream.source.ns.coll metadata field, and runs in cdc mode, so it determines the operation to perform from the stream.source.operationType metadata field.

[
  {
    "$source": {
      "connectionName": "atlas1",
      "db": "db",
      "coll": ["a", "b", "c"],
      "initialSync": {
        "enable": true
      },
      "config": {
        "fullDocument": "required"
      }
    }
  },
  {
    "$match": {
      "operationType": {
        "$in": ["insert", "update", "delete", "replace"]
      }
    }
  },
  {
    "$replaceRoot": {
      "newRoot": {
        "$cond": {
          "if": {
            "$eq": [{ "$meta": "stream.source.operationType" }, "delete"]
          },
          "then": "$documentKey",
          "else": "$fullDocument"
        }
      }
    }
  },
  {
    "$iceberg": {
      "connectionName": "myS3Connection",
      "databaseName": "iceberg-db",
      "bucket": "myData",
      "path": "iceberg-warehouse/",
      "tableName": { "$meta": "stream.source.ns.coll" },
      "mode": "cdc"
    }
  }
]
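If you need the same routing setup for other collections, you can generate the pipeline rather than edit it by hand. The following JavaScript sketch uses a hypothetical helper, buildRoutingPipeline, whose parameter names are illustrative. It reproduces the stages of the dynamic routing example with the connection, database, bucket, and path values passed in.

```javascript
// Hypothetical helper that builds the dynamic routing pipeline for any list of
// source collections. Each source collection is written to its own table.
function buildRoutingPipeline({ sourceConnection, db, colls, s3Connection, databaseName, bucket, path }) {
  return [
    {
      $source: {
        connectionName: sourceConnection,
        db,
        coll: colls,
        initialSync: { enable: true },
        config: { fullDocument: "required" },
      },
    },
    { $match: { operationType: { $in: ["insert", "update", "delete", "replace"] } } },
    {
      $replaceRoot: {
        newRoot: {
          $cond: {
            if: { $eq: [{ $meta: "stream.source.operationType" }, "delete"] },
            then: "$documentKey",
            else: "$fullDocument",
          },
        },
      },
    },
    {
      $iceberg: {
        connectionName: s3Connection,
        databaseName,
        bucket,
        path,
        tableName: { $meta: "stream.source.ns.coll" }, // one table per source collection
        mode: "cdc",
      },
    },
  ];
}
```

In mongosh, you could pass the returned array to sp.createStreamProcessor() along with a processor name. Because each source collection becomes a table, the number of collections in colls must stay within your tier's table limit.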