
Using MongoDB Realm WebHooks with Amazon Kinesis Data Firehose

Aaron Bassett • Published Feb 05, 2022 • Updated Sep 23, 2022
With MongoDB Realm's AWS integration, it has always been simple to use MongoDB as a Kinesis data stream. Now, with the launch of third-party data destinations in Kinesis, you can also use MongoDB Realm and MongoDB Atlas as an AWS Kinesis Data Firehose destination.
Keep in mind that this is just an example. You do not need to use Atlas as both the source and destination for your Kinesis streams. I am only doing so in this example to demonstrate how you can use MongoDB Atlas as both an AWS Kinesis Data and Delivery Stream. But, in actuality, you can use any source for your data that AWS Kinesis supports, and still use MongoDB Atlas as the destination.


Before we get started, you will need the following:

Setting up our Kinesis Data Stream

RaspberryPi 3 with a Sense HAT
In this example, the source of my data is a Raspberry Pi with a Sense HAT. The output from the Sense HAT is read by a Python script running on the Pi. This script then stores the sensor data such as temperature, humidity, and pressure in MongoDB Atlas.
I then use a Realm Database Trigger to transform this data into a Kinesis Data Stream.
Realm functions are useful if you need to transform or do some other computation with the data before putting the record into Kinesis. However, if you do not need to do any additional computation, it is even easier with AWS EventBridge. MongoDB offers an AWS EventBridge partner event source that lets you send Realm Trigger events to an event bus instead of calling a Realm Function. You can configure any Realm Trigger to send events to EventBridge. You can find out more in the documentation: "Send Trigger Events to AWS EventBridge"
You can find out more details on how to do this in our blog post "Integrating MongoDB and Amazon Kinesis for Intelligent, Durable Streams."

Amazon Kinesis Data Firehose Payloads

AWS Kinesis HTTP(s) Endpoint Delivery Requests are sent via POST with a single JSON document as the request body. Delivery destination URLs must be HTTPS.
Delivery Stream Request Headers
Each Delivery Stream Request contains essential information in the HTTP headers, some of which we'll use in our Realm WebHook in a moment.
  • X-Amz-Firehose-Protocol-Version: This header indicates the version of the request/response formats. Currently, the only version is 1.0, but new ones may be added in the future
  • X-Amz-Firehose-Request-Id: The value of this header is an opaque GUID used for debugging purposes. Endpoint implementations should log the value of this header if possible, for both successful and unsuccessful requests. The request ID is kept the same between multiple attempts of the same request
  • X-Amz-Firehose-Source-Arn: The ARN of the Firehose Delivery Stream represented in ASCII string format. The ARN encodes region, AWS account id, and the stream name
  • X-Amz-Firehose-Access-Key: This header carries an API key or other credentials. This value is set when we create or update the delivery stream. We'll discuss it in more detail later
Delivery Stream Request Body
The body carries a single JSON document. You can configure the maximum body size, but it has an upper limit of 64 MiB, before compression. The JSON document has the following properties:
  • requestId: Same as the value in the X-Amz-Firehose-Request-Id header, duplicated here for convenience
  • timestamp: The timestamp (milliseconds since epoch) at which the Firehose server generated this request
  • records: The actual records of the Delivery Stream, carrying your data. This is an array of objects, each with a single property of data. This property is a base64 encoded string of your data. Each request can contain a minimum of 1 record and a maximum of 10,000. It's worth noting that a record can be empty
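To make the shape of the request body concrete, here is a minimal sketch that builds one locally, for example to exercise a WebHook without a real Delivery Stream. The helper name buildDeliveryRequestBody, the request ID, and the sensor values are made up for illustration, and the sketch assumes Node.js, where Buffer is available.

```javascript
// Hypothetical helper: builds a body shaped like a Firehose HTTP endpoint
// delivery request. Intended for local testing under Node.js only.
function buildDeliveryRequestBody(requestId, documents, now = Date.now()) {
  return {
    requestId, // mirrors the X-Amz-Firehose-Request-Id header
    timestamp: now, // milliseconds since epoch
    records: documents.map((doc) => ({
      // Each record has a single `data` property: a Base64-encoded string
      data: Buffer.from(JSON.stringify(doc), "utf8").toString("base64"),
    })),
  };
}

const sample = buildDeliveryRequestBody(
  "req-123", // made-up request ID
  [{ temperature: 21.5, humidity: 40 }], // made-up sensor reading
  1600000000000
);
console.log(JSON.stringify(sample, null, 2));
```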
Response Format
When responding to a Delivery Stream Request, there are a few things you should be aware of.
Status Codes
The HTTP status code must be in the 2xx, 4xx, or 5xx range. Firehose will not follow redirects, so nothing in the 3xx range is accepted. Only a status of 200 is considered a successful delivery of the records; all other statuses are regarded as a retriable error, except 413.
413 (size exceeded) is considered a permanent failure, and will not be retried. In all other error cases, Firehose will reattempt delivery of the same batch of records using an exponential back-off algorithm.
The retries are backed off using an initial back-off time of 1 second with a jitter factor of 15%. Each subsequent retry is backed off using the formula initial-backoff-time * (multiplier(2) ^ retry_count) with added jitter. The back-off time is capped at a maximum interval of 2 minutes. For example, on the n-th retry the back-off time is MIN(120 sec, 1 * 2^n) * random(0.85, 1.15).
These parameters are subject to change. Please refer to the AWS Firehose documentation for exact initial back-off time, max back-off time, multiplier, and jitter percentages.
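As a rough illustration of that schedule, the sketch below computes the delay before the n-th retry using the values quoted above (1 second initial back-off, multiplier of 2, 2 minute cap, 15% jitter). The function and constant names are hypothetical, the cap is applied before the jitter as in the formula, and the actual AWS parameters may differ.

```javascript
// Values quoted in the text; AWS may change them.
const INITIAL_BACKOFF_SECONDS = 1;
const MULTIPLIER = 2;
const MAX_BACKOFF_SECONDS = 120;
const JITTER = 0.15;

// Returns the approximate delay in seconds before retry number `retryCount`.
// `random` is injectable so the result can be made deterministic in tests.
function backoffSeconds(retryCount, random = Math.random) {
  const uncapped = INITIAL_BACKOFF_SECONDS * Math.pow(MULTIPLIER, retryCount);
  const capped = Math.min(MAX_BACKOFF_SECONDS, uncapped);
  // Jitter factor drawn uniformly from [0.85, 1.15)
  const jitterFactor = 1 - JITTER + random() * 2 * JITTER;
  return capped * jitterFactor;
}

for (let n = 0; n <= 8; n++) {
  console.log(`retry ${n}: ~${backoffSeconds(n).toFixed(2)}s`);
}
```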
Other Response Headers
As well as the HTTP status code your response should include the following headers:
  • Content-Type: The only acceptable content type is application/json
  • Content-Length: The Content-Length header must be present if the response has a body
Do not send a Content-Encoding header; the body must be uncompressed.
Response Body
Just like the Request, the Response body is JSON, but it has a maximum size of 1 MiB. This JSON body has two required properties:
  • requestId: This must match the requestId in the Delivery Stream Request
  • timestamp: The timestamp (milliseconds since epoch) at which the server processed this request
If there was a problem processing the request, you can optionally include an errorMessage property. If a request fails after exhausting all retries, the last instance of this error message is copied to the error output S3 bucket, if one has been configured for the Delivery Stream.
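The response rules above can be sketched as a small helper that produces the status code, header, and JSON body together. The name buildFirehoseResponse and the returned object shape are assumptions for illustration. Content-Length is left to the HTTP layer here, which is an assumption about the hosting platform.

```javascript
// Hypothetical helper: builds the parts of a response to a Delivery Stream
// Request. Pass an error to produce a failure response.
function buildFirehoseResponse(requestId, error) {
  const payload = {
    requestId, // must match the requestId of the request
    timestamp: Date.now(), // milliseconds since epoch
  };
  if (error) {
    // Optional property, only included when something went wrong
    payload.errorMessage = String(error.message || error);
  }
  return {
    statusCode: error ? 500 : 200, // only 200 counts as a successful delivery
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  };
}

console.log(buildFirehoseResponse("req-123"));
console.log(buildFirehoseResponse("req-123", new Error("write failed")));
```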

Storing Shared Secrets

When we configure our Kinesis Delivery Stream, we will have the opportunity to set an AccessKey value. This is the same value which is sent with each request as the X-Amz-Firehose-Access-Key header. We will use this shared secret to validate the source of the request.
We shouldn't hard-code this access key in our Realm function; instead, we will create a new secret named FIREHOSE_ACCESS_KEY. It can be any value, but keep a note of it as you'll need to reference it later when we configure the Kinesis Delivery Stream.
Screenshot of Realm Secrets
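A minimal sketch of the shared-secret check might look like the following. The helper names are hypothetical, and the sketch assumes header values may arrive either as a plain string or as an array of strings, and that header names may differ in case. In a Realm function, the expected key would be read from the stored secret rather than passed in as a literal.

```javascript
// Hypothetical helper: case-insensitive header lookup that accepts either
// a string value or an array of string values.
function getHeader(headers, name) {
  const wanted = name.toLowerCase();
  for (const [key, value] of Object.entries(headers)) {
    if (key.toLowerCase() === wanted) {
      return Array.isArray(value) ? value[0] : value;
    }
  }
  return undefined;
}

// Returns true only when the request carries the expected shared secret.
function isAuthorizedRequest(headers, expectedKey) {
  const provided = getHeader(headers, "X-Amz-Firehose-Access-Key");
  return (
    typeof provided === "string" &&
    provided.length > 0 &&
    provided === expectedKey
  );
}

const exampleHeaders = { "X-Amz-Firehose-Access-Key": ["my-shared-secret"] };
console.log(isAuthorizedRequest(exampleHeaders, "my-shared-secret"));
console.log(isAuthorizedRequest(exampleHeaders, "something-else"));
```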

Creating our Realm WebHook

Before we can write the code for our WebHook, we first need to configure it. The "Configure Service WebHooks" guide in the Realm documentation goes into more detail, but you will need to configure the following options:
  • Authentication type must be set to system
  • The HTTP method is POST
  • "Respond with result" is disabled
  • Request validation must be set to "No Additional Authorisation"; we need to handle authenticating Requests ourselves using the X-Amz-Firehose-Access-Key header
Screenshot of Realm function settings
The Realm Function
For our WebHook we need to write a function which:
  • Receives a POST request from Kinesis
  • Ensures that the X-Amz-Firehose-Access-Key header value matches the FIREHOSE_ACCESS_KEY secret
  • Parses the JSON body from the request
  • Iterates over the records array and base64 decodes the data in each
  • Parses the base64 decoded JSON string into a JavaScript object
  • Writes the object to MongoDB Atlas as a new document
  • Returns the correct status code and JSON body to Kinesis in the response
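The steps above can be sketched as follows. This is an illustrative outline under stated assumptions, not the exact function from this tutorial: the deps argument is a hypothetical stand-in for what Realm's global context object would supply, the 401 status for a bad key is this sketch's own choice, and the fake request and response objects exist only so the code runs under Node.js.

```javascript
// Sketch only. `deps` is a hypothetical stand-in for values that a Realm
// function would obtain from its global `context` object.
async function handleDelivery(payload, response, deps) {
  const firstHeader = (name) => (payload.headers[name] || [])[0];
  const requestId = firstHeader("X-Amz-Firehose-Request-Id");

  // Every reply is JSON and echoes the request ID with a fresh timestamp.
  const reply = (statusCode, extra = {}) => {
    response.addHeader("Content-Type", "application/json");
    response.setStatusCode(statusCode);
    response.setBody(
      JSON.stringify({ requestId, timestamp: Date.now(), ...extra })
    );
    return response;
  };

  // Reject requests that do not carry the shared secret.
  const accessKey = firstHeader("X-Amz-Firehose-Access-Key");
  if (!accessKey || accessKey !== deps.accessKey) {
    return reply(401, { errorMessage: "Invalid X-Amz-Firehose-Access-Key" });
  }

  try {
    // The body is a JSON string; each record's data is Base64-encoded JSON.
    const body = JSON.parse(payload.body.text());
    const documents = body.records
      .filter((record) => record.data) // a record can be empty
      .map((record) => JSON.parse(deps.decodeBase64(record.data)));
    await deps.writeDocuments(documents);
    return reply(200);
  } catch (error) {
    return reply(500, { errorMessage: error.message });
  }
}

// Local stand-ins for the request and response objects (Node.js only).
const fakeResponse = {
  headers: {},
  addHeader(name, value) { this.headers[name] = value; },
  setStatusCode(code) { this.statusCode = code; },
  setBody(body) { this.body = body; },
};
const fakePayload = {
  headers: {
    "X-Amz-Firehose-Request-Id": ["req-123"],
    "X-Amz-Firehose-Access-Key": ["s3cret"],
  },
  body: {
    text: () =>
      JSON.stringify({
        requestId: "req-123",
        timestamp: 0,
        records: [{ data: "eyJhIjoxfQ==" }], // Base64 of {"a":1}
      }),
  },
};
const written = [];
const deps = {
  accessKey: "s3cret",
  // Node-only shortcut for local testing
  decodeBase64: (s) => Buffer.from(s, "base64").toString("utf8"),
  writeDocuments: async (docs) => { written.push(...docs); },
};

handleDelivery(fakePayload, fakeResponse, deps).then((res) =>
  console.log(res.statusCode, res.body)
);
```

In a Realm function, the same inputs would come from the global context object instead of deps, for example context.values.get("FIREHOSE_ACCESS_KEY") for the secret and context.services.get("mongodb-atlas") for the collection.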
As you can see, Realm functions are mostly just vanilla JavaScript. We export a function which takes the request and response as arguments and returns the modified response.
One extra we do have within Realm functions is the global context object. This provides access to other Realm functions, values, and services; you may have noticed in the trigger function at the start of this article that we use the context object to access our AWS service. In the code above, we instead use the context object to access the mongodb-atlas service and to retrieve our secret value. You can read more about what's available in the Realm context in our documentation.
Decoding and Parsing the Payload Body
When we receive the POST request, we first have to convert the body—which is a JSON string—into a JavaScript object. Then we can iterate over each of the records.
The data in each of these records is Base64 encoded, so we have to decode it first.
Using Buffer() within Realm functions may currently degrade performance. For now, we recommend decoding Base64 strings with a function such as decodeBase64() in the example above, rather than with Buffer.
This data could be anything, whatever you've supplied in your Delivery Stream, but in this example, it is the MongoDB document sent from our Realm trigger. This document is also a JSON string, so we'll need to parse it back into a JavaScript object.
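For illustration, a Buffer-free decoder could be written along these lines. This is one possible implementation and an assumption on my part, not necessarily the decodeBase64() used in this tutorial. It relies only on core JavaScript, converting the decoded bytes to a UTF-8 string with decodeURIComponent.

```javascript
const ALPHABET =
  "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

// Lookup table from Base64 character to its 6-bit value
const BASE64_VALUES = {};
[...ALPHABET].forEach((char, index) => {
  BASE64_VALUES[char] = index;
});

// Decodes a standard Base64 string into a UTF-8 string without Buffer.
function decodeBase64(input) {
  const clean = input.replace(/=+$/, ""); // drop trailing padding
  let bits = 0; // bit accumulator
  let bitCount = 0; // number of unread bits in the accumulator
  let percentEncoded = "";
  for (const char of clean) {
    const value = BASE64_VALUES[char];
    if (value === undefined) {
      throw new Error(`Invalid Base64 character: ${char}`);
    }
    // Keep only the 12 low bits; that is the most we ever need at once
    bits = ((bits << 6) | value) & 0xfff;
    bitCount += 6;
    if (bitCount >= 8) {
      bitCount -= 8;
      const byte = (bits >> bitCount) & 0xff;
      percentEncoded += "%" + byte.toString(16).padStart(2, "0");
    }
  }
  // Interprets the percent-encoded bytes as UTF-8
  return decodeURIComponent(percentEncoded);
}

// A record's data decodes to a JSON string, which is then parsed.
const record = { data: "eyJhIjoxfQ==" };
const parsed = JSON.parse(decodeBase64(record.data));
console.log(parsed);
```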
Writing the Reports to MongoDB Atlas
Once the parsing and decoding are complete, we're left with an array of between 1 and 10,000 objects, depending on the size of the batch. It's tempting to pass this array to insertMany(), but there is the possibility that some records might already exist as documents in our collection.
Remember if Kinesis does not receive an HTTP status of 200 in response to a request it will, in the majority of cases, retry the batch. We have to take into account that there could be an issue after the documents have been written that prevents Kinesis from receiving the 200 OK status. If this occurs and we try to insert the document again, MongoDB will raise a Duplicate key error exception.
To prevent this we perform a find() and updateOne(), with upsert().
When updating/inserting a single document, you can use updateOne() with the upsert option.
But we could potentially have to update/insert 10,000 records, so instead, we perform a bulk write.
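One way to express such a batch is as an array of upsert operations, sketched below. Note that this uses the bulkWrite operation format rather than the find() and updateOne() chain mentioned above. It also assumes every decoded document carries an _id, and the helper name buildUpsertOperations is hypothetical.

```javascript
// Hypothetical helper: turns decoded documents into idempotent upserts,
// so a retried batch updates existing documents instead of failing with
// a duplicate key error.
function buildUpsertOperations(documents) {
  return documents.map(({ _id, ...fields }) => ({
    updateOne: {
      filter: { _id }, // match on the document's own _id
      update: { $set: fields }, // write every other field
      upsert: true, // insert when no document matches
    },
  }));
}

const operations = buildUpsertOperations([
  { _id: "reading-1", temperature: 21.5, humidity: 40 }, // made-up data
  { _id: "reading-2", temperature: 22.1, humidity: 38 },
]);
console.log(JSON.stringify(operations, null, 2));
```

Assuming the collection handle exposes a bulkWrite() method that accepts this format, the whole batch could then be sent in a single call, such as collection.bulkWrite(operations).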
Sending the Response
If our write operations have completed successfully, we return an HTTP 200 status code with our response. Otherwise, we return a 500 and include the error message from the exception in the response body.
Our WebHook URL
Now that we've finished writing our Realm Function, save and deploy it. Then, on the settings tab, copy the WebHook URL; we'll need it in just a moment.

Creating an AWS Kinesis Delivery Stream

To create our Kinesis Delivery Stream we're going to use the AWS CLI, and you'll need the following information:
  • Your Kinesis Data Stream ARN
  • The ARNs of your respective IAM roles; also ensure that the service principal is allowed to assume these roles
  • Bucket and Role ARNs for the S3 bucket to be used for errors/backups
  • MongoDB Realm WebHook URL
  • The value of the FIREHOSE_ACCESS_KEY
Your final AWS CLI command will look something like this:
If everything executes correctly, you should see your new Delivery Stream appear in your Kinesis Dashboard. Also, after a few moments, the WebHook event will appear in your Realm logs and documents will begin to populate your collection!
Screenshot Kinesis delivery stream dashboard
Screenshot of Realm logs
Screenshot of a collection in MongoDB Atlas

Next Steps

With the Kinesis data now in MongoDB Atlas, we have a wealth of possibilities. We can transform it with aggregation pipelines, visualise it with Charts, turn it into a GraphQL API, or even trigger more Realm functions or services.

Further reading

Now you've seen how you can use MongoDB Realm as an AWS Kinesis HTTP Endpoint you might find our other articles on using MongoDB with Kinesis useful:
If you haven't yet set up your free cluster on MongoDB Atlas, now is a great time to do so. You have all the instructions in this blog post.
