Enhancing Streaming and Request Handling Efficiency with gRPC and Coroutines: Part 1

Background

TerrariumDB handles millions of requests per hour. We can distinguish two types:

  • User-facing query: This request is processed under 150 ms 99.9% of the time. Typically, these requests involve sending and receiving a few kilobytes to a few hundred kilobytes of data. For example, we might inquire about the most recent book purchased by a client using an ID. In handling such requests, only one worker is required.
  • Analytic query: This request type may require more time due to the extensive data involved. For instance, we may request the top 100 most popular books sold by company X from 2018 to 2024. The response to such a query could consist of even tens of megabytes and might take a few seconds to process. In handling such requests, all workers (currently 2160) need to be involved.

From a performance perspective, the user-facing queries were the most challenging: we wanted them to be as fast as possible, and there could be millions of small messages per minute.

A second challenge involved sending large amounts of data for analytic requests. Initially, we fetched data from all workers, and each worker may hold a few megabytes of data. However, we realized that in some cases not all of the data was necessary, so fetching everything was simply a waste of time. The better solution was to stream the data and fetch only what was needed.

This article concentrates on the gateway: building a server capable of managing millions of requests per minute and handling thousands of simultaneous requests with only 64 threads available. The next article will demonstrate the client-side approach the gateway uses to request data from workers through streaming.

First approach to the gateway server

The first implementation used QWebSocket to handle all requests. The main disadvantages of such a solution were:

  • Limited performance of the built-in QEventLoop.
  • QWebSocket constantly keeps the connection alive, which required additional time to check the connection status continuously.
  • Manual data serialization, because QWebSocket sends messages as QString/QByteArray.
  • No streaming support in QWebSocket in Qt5.

Initially, opting for QWebSocket seemed like a sound decision, as it spared us the need to implement our own thread pool and monitor connection status. However, the speed of this solution wasn’t satisfying. We solved the serialization problem by using the JSON format, but we still wanted to improve the performance of user-facing queries (with QWebSocket, the p99 latency was about 400 ms) and to limit the amount of data fetched for analytic requests, so we needed a better solution.

Thread pool

The first challenge was to implement a thread pool that would manage request handling. Unlike before with QEventLoop, this time we had to build everything from scratch. Since the pool would deal with millions of requests every hour, we aimed for top speed to prevent slowdowns when requests are pushed to and popped from the pool. We couldn’t protect the queue with a global mutex, because threads would waste a lot of time waiting for their turn: with 64 or more threads, contention would be huge. The better solution was to preallocate a large vector (for example, 1 million slots) and let each thread put a task at an available index.

The index will increase from 0 to a maximum value (such as 1 million) before resetting back to 0. There will typically be an available slot, allowing us to push a task without waiting. The counter, which is an atomic value, can increase rapidly without the need for mutex protection. However, we must ensure there is protection in place to prevent pushing a task onto the index if another task is already present.

We opted for a straightforward binary semaphore to signal the availability of a resource at a specific index. This choice speeds up the thread pool operations and minimizes the need for locking mechanisms, as waiting for a semaphore was rarely necessary given the vast number of indexes available (one million).  

Coroutine

Another challenge we faced was managing multiple small requests concurrently. Since most tasks are compact, switching between threads significantly impacts query time. Our approach involves assigning a task to a thread, querying a worker for data (and awaiting the response), and then sending the resulting data.

To optimize this process, we employ a strategy where a thread alternates between assigning tasks to workers and, upon receiving the response, transmitting it to the recipient. A more efficient solution lies in utilizing coroutines, which can execute on any thread and require less thread-specific data (making coroutine switching faster than thread switching).

Img 1. Handling task by coroutine

By leveraging this approach, we can use 64 threads to handle many coroutines seamlessly. The primary obstacle was developing a framework to manage these coroutines, as C++20 provides only the low-level language support (co_await, promise types, std::coroutine_handle) and no ready-made task types. Consequently, we designed four classes dedicated to overseeing coroutine functionality:

  • Awaiter: Informs whether the result is ready and suspends the coroutine if not.
  • Awaitable: Holds a result and resumes the coroutine when the result is set, or allows canceling a request. It also returns an Awaiter object when we call co_await on it (linking the awaiter and the awaitable by sharing a std::coroutine_handle). It is also an RAII wrapper that destroys the std::coroutine_handle when it is no longer needed (std::coroutine_handle refers to the coroutine frame, which stores the coroutine's state, such as its local variables and suspension point).
  • Task: Holds the result of a coroutine task. It also stores an Awaitable object so that co_await can be called on the task (operator co_await is delegated to the Awaitable).
  • PromiseType: C++ coroutines require a structure that describes how the coroutine should behave when it starts, suspends, and finishes. This class defines all the necessary information.

Img 2. Switching a coroutine between available threads

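#include <coroutine>
#include <memory>
#include <utility>

class Awaitable;

// Awaiter: reports whether the result is ready and, if it
// is not, hands the suspended coroutine to the Awaitable.
struct Awaiter {
  explicit Awaiter(Awaitable* awaitable)
      : _awaitable(awaitable) {}

  bool await_ready() const noexcept;
  void await_suspend(std::coroutine_handle<> h) noexcept;
  constexpr void await_resume() const noexcept {}

 private:
  Awaitable* _awaitable;
};

class Awaitable {
 public:
  // Destroying the Awaitable also destroys a coroutine that
  // is still suspended on it.
  ~Awaitable() { cancel(); }

  void cancel() {
    if (_awaited) {
      _awaited = false;
      _h.destroy();
    }
  }

  Awaiter operator co_await() { return Awaiter{this}; }

  // Mark the result as ready and resume the waiting
  // coroutine, if any. The flags are updated before
  // resume() because the resumed coroutine may release
  // this object.
  void complete() {
    _resultReady = true;
    if (_awaited) {
      _awaited = false;
      _h.resume();
    }
  }

  bool ready() const { return _resultReady; }

  void add_handle(std::coroutine_handle<> h) {
    _h = h;
    _awaited = true;
  }

 private:
  bool _resultReady{false};
  bool _awaited{false};
  std::coroutine_handle<> _h;
};

inline bool Awaiter::await_ready() const noexcept {
  return _awaitable->ready();
}

inline void Awaiter::await_suspend(
    std::coroutine_handle<> h) noexcept {
  _awaitable->add_handle(h);
}

struct PromiseType;

class Task {
 public:
  // The compiler finds the promise through this alias.
  using promise_type = PromiseType;
  using handle_type = std::coroutine_handle<PromiseType>;

  Task(handle_type handle,
       std::shared_ptr<Awaitable> awaitable)
      : _handle(handle), _awaitable(std::move(awaitable)) {}

  // co_await on a Task is delegated to its Awaitable.
  auto operator co_await() {
    return _awaitable->operator co_await();
  }

 private:
  handle_type _handle;
  std::shared_ptr<Awaitable> _awaitable;
};

struct PromiseType {
  Task get_return_object() {
    return {Task::handle_type::from_promise(*this),
            _awaitable};
  }

  std::suspend_never initial_suspend() noexcept {
    return {};
  }

  // When the coroutine finishes, wake whoever is
  // co_awaiting the Task.
  std::suspend_never final_suspend() noexcept {
    if (_awaitable) {
      _awaitable->complete();
    }
    return {};
  }

  void return_void() {}

  // Exceptions are swallowed in this basic version.
  void unhandled_exception() {}

  std::shared_ptr<Awaitable> _awaitable{
      std::make_shared<Awaitable>()};
};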

Listing 1. Basic implementation for coroutine framework

Connecting everything allows us to spawn a Task object that will store a result when the worker answers in the future. This lets us avoid intense thread switching, because a coroutine can be resumed on any thread. We just need to store the Awaitable object. To make it easier to use, we store it as a std::shared_ptr.

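// This example uses result-carrying template variants of
// the classes from Listing 1: Awaitable<int> delivers a
// value through complete(value) and co_await returns it.
// The coroutine returns nothing, hence Task<void>.
Task<void> counter(auto& awaitable) {
  const auto i = co_await awaitable;
  std::cout << "inside counter fun! i: " << i << std::endl;
}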

int main() {
  for (int i = 0; i < 3; ++i) {
    Awaitable<int> awaitable;
    counter(awaitable);

    std::cout << "In main function! i: " << i << std::endl;
    awaitable.complete(i);
  }
}

Listing 2. Example usage of coroutine.

In main function! i: 0
inside counter fun! i: 0
In main function! i: 1
inside counter fun! i: 1
In main function! i: 2
inside counter fun! i: 2

Listing 3. Output from running the code from Listing 2.

gRPC

By using a fast thread pool and a specialized coroutine framework, we can now develop a mechanism to process requests, dispatch them to the worker, perform computations, and send back the response. Our aim is to achieve a processing time of under 150ms, even with a workload of several million requests per hour. To meet this demand, we need to handle multiple concurrent requests, as we will often be waiting for a response from the worker. Thanks to the coroutine suspend mechanism, we don't need to switch threads, because we will just resume coroutine on any thread.

Img 3. gRPC server workflow

The gRPC library enables the creation of multiple CompletionQueue objects designed to manage requests. Each queue event carries a tag, which can be of any type since it is declared as void*. The caller is free to pass any data and later cast it back to a specific type. For instance, we can pass an id that identifies an entry in a vector holding the request data.

We can also create an object that holds all information about the request and cast its address to void*. We need to provide such a tag when initializing a request, for the notification about cancellation, and for the notification about finishing a request.

In most cases, it will be the same object, which changes its status while the request is handled. For handling data in the gateway we chose the id option (we store awaitables in an unordered_set); for worker communication we chose the second option (an object that represents the state of a given request).
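
The two tag styles can be illustrated without any gRPC types, since a tag is just a void*. The sketch below is hypothetical throughout (TagId, toTag, fromTag, RequestState, Stage and tagRoundTrip are names invented for this example). Note that converting an integer to a pointer and back is implementation-defined in C++, although it round-trips on mainstream platforms.

```cpp
#include <cstdint>

// Option 1: the tag is an id smuggled through the pointer value.
using TagId = std::uintptr_t;

inline void* toTag(TagId id) {
  return reinterpret_cast<void*>(id);
}

inline TagId fromTag(void* tag) {
  return reinterpret_cast<TagId>(tag);
}

// Option 2: the tag is the address of an object that tracks the
// state of one request.
enum class Stage { Pending, Sending, Finished };

struct RequestState {
  Stage stage = Stage::Pending;
};

inline void* toTag(RequestState* state) { return state; }

inline RequestState* stateFromTag(void* tag) {
  return static_cast<RequestState*>(tag);
}

// Round-trips both kinds of tag the way a completion queue would
// hand them back to the caller.
bool tagRoundTrip() {
  RequestState state;
  void* idTag = toTag(TagId{42});
  void* objectTag = toTag(&state);

  stateFromTag(objectTag)->stage = Stage::Sending;
  return fromTag(idTag) == 42 && state.stage == Stage::Sending;
}
```

With the id style the state lives in a separate container and the id is only a lookup key; with the object style the state object must outlive every queue event that carries its address.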

Before launching a server on the gateway, the first step is to configure the queues and the handlers. As we need to handle millions of requests, we should have several CompletionQueue objects to balance the load. Handlers should return a coroutine task because we want to avoid thread switching between the steps of a request (receive a request, send data to the worker, receive data from the worker, send the final response to the caller).

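std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>
    queues;
grpc::ServerBuilder builder;

// Several queues spread the load across polling threads.
for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
  queues.emplace_back(builder.AddCompletionQueue());
}

// AddListeningPort also requires credentials; insecure
// credentials are used here only as a placeholder.
auto server =
    builder
        .AddListeningPort(port,
                          grpc::InsecureServerCredentials())
        .RegisterService(this)
        .BuildAndStart();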

Listing 4. Setup server and create a few CompletionQueue.

Task<void> InitializeHandler(
    grpc::ServerCompletionQueue* cq) {
  grpc::ServerContext context;
  Request request;
  grpc::ServerAsyncResponseWriter<Response> responder{
      &context};

  // Returns the three tags and their awaitables. Local
  // variables live in the coroutine frame, so they stay
  // valid while the coroutine is suspended.
  auto [pendingTag, sendTag, finishTag, pendingAwaitable,
        sendAwaitable, finishAwaitable] =
      prepareAwaitables();

  // Tag delivered when the RPC is done, including when the
  // caller cancels the request.
  context.AsyncNotifyWhenDone(
      reinterpret_cast<void*>(&finishTag));

  // Tag delivered when a new request arrives. RequestHandle
  // is a gRPC method that fills `request` with the data
  // received from the caller.
  RequestHandle(&context, &request, &responder, cq, cq,
                reinterpret_cast<void*>(&pendingTag));

  // Suspend the coroutine until a request arrives.
  co_await *pendingAwaitable;

  // This handler is now busy, so set up another one that
  // waits for the next request.
  InitializeHandler(cq);

  // Send the request to a worker, compute the data and
  // prepare the response (see Listing 6).
  handle(request, responder, &context, sendTag);

  // Wait until the response has been sent.
  co_await *sendAwaitable;

  // Wait until the request is finished.
  co_await *finishAwaitable;
}

Listing 5. Setup handlers

void handle(
    const Request& request,
    grpc::ServerAsyncResponseWriter<Response>& responder,
    grpc::ServerContext* context, Tag& sendTag) {
  // Push the work to the thread pool so that further
  // requests are not blocked. The coroutine may resume on
  // any thread. Capturing by reference is safe because
  // request, responder and sendTag refer to locals of the
  // coroutine frame, which live until the coroutine
  // finishes. context is a pointer, so it is copied.
  // sendTag is non-const so its address can become a void*.
  _threadPool.pushTask(
      [&request, &responder, context, &sendTag]() {
        json parsed = parseRequest(request);
        auto promise = std::make_shared<RpcPromise>(
            [&responder, &sendTag](const json& result) {
              Response response;
              response.set_response(result.dump());
              // Tag returned by the completion queue once
              // the response has been sent.
              responder.Finish(
                  response, grpc::Status::OK,
                  reinterpret_cast<void*>(&sendTag));
            });

        // Sends the request to a worker and invokes the
        // promise when the result arrives.
        handle(promise);
      });
}

Listing 6. Handle method

Once the server is set up and the handlers are initialized, we are ready to start receiving requests. Each request utilizes three tags to signify its current state (pending, sending, finished). The handle method pushes the task to ThreadPool to not block further requests. Additionally, it prepares a special Promise object, which will be invoked later, when we get a response from the worker, followed by computing the final output. To simplify, we will not focus on this part in this article.

In the final phase of the gRPC implementation, handling a CompletionQueue tag is straightforward. We call Next in a loop until it returns false, which indicates that the server or the queue has shut down. After getting a new tag, the next step is to search the map<Tag*, std::shared_ptr<Awaitable>> (the map needs to be thread-safe!) and find the awaitable for the corresponding tag. Completing that awaitable resumes the coroutine. To resume the coroutine without blocking further requests, it is essential to invoke complete on the thread pool rather than on the CompletionQueue thread.

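void handle() {
  void* tag = nullptr;
  bool ok = false;
  // Next() blocks until an event is available and returns
  // false once the CompletionQueue has been shut down.
  while (cq->Next(&tag, &ok)) {
    // Resume the coroutine on the thread pool so that this
    // polling thread stays free for the next event.
    _threadPool.pushTask([tag]() {
      processTask(static_cast<Tag*>(tag));
    });
  }
}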

Listing 7. Handling a tag on the completion queue.

void processTask(Tag* tag) {
  // Find corresponding awaitable for a given tag (search
  // map)
  if (const auto it = tasks.find(tag); it != tasks.end()) {
    auto awaitable = std::move(it->second);
    tasks.erase(it);
    awaitable->complete();
  }
}

Listing 8. Complete awaitable
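
Since the tag map is touched from several pool threads, it needs some form of synchronization. One simple option, shown here only as a sketch, is a mutex-protected map that removes the entry under the lock and runs the completion outside it. TagRegistry, add, complete and registryDemo are hypothetical names, and a std::function stands in for the call to awaitable->complete().

```cpp
#include <functional>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical thread-safe registry mapping a tag to the action
// that resumes the coroutine waiting on it.
class TagRegistry {
 public:
  using Callback = std::function<void()>;

  void add(void* tag, Callback onComplete) {
    std::lock_guard<std::mutex> lock(_mutex);
    _pending.emplace(tag, std::move(onComplete));
  }

  // Removes the entry for `tag` and runs its callback. Returns
  // false if the tag is unknown. The callback runs after the lock
  // is released so that a resumed coroutine can register new tags
  // without deadlocking.
  bool complete(void* tag) {
    Callback onComplete;
    {
      std::lock_guard<std::mutex> lock(_mutex);
      const auto it = _pending.find(tag);
      if (it == _pending.end()) return false;
      onComplete = std::move(it->second);
      _pending.erase(it);
    }
    onComplete();
    return true;
  }

 private:
  std::mutex _mutex;
  std::unordered_map<void*, Callback> _pending;
};

// Completes the same tag twice: only the first attempt finds an
// entry. Returns how many callbacks ran, or -1 on a wrong result.
int registryDemo() {
  TagRegistry registry;
  int resumed = 0;
  int tagStorage = 0;  // any stable address can serve as a tag
  void* tag = &tagStorage;

  registry.add(tag, [&resumed] { ++resumed; });
  const bool first = registry.complete(tag);
  const bool second = registry.complete(tag);
  return (first && !second) ? resumed : -1;
}
```

A single mutex may become a contention point at very high request rates; sharding the map by tag hash would be one possible refinement, but that goes beyond what the article describes.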

Conclusion  

By combining our custom fast thread pool, designed specifically for high volumes of small requests, with coroutines that can pause and later resume on any thread once results are available, and with gRPC, we can build a resilient, high-performance server capable of processing thousands of requests simultaneously. This is made possible by the coroutine mechanism for suspending functions until results become available and by the use of tags on the completion queue. In the next article, I will demonstrate how to write the client side.

Mateusz Adamski, Senior C++ developer at Synerise