Enhancing the capabilities of TerrariumDB, a real-time, schema-less database engine, is our top priority. Currently, our analytics backend relies on a JSON-based API, which proves vital on a daily basis. However, we are excited to announce the integration of SQL support, paving the way for a future of expanded functionality and an enhanced experience for our esteemed clients. This article marks the beginning of an insightful series, where we delve into the technical journey of integrating SQL with Terrarium's architecture. Stay tuned for upcoming articles, as we unveil advanced features and progress in the development process.
TerrariumDB, developed by Synerise, is a high-performance, in-memory and disk-based column and row store engine designed for real-time data collection and processing. Its schema-less nature allows it to handle diverse business scenarios with unparalleled scalability and speed. More about TerrariumDB can be found here.
We were strongly motivated to develop SQL support in-house due to our unique schema-less table structure, which lets users insert data of any type. Data is stored in column-oriented storage using our own - designed from scratch - column format, which supports their efficient storage and retrieval. This format supports the insertion of nested data structures, enabling users to retrieve entire objects along with all nested parameters, or access the nested data directly.
We chose to follow MySQL grammar due to its extensive documentation and widespread familiarity. However, we opted not to generate code with Abstract Syntax Trees (ASTs) directly from the MySQL grammar. Instead, we wanted to introduce new parts of the AST gradually, maintaining flexibility and control over the AST development. This approach allowed us to develop the system incrementally, ensuring stability and enabling continuous improvements based on our needs as we introduced new features.
As I delved into SQL integration, I initially used a basic architectural template that had been prepared by my teammates as proof of concept. This framework served as a solid starting point for me to begin the process of integrating SQL with TerrariumDB. Additionally, there
was a simple MySQL server-like implementation which enables basic connectivity capabilities allowing it to receive and respond to MySQL queries via linux mysql-client.
As a team, we had to make several key decisions and assumptions about how to proceed with the integration. Since the SQL support had to be developed from scratch, we needed to establish a general path forward.
One of the key decisions we made included:
· Choosing to support the MySQL standard due to its extensive integration capabilities and widespread usage, and familiarity, and ensuring SQL support with MySQL syntax compatibility. This approach allowed us to follow extensive MySQL/MariaDB documentation and introduce features incrementally. · Following the MySQL grammar published on GitHub for consistency when constructing ASTs and relations between them
· Choosing a connector compatible with MySQL communication protocol
· Drawing inspiration for building the lexer, parser and interpreters from Ruslan Spivak's tutorial Let’s Build A Simple Interpreter.
· Drawing inspiration from modern column-store OLAP database engines and its efficient streaming data processing techniques
· Data processing - initially focusing on streaming with row-based data processing, with future improvements in mind, to incorporate column-based data processing. More about this further below.
In short, the lexer is responsible for transforming characters of an SQL query into tokens. Then, a set of parsers detects the type of query and its parameters, subsequently building ASTs. Interpreters are responsible for creating data streams from the generated ASTs.
The lexer is responsible for transforming the raw SQL query into tokens, which are the basic building blocks for further parsing. The Lexer interface provides an execute method that takes a QString query and returns a vector of Token objects.
// Lexer interface
std::vector<Token> Lexer::execute(const QString& query) const;
Each Token represents a wide set of characters, as defined in the MySQL grammar. The Token class includes an enumeration of possible token types, such as presented below, and many others.
class Token
{
public:
enum class Type : uint8_t
{
Ampersand,
Arrow,
Asterisk,
At,
BackQuotedString,
// Other token types...
Slash,
String,
Tilde,
Whitespace
};
explicit Token(Type type, const QString& string);
Type type() const { return _type; }
const QString& string() const& { return _string;
}
private:
const Type _type;
const QString _string;
};
The parser references the vector containing the entire query and the position of the currently processed token. The position is necessary because the parsers are designed to parse the smallest possible part of a query atomically, allowing for maximum reusability. As a result, a complete AST for the query statement is returned, which holds all required command parameters and defines the query execution order.
// Parser interface
std::unique_ptr<StatementAst>
StatementParser::parse(const std::vector<Token>&
tokens, uint32_t& pos) const;
Interpreters operate in two primary steps. First, a tree of executable interpreters is generated from the previously prepared ASTs. In the second step, interpreters are executed recursively in the appropriate execution
The execute function takes sourceStream as one of its parameters and creates a new stream appropriate for the given interpreter, wrapping itself around the sourceStream. The original stream, which the subsequent streams are based on, contains raw data from the columns required to calculate all query expressions. The position argument refers to the column index (from the left) currently being processed by a given stream.
AdvancedQueryCtx is a query-scoped context required to process more advanced queries.
// Interpreter interface
class IExpressionInterpreter
{
public:
explicit IExpressionInterpreter() = default;
virtual ~IExpressionInterpreter() = default;
virtual std::unique_ptr<core:: IRowInputStream>
execute(std::unique_ptr<core:: IRowInputStream>&&
sourceStream,
std::shared_ptr<AdvancedQueryCtx> aqc,
uint32_t pos) && = 0;
static std::unique_ptr<IExpressionInterpreter>
fromAst(sql::ExpressionElementAst&&
expressionElementAst);
};
Each stream manages a distinct aspect of the execution process for a particular expression such as evaluating a binary operator, processing a function call or other similar tasks. Initiated by the interpreter using a source stream and a specified column index, each stream is customized with a unique implementation to fulfill its designated functions and requirements. However, all streams share two common functions. The read() function reads and returns a single row of data, where the row consists of all columns required by the query. The schema() function returns the column layout for stream, with each QStringList representing a single column and each element in the list corresponding to the nested structure of the stored objects.
// Stream interface
class IRowInputStream
{
public:
explicit IRowInputStream() = default;
virtual ~IRowInputStream() = default;
virtual std::optional<Row> read() = 0;
virtual const std::vector<QStringList>& schema()
const = 0;
};
Because evaluating an expression for a given column often requires data from multiple columns, all columns required by the expressions in the SELECT list are fetched. Stream processing occurs from left to right, ensuring that when evaluating the expression for a single column, all required data are consumed by the streams. As a result, only one value is produced and stored in the current index before moving to the next column.
Query execution operates as follows:
1. Gateway Handling: Upon receiving the SQL query, the gateway handles the request. Due to our distributed architecture, the current cluster comprises 18 nodes, each running on a separate virtual machine (VM). Each node controls 120 workers.
2. Request Distribution: The gateway extracts columns required for query execution and directly requests them from the workers.
3. Data Collection: Workers collect data from specified columns and apply predicates. If the query contains aggregates and pre-aggregation is feasible (e.g., no GROUP BY is present), pre-aggregation is performed.
4. Response Aggregation: The responses are then sent back to the gateway. The gateway aggregates all of them, evaluates expressions, and sends the final response back to the client.
This architecture ensures efficient query processing by leveraging a distributed setup, allowing for scalable and robust data handling. A key aspect of this system is end-to-end streaming, which enables data to be received from workers and processed one row at a time, allowing for fast processing and low resource consumption and memory allocation. If the query does not require aggregation, the result rows are directly sent to the user.
When aggregation is necessary, it often involves aggregating a single response from each worker due to the applied pre-aggregation. In more complex scenarios, such as those involving GROUP BY where groups need to be created before the aggregation process, all data are fetched and stored on disk. Further details will be discussed in upcoming articles.
Pre-aggregation on the worker side is a critical aspect of our distributed system, designed to enhance efficiency and reduce data transmission. For most cases, pre-aggregation involves straightforward operations such as summing all values from a given column. However, calculating averages requires special attention. Due to the distributed nature, we cannot guarantee how rows from a given table are distributed among workers, meaning all data from all rows for a given column are needed to calculate the average correctly.
To handle this robustly without sending all rows to the gateway for aggregation, we adopt a specific pre-aggregation approach. Each worker calculates the sum and count of values for the specified column and sends back the results. The gateway aggregates these sums and counts from all workers, enabling the correct calculation of the average.
This approach significantly reduces network traffic and decreases the amount of data aggregation required on the gateway side. This not only improves efficiency but also optimizes the overall performance of the query execution process.
Author: Maciej Kłosiński | Software Engineer at Synerise