New in MongoDB Atlas Stream Processing: External Function Support
July 3, 2025
Today we're excited to introduce External Functions, a new capability in MongoDB Atlas Stream Processing that lets you invoke AWS Lambda functions directly from your streaming pipelines. External Functions unlock new ways to enrich, validate, and transform data in flight, enabling smarter, more modular event-driven applications. This functionality is available through a new pipeline stage, $externalFunction.
What are external functions?
External functions allow you to integrate Atlas Stream Processing with external logic services such as AWS Lambda. This lets you reuse existing business logic, perform AI/ML inference, or enrich and validate data as it moves through your pipeline, all without needing to rebuild that logic directly in your pipeline definition.
AWS Lambda is a serverless compute service that runs your code in response to events, scales automatically, and supports multiple languages (JavaScript, Python, Go, etc.). Because there’s no infrastructure to manage, Lambda is ideal for event-driven systems. Now, by using external functions, you can seamlessly plug that logic into your streaming workloads.
Where $externalFunction fits in your pipeline
MongoDB Atlas Stream Processing can connect to a wide range of sources and output to various sinks. The diagram below shows a typical streaming architecture: Atlas Stream Processing ingests data, enriches it with stages like $https and $externalFunction, and routes the transformed results to various destinations.

The $externalFunction stage can be placed anywhere in your pipeline (except as the initial source stage), allowing you to inject external logic at any step. Atlas Stream Processing supports two modes for invoking external functions: synchronous and asynchronous.
Synchronous execution type
In synchronous mode, the pipeline calls the Lambda function and waits for a response. The result is stored in a user-defined field (named by the "as" key) and passed to the following stages.
let syncEF = {
  $externalFunction: {
    connectionName: "myLambdaConnection",
    functionName: "arn:aws:lambda:region:account-id:function:function-name",
    execution: "sync",
    as: "response",
    onError: "fail",
    payload: [
      { $replaceRoot: { newRoot: "$fullDocument.payloadToSend" } },
      { $addFields: { sum: { $sum: "$randomArray" } } },
      { $project: { success: 1, sum: 1 } }
    ]
  }
};

Let’s walk through what each part of the $externalFunction stage does in this synchronous setup:
- connectionName: the external function connection name specified in the Connection Registry.
- functionName: the full AWS ARN or the name of the AWS Lambda function.
- execution: indicates synchronous execution ("sync") as opposed to asynchronous ("async").
- as: specifies the field where the Lambda response will be stored (here, "response").
- onError: the behavior when the operator encounters an error (in this case, "fail" stops the processor). The default is to add the event to the dead letter queue.
- payload: an inner pipeline that customizes the request body sent to the function. Use it to reduce the size of the data passed and ensure only relevant fields reach the external function.
This type is useful when you want to enrich or transform a document using external logic before it proceeds through the rest of the pipeline.
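To make the payload pipeline concrete, here is a plain-JavaScript walkthrough of what each stage does to a hypothetical incoming event (this approximates the aggregation stages step by step; the field names match the sync example above):

```javascript
// A hypothetical change event as it might enter the pipeline
const event = {
  fullDocument: {
    payloadToSend: { success: true, randomArray: [1, 2, 3], extra: "not sent" }
  }
};

// $replaceRoot: promote the nested payloadToSend document to the root
let doc = event.fullDocument.payloadToSend;

// $addFields: compute sum over randomArray
doc = { ...doc, sum: doc.randomArray.reduce((a, b) => a + b, 0) };

// $project: keep only success and sum
const requestBody = { success: doc.success, sum: doc.sum };

console.log(requestBody); // { success: true, sum: 6 }
```

The Lambda function receives only `requestBody`, not the full change event, which keeps invocation payloads small.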
Asynchronous execution type
In async mode, the function is called, but the pipeline does not wait for a response. This is useful when you want to notify downstream systems, trigger external workflows, or pass data into AWS without halting the pipeline.
let asyncEF = {
  $externalFunction: {
    connectionName: "EF-Connection",
    functionName: "arn:aws:lambda:us-west-1:12112121212:function:EF-Test",
    execution: "async"
  }
};

Use the async execution type for propagating information outward, for example:
- Triggering downstream AWS applications or analytics
- Notifying external systems
- Firing off alerts or billing logic
Real-world use case: Solar device diagnostics
To illustrate the power of external functions, let’s walk through an example: a solar energy company wants to monitor real-time telemetry from thousands of solar devices. Each event includes sensor readings (e.g., temperature, power output) and metadata like device_id and timestamp. These events need to be processed, enriched, and then stored in a MongoDB Atlas collection for dashboards and alerts.
This can easily be accomplished using a synchronous external function. Each event is sent to a Lambda function that enriches the record with a status (e.g., ok, warning, critical) as well as diagnostic comments. The pipeline waits for the enriched event to be returned and then writes it to the desired MongoDB collection.
Step 1: Define the external function connection
First, create a new AWS Lambda connection in the Connection Registry within Atlas. You can authenticate using Atlas's Unified AWS Access, which securely connects Atlas and your AWS account.
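For illustration, a Lambda connection defined through the Atlas Administration API might look roughly like the following. The exact type value and field names here are assumptions for the sketch; consult the Connection Registry documentation for the authoritative schema, and note that the role ARN is a placeholder:

```json
{
  "name": "myLambdaConnection",
  "type": "AWSLambda",
  "aws": {
    "roleArn": "arn:aws:iam::123456789012:role/atlas-stream-processing-role"
  }
}
```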

Step 2: Implement the Lambda function
Here’s a simple diagnostic function. It receives solar telemetry data, checks it against thresholds, and returns a structured result.
export const handler = async (event) => {
  const {
    device_id,
    group_id,
    watts,
    temp,
    max_watts,
    timestamp
  } = event;

  // Default thresholds
  const expectedTempRange = [20, 40]; // Celsius
  const wattsLowerBound = 0.6 * max_watts; // 60% of max output

  let status = "ok";
  let messages = [];

  // Wattage check
  if (watts < wattsLowerBound) {
    status = "warning";
    messages.push(`Observed watts (${watts}) below 60% of max_watts (${max_watts}).`);
  }

  // Temperature check
  if (temp < expectedTempRange[0] || temp > expectedTempRange[1]) {
    status = "warning";
    messages.push(`Temperature (${temp}°C) out of expected range [${expectedTempRange[0]}–${expectedTempRange[1]}].`);
  }

  // If multiple warnings, escalate to critical
  if (messages.length > 1) {
    status = "critical";
  }

  return {
    device_id,
    status,
    timestamp,
    watts_expected_range: [wattsLowerBound, max_watts],
    temp_expected_range: expectedTempRange,
    comment: messages.length ? messages.join(" ") : "All readings within expected ranges."
  };
};
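Before deploying, it can help to exercise the diagnostic logic locally with a sample event. The sketch below inlines a condensed re-implementation of the same thresholds so it runs standalone in plain Node (in practice you would import the handler from your Lambda source file; the telemetry values are hypothetical):

```javascript
// Condensed re-implementation of the handler's threshold logic,
// inlined so this snippet runs on its own.
function diagnose({ device_id, watts, temp, max_watts, timestamp }) {
  const expectedTempRange = [20, 40]; // Celsius
  const wattsLowerBound = 0.6 * max_watts; // 60% of max output
  const messages = [];
  if (watts < wattsLowerBound) messages.push("low watts");
  if (temp < expectedTempRange[0] || temp > expectedTempRange[1]) messages.push("temp out of range");
  const status =
    messages.length > 1 ? "critical" : messages.length === 1 ? "warning" : "ok";
  return { device_id, status, timestamp };
}

// Hypothetical event: watts well under 60% of max AND temperature out of range
const result = diagnose({
  device_id: "solar-0042",
  watts: 100,
  temp: 55,
  max_watts: 300,
  timestamp: "2025-07-03T12:00:00Z"
});

console.log(result.status); // "critical" — both checks tripped
```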

Step 3: Create the streaming pipeline
Using VS Code, define a stream processor using the sample solar stream as input.
let s = {
  $source: {
    connectionName: 'sample_stream_solar'
  }
};

// Define the External Function
let EFStage = {
  $externalFunction: {
    connectionName: "telemetryCheckExternalFunction",
    onError: "fail",
    functionName: "arn:aws:lambda:us-east-1:121212121212:function:checkDeviceTelemetry",
    as: "responseFromLambda"
  }
};

// Replace the original document with the Lambda response
let projectStage = {
  $replaceRoot: {
    newRoot: "$responseFromLambda"
  }
};

// Merge the results into a DeviceTelemetryResults collection
let sink = {
  $merge: {
    into: {
      connectionName: "IoTDevicesCluster",
      db: "SolarDevices",
      coll: "DeviceTelemetryResults"
    }
  }
};

sp.createStreamProcessor("monitorSolarDevices", [s, EFStage, projectStage, sink]);
sp.monitorSolarDevices.start();

Once running, the processor ingests live telemetry data, invokes the Lambda diagnostics logic, and returns enriched results to MongoDB Atlas, complete with status and diagnostic comments.
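Because of the $replaceRoot stage, each document written to DeviceTelemetryResults mirrors the handler's return value. An illustrative result (values hypothetical) might look like:

```json
{
  "device_id": "device_1",
  "status": "warning",
  "timestamp": "2025-07-03T12:00:00Z",
  "watts_expected_range": [180, 300],
  "temp_expected_range": [20, 40],
  "comment": "Observed watts (100) below 60% of max_watts (300)."
}
```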
Step 4: View enriched results in MongoDB Atlas
Explore the enriched data in MongoDB Atlas using the Data Explorer. For example, filter all documents where status = "ok" after a specific date.
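As a concrete sketch of that filter, here is the equivalent predicate applied in plain JavaScript to a few hypothetical result documents (in Data Explorer you would enter a filter document such as `{ status: "ok", timestamp: { $gt: ... } }` instead):

```javascript
// Hypothetical documents as they might appear in DeviceTelemetryResults
const docs = [
  { device_id: "device_1", status: "ok", timestamp: "2025-07-02T09:00:00Z" },
  { device_id: "device_2", status: "critical", timestamp: "2025-07-02T09:05:00Z" },
  { device_id: "device_3", status: "ok", timestamp: "2025-06-28T17:30:00Z" }
];

// Equivalent of filtering status = "ok" after a cutoff date
const cutoff = new Date("2025-07-01T00:00:00Z");
const matches = docs.filter(
  (d) => d.status === "ok" && new Date(d.timestamp) > cutoff
);

console.log(matches.map((d) => d.device_id)); // [ 'device_1' ]
```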

Smarter stream processing with external logic
MongoDB Atlas Stream Processing external functions allow you to enrich your data stream with logic that lives outside the pipeline, making your processing smarter and more adaptable. In this example, we used AWS Lambda to apply device diagnostics in real time and store the results in MongoDB. You could easily extend this approach to fraud detection, personalization, enrichment from third-party APIs, and more.
Log in today to get started, or check out our documentation to create your first external function. Have an idea for how you'd use external functions in your pipelines? Let us know in the MongoDB community forum!