Using MongoDB Realm WebHooks with Amazon Kinesis Data Firehose
Keep in mind that this is just an example. You do not need to use Atlas as both the source and destination for your Kinesis streams. I am only doing so in this example to demonstrate how you can use MongoDB Atlas as both an AWS Kinesis Data and Delivery Stream. In actuality, you can use any supported data source for your Kinesis Data Stream, and still use MongoDB Atlas as the destination.
Before we get started, you will need the following:
Realm functions are useful if you need to transform or do some other computation with the data before putting the record into Kinesis. However, if you do not need to do any additional computation, it is even easier with the AWS EventBridge integration. MongoDB offers an AWS EventBridge partner event source that lets you send Realm Trigger events to an event bus instead of calling a Realm Function. You can configure any Realm Trigger to send events to EventBridge. You can find out more in the MongoDB documentation.
AWS Kinesis HTTP(S) Endpoint Delivery Requests are sent via POST with a single JSON document as the request body. Delivery destination URLs must be HTTPS.
Each Delivery Stream Request contains essential information in the HTTP headers, some of which we'll use in our Realm WebHook in a moment.
- X-Amz-Firehose-Protocol-Version: This header indicates the version of the request/response formats. Currently, the only version is 1.0, but new ones may be added in the future.
- X-Amz-Firehose-Request-Id: The value of this header is an opaque GUID used for debugging purposes. Endpoint implementations should log the value of this header if possible, for both successful and unsuccessful requests. The request ID is kept the same between multiple attempts of the same request.
- X-Amz-Firehose-Source-Arn: The ARN of the Firehose Delivery Stream, represented in ASCII string format. The ARN encodes the region, AWS account ID, and the stream name.
- X-Amz-Firehose-Access-Key: This header carries an API key or other credentials. Its value is set when we create or update the delivery stream. We'll discuss it in more detail later.
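These headers are available on the payload object inside a Realm WebHook function. A quick sketch (in Realm WebHooks, each header maps to an array of string values):

```javascript
exports = function (payload, response) {
  // Header values arrive as arrays of strings in the webhook payload.
  const requestId = payload.headers["X-Amz-Firehose-Request-Id"][0];
  console.log(`Handling Firehose request ${requestId}`);
  // ...
};
```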
The body carries a single JSON document. You can configure the max body size, but it has an upper limit of 64 MiB before compression. The JSON document has the following properties:
- requestId: Same as the value in the X-Amz-Firehose-Request-Id header, duplicated here for convenience.
- timestamp: The timestamp (milliseconds since epoch) at which the Firehose server generated this request.
- records: The actual records of the Delivery Stream, carrying your data. This is an array of objects, each with a single property of data. This property is a base64 encoded string of your data. Each request can contain a minimum of 1 record and a maximum of 10,000. It's worth noting that a record can be empty.
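Putting that together, a minimal Delivery Stream request body looks something like this (the values are fabricated for illustration; the data property below decodes to {"ticker_symbol":"MDB","price":123.45}):

```json
{
  "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
  "timestamp": 1578090901599,
  "records": [
    { "data": "eyJ0aWNrZXJfc3ltYm9sIjoiTURCIiwicHJpY2UiOjEyMy40NX0=" }
  ]
}
```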
When responding to a Delivery Stream Request, there are a few things you should be aware of.
The HTTP status code must be in the 2xx, 4xx, or 5xx range; Kinesis will not follow redirects, so nothing in the 3xx range. Only a status of 200 is considered a successful delivery of the records; all other statuses are regarded as a retriable error, except 413.
413 (size exceeded) is considered a permanent failure, and will not be retried. In all other error cases, Kinesis will reattempt delivery of the same batch of records using an exponential back-off algorithm.
The retries are backed off using an initial back-off time of 1 second with a jitter factor of 15%. Each subsequent retry is backed off using the formula initial-backoff-time * (multiplier(2) ^ retry_count), with added jitter. The back-off time is capped at a maximum interval of 2 minutes. For example, on the nth retry, the back-off time is MIN(120sec, (1 * (2^n)) * random(0.85, 1.15)).
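Firehose performs these retries itself; there's nothing to implement on our side. But as an illustration of the documented schedule:

```javascript
// Illustration only: Firehose computes this back-off internally.
function backoffSeconds(retryCount) {
  const initialBackoff = 1; // seconds
  const multiplier = 2;
  const jitter = 0.85 + Math.random() * 0.3; // random(0.85, 1.15)
  return Math.min(120, initialBackoff * Math.pow(multiplier, retryCount) * jitter);
}
// retry 1 ≈ 2s, retry 3 ≈ 8s; later retries are capped at ~120s
```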
As well as the HTTP status code, your response should include the following headers:
- Content-Type: The only acceptable content type is application/json.
- Content-Length: The Content-Length header must be present if the response has a body.
- Do not send a Content-Encoding header; the body must be uncompressed.
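In a Realm WebHook, the content type can be set on the response object (Content-Length is typically added for you by the HTTP layer once the body is set):

```javascript
// Kinesis only accepts application/json responses.
response.setHeader("Content-Type", "application/json");
```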
Just like the request, the response body is JSON, but it has a max size of 1 MiB. This JSON body has two required properties:
- requestId: This must match the requestId in the Delivery Stream Request.
- timestamp: The timestamp (milliseconds since epoch) at which the server processed this request.
If there was a problem processing the request, you can optionally include an errorMessage property. If a request fails after exhausting all retries, the last instance of this error message is copied to the error output S3 bucket, if one has been configured for the Delivery Stream.
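For example, a successful response body echoing the request ID would look like this (values fabricated):

```json
{
  "requestId": "ed4acda5-034f-9f42-bba1-f29aea6d7d8f",
  "timestamp": 1578090903599
}
```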
When we configure our Kinesis Delivery Stream, we will have the opportunity to set an AccessKey value. This is the same value which is sent with each request as the X-Amz-Firehose-Access-Key header. We will use this shared secret to validate the source of the request.
We shouldn't hard-code this access key in our Realm function; instead, we will create a new secret named FIREHOSE_ACCESS_KEY. It can be any value, but keep a note of it as you'll need to reference it later when we configure the Kinesis Delivery Stream.
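Secrets can't be read directly in function code, so the usual pattern is to expose the secret through a value and retrieve it with context.values.get(). A quick sketch, assuming you've linked the secret to a value named FIREHOSE_ACCESS_KEY:

```javascript
// Resolves to the linked secret's value at runtime.
const firehoseAccessKey = context.values.get("FIREHOSE_ACCESS_KEY");
```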
- Authentication type must be set to system
- The HTTP method is POST
- "Respond with result" is disabled
- Request validation must be set to "No Additional Authorization"; we need to handle authenticating requests ourselves using the X-Amz-Firehose-Access-Key header
For our WebHook, we need to write a function which does the following (a sketch implementation follows the list):
- Receives a POST request from Kinesis
- Ensures that the X-Amz-Firehose-Access-Key header value matches the FIREHOSE_ACCESS_KEY secret
- Parses the JSON body from the request
- Iterates over the records array and base64 decodes the data in each
- Writes the object to MongoDB Atlas as a new document
- Returns the correct status code and JSON body to Kinesis in the response
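Here is a sketch of how such a function might look. The database and collection names, and the use of _id as the unique match key, are assumptions for illustration; adjust them for your data. It includes a minimal pure-JavaScript decodeBase64() helper (see the note about Buffer() below):

```javascript
// Minimal pure-JavaScript Base64 decoder; assumes single-byte (ASCII/UTF-8) payloads.
function decodeBase64(s) {
  const chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
  let result = "", buffer = 0, bits = 0;
  for (const c of s) {
    if (c === "=") break;
    const value = chars.indexOf(c);
    if (value === -1) continue; // skip any non-alphabet characters
    buffer = (buffer << 6) | value;
    bits += 6;
    if (bits >= 8) {
      bits -= 8;
      result += String.fromCharCode((buffer >> bits) & 0xff);
    }
  }
  return result;
}

exports = async function (payload, response) {
  const body = JSON.parse(payload.body.text());

  // Authenticate the request against our shared secret.
  const accessKey = payload.headers["X-Amz-Firehose-Access-Key"];
  if (!accessKey || accessKey[0] !== context.values.get("FIREHOSE_ACCESS_KEY")) {
    response.setStatusCode(401);
    response.setBody(JSON.stringify({
      requestId: body.requestId,
      timestamp: Date.now(),
      errorMessage: "Invalid X-Amz-Firehose-Access-Key",
    }));
    return;
  }

  // Decode each record's base64 data back into a document.
  const docs = body.records.map((record) =>
    JSON.parse(decodeBase64(record.data))
  );

  // Hypothetical database/collection names; use your own here.
  const collection = context.services
    .get("mongodb-atlas")
    .db("kinesis")
    .collection("deliveryStream");

  try {
    // Upsert the whole batch in a single bulk write (discussed below).
    await collection.bulkWrite(
      docs.map((doc) => ({
        updateOne: {
          filter: { _id: doc._id },
          update: { $set: doc },
          upsert: true,
        },
      })),
      { ordered: false }
    );
    response.setStatusCode(200);
    response.setBody(JSON.stringify({
      requestId: body.requestId,
      timestamp: Date.now(),
    }));
  } catch (err) {
    response.setStatusCode(500);
    response.setBody(JSON.stringify({
      requestId: body.requestId,
      timestamp: Date.now(),
      errorMessage: err.message,
    }));
  }
};
```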
One extra we do have within Realm functions is the global context object. This provides access to other Realm functions, values, and services; you may have noticed in the trigger function at the start of this article that we use the context object to access our AWS service. In the code above, we're using the context object to access the mongodb-atlas service and to retrieve our secret value. You can read more about what's available in the Realm context in our documentation.
The data in each of these records is Base64 encoded, so we have to decode it first.
Using Buffer() within Realm functions may currently cause a degradation in performance. We do not currently recommend using Buffer to decode Base64 strings, but instead suggest a function such as the decodeBase64() in the example above.
Once the parsing and decoding are complete, we're left with an array of between 1 and 10,000 objects, depending on the size of the batch. It's tempting to pass this array to insertMany(), but there is the possibility that some records might already exist as documents in our collection.
Remember, if Kinesis does not receive an HTTP status of 200 in response to a request, it will, in the majority of cases, retry the batch. We have to take into account that there could be an issue after the documents have been written that prevents Kinesis from receiving the 200 OK status. If this occurs and we try to insert the documents again, MongoDB will raise a duplicate key error exception.
To prevent this, we perform an upsert rather than a plain insert. When updating/inserting a single document, you can use updateOne() with the upsert option. But we could potentially have to update/insert 10,000 records, so instead, we perform a bulk write.
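For a single document, that upsert looks something like this (matching on _id is an assumption; use whatever uniquely identifies your records):

```javascript
// Insert the document if no match exists; otherwise update it in place.
await collection.updateOne(
  { _id: doc._id },
  { $set: doc },
  { upsert: true }
);
```

The bulkWrite() call in the function sketch above applies this same updateOne operation to every record in the batch in a single call.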
If our write operations have completed successfully, we return an HTTP 200 status code with our response. Otherwise, we return a 500 and include the error message from the exception in the response body.
Now we've finished writing our Realm function, save and deploy it. Then, on the Settings tab, copy the WebHook URL; we'll need it in just a moment.
To create our Kinesis Delivery Stream we're going to use the AWS CLI, and you'll need the following information:
- Your Kinesis Data Stream ARN
- The ARNs of your respective IAM roles; also ensure that the service principal firehose.amazonaws.com is allowed to assume these roles
- Bucket and Role ARNs for the S3 bucket to be used for errors/backups
- MongoDB Realm WebHook URL
- The value of the FIREHOSE_ACCESS_KEY secret
Your final AWS CLI command will look something like this:
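Here's a sketch with placeholder ARNs and names; the option shape of --http-endpoint-destination-configuration below is my reading of the Firehose API, so verify it against the AWS CLI reference for your version before running:

```bash
aws firehose create-delivery-stream \
  --delivery-stream-name RealmWebhookStream \
  --delivery-stream-type KinesisStreamAsSource \
  --kinesis-stream-source-configuration \
    "KinesisStreamARN=arn:aws:kinesis:us-east-1:111111111111:stream/my-data-stream,RoleARN=arn:aws:iam::111111111111:role/my-firehose-role" \
  --http-endpoint-destination-configuration '{
    "EndpointConfiguration": {
      "Url": "<your Realm WebHook URL>",
      "Name": "RealmWebhook",
      "AccessKey": "<your FIREHOSE_ACCESS_KEY value>"
    },
    "RequestConfiguration": { "ContentEncoding": "NONE" },
    "RetryOptions": { "DurationInSeconds": 3600 },
    "S3BackupMode": "FailedDataOnly",
    "S3Configuration": {
      "RoleARN": "arn:aws:iam::111111111111:role/my-firehose-role",
      "BucketARN": "arn:aws:s3:::my-error-bucket"
    },
    "RoleARN": "arn:aws:iam::111111111111:role/my-firehose-role"
  }'
```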
If everything executes correctly, you should see your new Delivery Stream appear in your Kinesis Dashboard. Also, after a few moments, the WebHook event will appear in your Realm logs and documents will begin to populate your collection!
Now you've seen how you can use MongoDB Realm as an AWS Kinesis HTTP Endpoint, you might find our other articles on using MongoDB with Kinesis useful: