Building an Async Networking Layer for mongos
Many people run their MongoDB servers in a sharded cluster. In such a setup, a mongos sits between the user’s application and their sharded data. Clients connect to the mongos and send it queries, and mongos routes those queries to one or more shards to be fulfilled.
In most cases, mongos can pinpoint a single shard for each given query. However, some queries require “scatter gather” routing; in other words, mongos has to send the query to all shards, wait for their responses, and assemble them into a single master response. We could fan these requests out to shards serially, but then one slow connection would block mongos’ entire system. To do this efficiently, we needed a way to run requests concurrently.
Given the structure of the networking code in MongoDB 3.0, the only way to run requests concurrently was to run them in different threads. Some clusters have hundreds of shards, which means a lot of requests to fan out. You can imagine what might occur in a mongos handling many requests: thread explosion! Having too many threads can bog down a system, as the threads contend for CPU time and memory and the operating system spends more and more effort switching between them.
In 3.2, we wrote an alternate solution: asynchronous outbound networking for mongos. This new networking layer eliminates our thread explosion problem, but it brought with it difficult memory-management challenges. It took a lot of experimentation, failure, iteration, and above all, obsessive testing to implement a new callback-driven, asynchronous system.
The lifetime of a network request
Let’s break down the problem. Imagine we want to run a find command on another shard. This request goes through several stages over its lifetime. First, we open a connection to the remote host. Then, we authenticate our connection. Next, we send our find command. Once we receive a response, we are done and can return to the calling code. We might close the connection, or we might return it to a connection pool to be recycled.
Each of these steps requires at least one network operation. Some, like authentication, require several: the authentication handshake takes at least two complete round trips to the remote host, first to receive a nonce and then to send the actual authentication request.
If your servers are half a world apart, this could take a while! In a mongos fanning out requests serially, if just one of these exchanges is slow it will block all pending requests.
Building an asynchronous networking layer
The first thing to realize is that we need a separation of threads. We don’t want mongos threads, one per connection, doing the work of scheduling and waiting for network requests. Instead, we can offload this work to an execution engine with its own thread pool. The execution engine maintains a queue of work items. It uses its thread pool to pop tasks off this queue and run them. The standalone ASIO library provides the execution engine and many other primitives for our new networking layer.
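To make the idea concrete, here is a minimal sketch of such an execution engine in plain C++11: a queue of work items drained by a fixed pool of worker threads. The ExecutionEngine class and its post() and shutdown() methods are hypothetical names for illustration only; this is neither ASIO’s API nor MongoDB’s code.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical execution engine: a queue of work items drained by a fixed
// pool of worker threads. Work items are expected never to block.
class ExecutionEngine {
 public:
  explicit ExecutionEngine(std::size_t num_threads) {
    for (std::size_t i = 0; i < num_threads; ++i) {
      workers_.emplace_back([this] { run(); });
    }
  }

  ~ExecutionEngine() { shutdown(); }

  // Enqueue a work item and return immediately; a worker runs it later.
  void post(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      queue_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

  // Let the workers finish all queued work, then join them.
  void shutdown() {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      stopping_ = true;
    }
    cv_.notify_all();
    for (auto& worker : workers_) {
      if (worker.joinable()) worker.join();
    }
  }

 private:
  // Worker loop: pop a task, run it outside the lock, repeat.
  void run() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
        if (queue_.empty()) return;  // stopping, and nothing left to run
        task = std::move(queue_.front());
        queue_.pop_front();
      }
      task();  // other workers can pop tasks while this one runs
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::function<void()>> queue_;
  bool stopping_ = false;
  std::vector<std::thread> workers_;
};
```

Posting a thousand small tasks to a four-thread engine and then destroying it runs every task on just those four threads. Because each task is short and never blocks, the thread count stays fixed however much work is queued.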
We need to package our tasks into bite-sized work items that the execution engine can run. This means that none of the tasks can perform any blocking work, otherwise they’d block the whole engine. So, rather than opening a socket and waiting for it to connect, we want to kick off an asynchronous “open connection” task and return immediately.
We can package up the asynchronous “open connection” logic as a task to enqueue on the execution engine. Then, when “open connection” completes, it can enqueue the next asynchronous task, “ask for nonce,” on the engine. Similarly, once we’ve asked for a nonce, we’ll enqueue the “get nonce” task to receive our answer from the remote host.
We proceed this way, with each task enqueuing the next one on our execution engine. Because each task is an independent bit of work, the execution engine is not tied to any specific request or connection. It can handle many requests to different hosts concurrently.
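The chaining pattern can be sketched with a plain FIFO queue standing in for the engine. In this hypothetical example, enqueue(), drain(), and run_stage() are illustrative names, no real networking happens, and a single thread drains the queue so the interleaving is deterministic. Each stage does its small piece of work and then enqueues the next stage rather than waiting for it.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the execution engine: a FIFO of work items, drained by one
// thread here so the interleaving is easy to follow.
std::deque<std::function<void()>> work_queue;
std::vector<std::string> log_lines;  // records which stage ran, in order

void enqueue(std::function<void()> task) { work_queue.push_back(std::move(task)); }

// Pop and run work items until none are left.
void drain() {
  while (!work_queue.empty()) {
    std::function<void()> task = std::move(work_queue.front());
    work_queue.pop_front();
    task();  // may enqueue more work
  }
}

// The stages of one request, in order.
const std::vector<std::string> kStages = {
    "open connection", "ask for nonce", "get nonce",
    "authenticate",    "send command",  "receive response"};

// Run one stage for `host`, then enqueue the next stage instead of blocking.
// In a real layer, the next stage would be enqueued only when this stage's
// asynchronous network operation completes.
void run_stage(const std::string& host, std::size_t stage) {
  log_lines.push_back(host + ": " + kStages[stage]);
  if (stage + 1 < kStages.size()) {
    enqueue([host, stage] { run_stage(host, stage + 1); });
  }
}
```

Enqueueing stage 0 for two hosts, say "shard-a" and "shard-b", and then draining the queue records their stages alternating: "shard-a: open connection", "shard-b: open connection", "shard-a: ask for nonce", and so on. Neither request holds the engine while it waits for its next step.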
When the final task in a request cycle is run by the execution engine, it triggers a callback in mongos. This allows a mongos thread to pick up the response and begin to assemble its own master response for the user application.
In essence, we are tossing requests back and forth between two thread pools, one for mongos logic and one for network operations. With such a system, we can have a fixed or configurable number of threads instead of having one per connection.
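The handoff between the two sides can be sketched with a std::promise/std::future pair. The completion callback runs on a network thread and only passes the reply across, while the waiting "mongos" thread assembles the result. This is an illustrative stand-in, not how mongos actually delivers responses: make_request() and scatter_gather() are hypothetical, and a single network thread stands in for the network pool.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <thread>
#include <vector>

// Completion callback that the network side invokes with a shard's reply.
using Callback = std::function<void(const std::string&)>;

// Hypothetical network work item for one shard. In a real layer this would be
// the chain of non-blocking stages; here it just produces a canned reply.
std::function<void()> make_request(const std::string& shard, Callback on_done) {
  return [shard, on_done] { on_done("reply from " + shard); };
}

// "mongos" side: fan out to every shard, then assemble the replies.
std::string scatter_gather(const std::vector<std::string>& shards) {
  std::vector<std::future<std::string>> replies;
  std::vector<std::function<void()>> network_work;
  for (const auto& shard : shards) {
    auto promise = std::make_shared<std::promise<std::string>>();
    replies.push_back(promise->get_future());
    // The callback only hands the reply across threads; merging happens
    // on the mongos side.
    network_work.push_back(make_request(
        shard, [promise](const std::string& reply) { promise->set_value(reply); }));
  }

  // One network thread serves every shard, so the thread count stays fixed
  // no matter how many shards we fan out to.
  std::thread network_thread([&network_work] {
    for (auto& work : network_work) work();
  });

  std::string merged;
  for (auto& reply : replies) {
    merged += reply.get() + "\n";  // blocks until that shard's callback fires
  }
  network_thread.join();
  return merged;
}
```

Calling scatter_gather({"shard-a", "shard-b"}) returns "reply from shard-a\nreply from shard-b\n", and only one network thread is created regardless of how many shards are listed.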
Let’s look at some of the technical challenges that we encountered as we implemented this.
Technical challenge #1: Vanishing state
Before diving right into how state was vanishing from under our noses, let’s look at some features C++ provides to help us implement callback-driven systems. In particular, C++11 lambdas were an important part of this project.
Lambdas package tasks
A lambda is a callable entity. In C++, it has three parts: a capture list, parameters, and a body. The capture list names the variables from the surrounding scope that the lambda takes along when it is created, either as copies or as references. Parameters are passed in when the lambda is called. The body of the lambda executes when it is called.
auto lambda = [/* capture list */](/* parameters */){
 // body
};

lambda(); // runs body

Let’s take a closer look at the capture list. The following code shows a lambda that captures a variable N from its environment and prints N out when called later in the code.
int N = 1;
auto print_number = [N](){
 std::cout << N << std::endl;
};

print_number(); // prints “1”

Lambdas can capture variables either by value or by reference. Naming a variable in the capture list, as in [N], captures it by value, making a copy:
int N = 1;
auto print_number = [N](){
 // We have our own copy of N
 std::cout << N << std::endl;
};

N = 123456;

print_number(); // still prints “1”

When lambdas capture variables by reference, they’ll use the original variable instead of making a copy:
int N = 1;
auto print_number = [&N](){
 // Now, we use the original N
 std::cout << N << std::endl;
};

N = 123456;

print_number(); // prints “123456”

If we capture by reference, we can avoid making expensive copies of variables. Also, copying some objects doesn’t make any sense; we need the original. Consider a Timer class that tracks the elapsed time since its construction. To get a reliable time, we need a reference to the original timer, not a copy. Timers might not even allow themselves to be copied, because what would a copied Timer do? Should the copy start over from 0:00, or should it begin timing at the original Timer’s elapsed time? One could argue for either behavior.
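A minimal sketch of such a hypothetical Timer, built on std::chrono::steady_clock, shows how a class can refuse copies outright by deleting its copy constructor and copy assignment operator.

```cpp
#include <chrono>
#include <type_traits>

// Hypothetical Timer: measures the time elapsed since it was constructed.
// Copies are disabled because it is unclear what a copy should measure.
class Timer {
 public:
  Timer() : start_(std::chrono::steady_clock::now()) {}

  Timer(const Timer&) = delete;             // no copy construction
  Timer& operator=(const Timer&) = delete;  // no copy assignment

  // Seconds elapsed since this Timer was constructed.
  double secs() const {
    return std::chrono::duration<double>(std::chrono::steady_clock::now() - start_).count();
  }

 private:
  std::chrono::steady_clock::time_point start_;
};

// The compiler now rejects code such as `Timer copy = original;`.
static_assert(!std::is_copy_constructible<Timer>::value, "Timer must not be copyable");
```

Capturing such a Timer by reference, as in [&timer], compiles fine; capturing it by value, as in [timer], does not, because that would require a copy.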
But back to networking. Opening connections is slow, as we discussed, so let’s try writing an asynchronous open_connection() method using a lambda:
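void open_connection(Command cmd) {
  tcp::socket sock(_engine);

  // Pass a lambda to async_connect(). In this simplified API, async_connect()
  // takes the socket and hands it back to the lambda once connected, so the
  // lambda only needs to capture cmd.
  async_connect(std::move(sock), [cmd](error_code err, tcp::socket& sock) {
    if (!err) {
      authenticate(sock, cmd);
    }
  });

  return;
}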

Here, we call an async_connect() method that takes a socket as its first parameter and a lambda as its second parameter. When called, this lambda checks for network errors and starts the next task, authenticate().
The async_connect() method will call the lambda when the network operation completes: this could be soon if our servers are close to each other, or not-so-soon if they are half a world apart. We can’t know exactly when the lambda will be called. In the meantime, open_connection() returns immediately.
This is fine. But what if we want to know exactly how long it takes to run async_connect()? Let’s use the hypothetical Timer class described above. We can’t copy it, so we’ll capture timer by reference:
void open_connection(Command cmd) {
  tcp::socket sock(_engine);
  Timer timer; // starts timing now

  async_connect(std::move(sock), [&timer, cmd](error_code err, tcp::socket& sock) {
    std::cout << timer.secs() << " seconds" << std::endl;
    if (!err) {
      authenticate(sock, cmd);
    }
  });

  return;
}

This is not fine. In fact, we have a huge problem on our hands. The lambda passed into async_connect() has a reference to timer. We don’t know when the lambda will get called, but we do know that it won’t happen immediately. But open_connection() will return immediately, and when it returns, its stack frame vanishes along with every local variable in it. We declared timer on the stack!
What’s a little lambda to do, off in the world with a reference to a cleaned-up variable? Seg fault, of course.
We need to ensure that all necessary state is packaged up with each asynchronous task we run. For things that can’t be neatly copied and packaged, like Timers, we need to ensure that referenced state outlives each task.
We have two choices for how to keep such state alive.
Solution A: Store state in a persistent structure
Our first option is to keep our state somewhere besides the stack. We could maintain a collection of Timer objects, one for each command we begin running. Then, each task involved in completing that command would reference the stored Timer.
This approach is nice because we have control over the Timers and their lifetimes. They never get cleaned up underneath us because we are in charge of cleaning them up.
The downside here is exactly that: we are in charge of cleaning up the Timers. This means extra bookkeeping code that we may not want to maintain, and that we absolutely have to get right.
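A minimal C++17 sketch of Solution A, assuming a hypothetical TimerRegistry keyed by an integer command id. It includes a small non-copyable Timer so the block stands alone. std::map is used because it never relocates its elements, so references handed out to tasks stay valid until the entry is erased.

```cpp
#include <chrono>
#include <cstddef>
#include <map>
#include <mutex>

// Minimal non-copyable Timer (hypothetical): seconds since construction.
class Timer {
 public:
  Timer() : start_(std::chrono::steady_clock::now()) {}
  Timer(const Timer&) = delete;
  Timer& operator=(const Timer&) = delete;
  double secs() const {
    return std::chrono::duration<double>(std::chrono::steady_clock::now() - start_).count();
  }

 private:
  std::chrono::steady_clock::time_point start_;
};

// Hypothetical persistent store: one Timer per in-flight command, keyed by
// command id. Tasks look their Timer up here instead of on the stack.
class TimerRegistry {
 public:
  // Called when a command starts; the Timer lives until finish() is called.
  Timer& start(int command_id) {
    std::lock_guard<std::mutex> guard(mutex_);
    return timers_.try_emplace(command_id).first->second;
  }

  // Any task working on the command can fetch the same stored Timer.
  Timer& get(int command_id) {
    std::lock_guard<std::mutex> guard(mutex_);
    return timers_.at(command_id);  // throws std::out_of_range if unknown
  }

  // Cleanup is our job: skipping this leaks the Timer, and calling it while
  // tasks still use the Timer leaves them holding a dangling reference.
  void finish(int command_id) {
    std::lock_guard<std::mutex> guard(mutex_);
    timers_.erase(command_id);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> guard(mutex_);
    return timers_.size();
  }

 private:
  std::mutex mutex_;
  // std::map never relocates its elements, so references stay valid
  // until the entry is erased.
  std::map<int, Timer> timers_;
};
```

The cost shows up in finish(): every code path that ends a command, including errors and cancellations, has to call it exactly once, at the right time.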
Solution B: Use C++ shared_ptr to keep state alive
Our other option is to use C++ shared_ptrs. A shared_ptr looks and acts like a regular pointer, except that it also shares a reference count that tracks how many shared_ptrs currently point to the same object. The shared_ptr keeps the pointed-to object alive until the last of those shared_ptrs has been destroyed or reset.
Instead of capturing Timers by reference, we can capture a shared_ptr to the Timer by value in the lambda. We are guaranteed that the Timer won’t be cleaned up until the lambda, along with its copy of the shared_ptr, is done with it. The Timer then bounces along from task to task, until we finish the command and release it.
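A minimal sketch of Solution B, assuming a plain queue named pending stands in for the execution engine and a hypothetical open_connection_timed() stands in for open_connection(). The lambda captures a std::shared_ptr<Timer> by value; the weak_ptr observed_timer exists only to show when the Timer is destroyed.

```cpp
#include <chrono>
#include <deque>
#include <functional>
#include <iostream>
#include <memory>

// Minimal non-copyable Timer (hypothetical): seconds since construction.
class Timer {
 public:
  Timer() : start_(std::chrono::steady_clock::now()) {}
  Timer(const Timer&) = delete;
  Timer& operator=(const Timer&) = delete;
  double secs() const {
    return std::chrono::duration<double>(std::chrono::steady_clock::now() - start_).count();
  }

 private:
  std::chrono::steady_clock::time_point start_;
};

// Stand-in for the engine's queue: posted work runs later, after the
// function that posted it has returned.
std::deque<std::function<void()>> pending;

// Observer used only to show when the Timer is destroyed.
std::weak_ptr<Timer> observed_timer;

void open_connection_timed() {
  auto timer = std::make_shared<Timer>();  // heap-allocated, reference-counted
  observed_timer = timer;

  // Capture the shared_ptr by value: the lambda owns a reference, so the
  // Timer outlives this function's stack frame.
  pending.push_back([timer] {
    std::cout << timer->secs() << " seconds" << std::endl;
    // A follow-up task would capture `timer` again to pass it along.
  });
}  // the local `timer` is released here; the lambda's copy keeps the Timer alive
```

After open_connection_timed() returns, the queued lambda holds the only reference (observed_timer.use_count() is 1), so running it is safe. Clearing the queue destroys the lambda and, with it, the Timer.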
Using shared_ptrs also has pros and cons. A major plus is that this implementation is dead-simple: there’s no overhead needed to maintain our own set of persistent Timers.
But, because we’ve ceded control over the Timers to C++, we can’t make assumptions about their lifetime. They won’t get cleaned up underneath us, but otherwise we can’t be sure when they’re still around. Overuse of shared_ptrs can lead to some nasty and hard-to-detect bugs, such as reference cycles that keep objects alive forever. For those interested, this extensive blog post is suggested reading. We would have to proceed with caution down this path.
A tale of two solutions
For MongoDB’s networking layer, there was no one-size-fits-all-cases solution. For some state, it made sense to use persistent structures. For others, shared_ptr was the cleaner, safer solution. We used a hybrid approach that mixed the two solutions.
Technical challenge #2: Vanishing state (again!)
The picture I gave you earlier ignored aborts, but there are several ways the “lifetime of a request” can be cut short before completing, and each one is another opportunity to lose state. Say that while sending our command we experience a network error. In this case, there’s no point in continuing our efforts to communicate with the remote host; it is unreachable, so we exit the state machine early, cleaning up the heap-allocated state we’ve been passing along (depicted below as an envelope):
This is fine, because network errors occur on what I call the primary path of execution. This path is shown above in the blue dotted lines. The primary path is the code in bodies of the lambdas of each task in the cycle. This is where we would receive network errors and where we would decide whether or not to enqueue the next task in the chain.
A network request can also get cut short if a mongos thread decides to cancel it. Say mongos is running a find command with a limit of 5 results. If we’ve already received 5 records from one shard, we might as well cancel any outstanding requests to other shards.
Our first approach to cancellations was to forcibly cancel the operation from the mongos thread. The mongos thread would clean up some state, mark the op as cancelled, and that was that!
Except it wasn’t. The mongos thread runs on a secondary path of execution, shown above in the solid red lines. If the “send command” task, a lambda on the primary path, was already running or enqueued on the engine, it would not know its operation state had been stomped on. When “send command” completes, it will attempt to continue the state machine:
This, as you’ve probably guessed, is bad. The memory containing that state might have been reused for another operation by then, for example.
One path to rule them all
To prevent such mishaps, we enforced a rule: only the primary path can end a task, because only the primary path has complete knowledge of the operation.
We first implemented this using a simple cancelled flag that was stored with the operation.
// Basic "network operation" class
struct NetworkOp {
  bool cancelled = false;
};

// Secondary path
void cancel(NetworkOp *op) {
  op->cancelled = true;
}

// Primary path
if (op->cancelled) {
  done(op);
}

When the mongos thread, on the secondary path, wanted to cancel an operation, it would simply request a cancellation by setting cancelled to true. The next time the primary path ran, it would check the cancelled flag and cancel itself if requested. This way, the actual cancellation happens on the primary path, not the secondary path.
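The cooperative pattern can be sketched end to end with a single-threaded queue standing in for the engine; run_stage(), run_queue(), and the stage bookkeeping are hypothetical. One deliberate change from the snippet above: the flag is a std::atomic<bool>, because a plain bool written by one thread and read by another is a data race in C++. The atomic protects only the flag itself, not the lifetime of the NetworkOp.

```cpp
#include <atomic>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical operation state.
struct NetworkOp {
  std::atomic<bool> cancelled{false};  // written by mongos, read by the primary path
  std::vector<std::string> completed_stages;
  bool finished = false;
};

std::deque<std::function<void()>> engine_queue;  // stand-in for the engine

// Primary path only: ends the operation.
void done(NetworkOp* op) { op->finished = true; }

// Secondary path: only *requests* cancellation; never ends the op itself.
void cancel(NetworkOp* op) { op->cancelled = true; }

// Primary path: check the flag before doing more work, so this path alone
// decides when the operation ends.
void run_stage(NetworkOp* op, int stage, int last_stage) {
  if (op->cancelled) {
    done(op);
    return;
  }
  op->completed_stages.push_back("stage " + std::to_string(stage));
  if (stage == last_stage) {
    done(op);
  } else {
    engine_queue.push_back([op, stage, last_stage] { run_stage(op, stage + 1, last_stage); });
  }
}

// Run queued work items until none are left.
void run_queue() {
  while (!engine_queue.empty()) {
    std::function<void()> task = std::move(engine_queue.front());
    engine_queue.pop_front();
    task();
  }
}
```

If cancel() is called after stage 0 has enqueued stage 1, stage 1 sees the flag and ends the operation itself, so only one stage completes. Without a cancellation, all five stages of a run_stage(&op, 0, 4) chain complete.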
This implementation is better, but it still has problems. Imagine that the mongos thread waits to cancel an operation until the last possible moment, while the primary path is cleaning itself up. These paths are running on separate threads, so this could be happening concurrently. If the operation completes before the cancellation goes through, the act of cancelling might require state that has been cleaned up! Danger!
It’s clear from looking at the code that we’re headed for a use-after-free, and very likely a segmentation fault:
// Secondary path
void cancel(NetworkOp *op) {
  // op may point to memory the primary path has already freed!
  op->cancelled = true;
}

Keep it locked, keep it safe
We need to protect our shared state, which we can do with a mutex. But where should the mutex go? We can’t put it inside our NetworkOp class: the mutex would be destroyed along with the operation it protects, so the secondary path could end up locking a mutex that no longer exists. Much like the Timer objects from above, the mutex has to be stored somewhere independent of the operation, with a lifetime that we can reason about. First we thought about storing these in a persistent structure, but we had a cascading problem on our hands: who cleans up the thing that manages cleanup?
Given the nature of this problem, shared_ptr was the better solution. We designed a structure, called a SafeOp, that stores a mutex and a NetworkOp*:
// "network operation" class
struct NetworkOp {
  bool cancelled = false;
  void done(); // finishes the operation and releases its resources
};

// "access control" object
struct SafeOp {
  std::mutex lock;
  NetworkOp* op = nullptr;
};

Instead of handling bare NetworkOp pointers, both paths hold a shared_ptr to the same SafeOp object. Both sides also agree on a contract: neither will access or alter the NetworkOp without first locking the SafeOp’s mutex.
// Primary path
void done(std::shared_ptr<SafeOp> safe) {
  // lock before cleanup; the guard unlocks
  // the mutex when it goes out of scope
  std::lock_guard<std::mutex> guard(safe->lock);

  safe->op->done();
  safe->op = nullptr;
}

// Secondary path
void cancel(std::shared_ptr<SafeOp> safe) {
  // once we lock, op can't
  // change under us
  std::lock_guard<std::mutex> guard(safe->lock);

  if (safe->op) {
    safe->op->cancelled = true;
  }
}

With this solution, we avert our problematic scenario and achieve exactly the semantics we want:
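A runnable sketch of those semantics, under a few assumptions: the operation is held by a std::unique_ptr instead of the original raw pointer, done() simply frees it, and race() is a hypothetical harness that runs both paths on separate threads many times over. Whichever thread takes the lock first, the operation is freed once and cancel() never touches freed memory.

```cpp
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical operation state; only touched while holding SafeOp::lock.
struct NetworkOp {
  bool cancelled = false;
};

// Shared by both paths through shared_ptr, so it outlives the NetworkOp.
// (unique_ptr stands in for the original's raw NetworkOp*.)
struct SafeOp {
  std::mutex lock;
  std::unique_ptr<NetworkOp> op;
};

// Primary path: end the operation and free its state, under the lock.
void done(const std::shared_ptr<SafeOp>& safe) {
  std::lock_guard<std::mutex> guard(safe->lock);
  safe->op.reset();  // op is now null; the SafeOp itself stays alive
}

// Secondary path: request cancellation only if the operation still exists.
// Returns true if the request reached a live operation.
bool cancel(const std::shared_ptr<SafeOp>& safe) {
  std::lock_guard<std::mutex> guard(safe->lock);
  if (safe->op) {
    safe->op->cancelled = true;
    return true;
  }
  return false;  // too late: the primary path already finished
}

struct RaceResult {
  int rounds_cleaned_up = 0;  // rounds that ended with the op freed
  int cancels_landed = 0;     // rounds where cancel() saw a live op
};

// Run done() and cancel() on two threads at once, many times over.
RaceResult race(int rounds) {
  RaceResult result;
  for (int i = 0; i < rounds; ++i) {
    auto safe = std::make_shared<SafeOp>();
    safe->op = std::make_unique<NetworkOp>();
    bool landed = false;

    std::thread primary([safe] { done(safe); });
    std::thread secondary([safe, &landed] { landed = cancel(safe); });
    primary.join();
    secondary.join();

    if (!safe->op) ++result.rounds_cleaned_up;
    if (landed) ++result.cancels_landed;
  }
  return result;
}
```

How many cancellations land varies from run to run, depending on which thread wins the lock. Running race() under a data-race detector such as ThreadSanitizer is one reasonable way to check that both paths honor the locking contract.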
Implement, test, repeat
It took sweat, tears, and hours of head-scratching, and as is usually the case in software, our first attempts were not always the best implementation. We had to iterate, iterate, seg fault, and iterate again. And, test, test, test! When developing something new and complex, like this project, we need to fail fast and often to make the best product possible. We wrote tests at many levels (unit tests, integration tests, stress tests, etc.) to test the networking layer up, down, and sideways.
So how exactly did we test our new callback-driven, asynchronous networking system? That’s a topic for another time.