A Rust implementation of a query engine, following the architectural patterns from How Query Engines Work. It also includes its own distributed execution and observability layers.
I wrote an article about it on my blog! Read it here
Beyond your typical query engine components, it introduces:
- Rayon parallelism — parallel scans across Parquet row groups
- Tokio offloading — CPU-heavy work like vectorized Arrow kernels and hash table construction runs on blocking threads, keeping the async executor free
- S3-backed data sources — workers read directly from S3/MinIO using range-request I/O, fetching only Parquet footers and needed column chunks, never the full file
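The footer-only fetch works because Parquet keeps its metadata at the end of the file: the last 8 bytes are a 4-byte little-endian metadata length followed by the `PAR1` magic, so a single small range request is enough to learn where everything else lives. A minimal std-only sketch (the in-memory tail bytes stand in for an S3 ranged GET; this is illustrative, not the project's actual reader):

```rust
/// Given the file length and its final 8 bytes, return the byte range
/// (start offset, length) of the Parquet metadata block, or None if the
/// tail does not look like a Parquet footer.
/// Layout (real Parquet format): [metadata][4-byte LE metadata len]["PAR1"]
fn footer_metadata_range(file_len: u64, tail8: [u8; 8]) -> Option<(u64, u64)> {
    // Verify the trailing magic bytes.
    if &tail8[4..] != b"PAR1" {
        return None;
    }
    let meta_len = u32::from_le_bytes([tail8[0], tail8[1], tail8[2], tail8[3]]) as u64;
    // The metadata sits immediately before the 8-byte footer;
    // checked_sub rejects files too small to contain it.
    let start = file_len.checked_sub(8 + meta_len)?;
    Some((start, meta_len))
}

fn main() {
    // Fake 1 KiB "file" whose footer declares a 100-byte metadata block.
    let mut tail = [0u8; 8];
    tail[..4].copy_from_slice(&100u32.to_le_bytes());
    tail[4..].copy_from_slice(b"PAR1");
    let (start, len) = footer_metadata_range(1024, tail).unwrap();
    // Second request would then fetch only bytes start..start+len.
    println!("fetch bytes {start}..{} for metadata", start + len);
}
```

Once the metadata is parsed, the same ranged-read trick fetches only the column chunks a query actually projects.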
The distributed layer follows a Scheduler–Worker architecture coordinated over gRPC and Arrow Flight. It features:
- Intra-file parallelism — each query is expanded into per-row-group sub-tasks
- Shuffle-based joins — workers hash-partition data by join key, synchronize through a barrier, then pull partitions from peers for local hash joins
- Worker health — managed through heartbeats and a reaper, with all RPCs using exponential backoff with jitter
- Distributed observability — OpenTelemetry traces propagated across gRPC boundaries via W3C TraceContext, exported to Jaeger; Prometheus metrics (counters, histograms, gauges) for cluster health
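The shuffle's correctness rests on one invariant: equal join keys always hash to the same partition, no matter which worker produced the row. After the barrier, each worker pulls "its" partition from every peer and a purely local hash join is sufficient. A std-only sketch of that routing step (hypothetical code, not the project's actual shuffle):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Route a join key to one of `num_partitions` shuffle partitions.
/// Deterministic within a process, so every worker agrees on the mapping.
fn partition_for<K: Hash>(key: &K, num_partitions: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % num_partitions
}

fn main() {
    let rows = vec![("alice", 1), ("bob", 2), ("alice", 3)];
    let num_partitions = 4u64;
    // Bucket rows by join key (the first tuple field).
    let mut buckets: Vec<Vec<(&str, i32)>> = vec![Vec::new(); num_partitions as usize];
    for row in rows {
        let p = partition_for(&row.0, num_partitions) as usize;
        buckets[p].push(row);
    }
    // Both "alice" rows necessarily share a bucket.
    println!("{buckets:?}");
}
```

In a real cluster the same function also assigns each partition to an owning worker, typically `partition_id % num_workers`.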
If you'd like to implement your own version of this engine, feel free to follow the steps outlined here!
| Principle | Description |
|---|---|
| Educational Growth | Learning Rust & Distributed Systems through implementation. Document learning outcomes in /markdown_references as we encounter them |
| Incremental Abstraction | Building layer-by-layer. Each layer is designed with awareness of potential future layers but without premature implementation. We plan and implement one layer at a time. Architecture diagram can be found in /markdown_references |
| LLM-driven development | Read more code than you write. Have the LLM drive implementation. More details here |
| Module | Status | Description |
|---|---|---|
| `datatypes` | [Completed] | Type system built on Apache Arrow |
| `datasource` | [Completed] | Data source abstractions and CSV/Parquet readers |
| `logical-plan` | [Completed] | Logical plans and expressions |
| `dataframe` | [Completed] | Higher-level DataFrame API (convenience layer) |
| `physical-plan` | [Completed] | Physical plans and expression evaluation |
| `query-planner` | [Completed] | Translation from logical to physical plans |
| `joins` | [Completed] | Physical join implementations (hash join, etc.) |
| `optimizer` | [Completed] | Query optimization rules (projection pushdown only) |
| `execution` | [Completed] | Query execution (via `DataFrame::execute` / `DataFrame::collect` in `dataframe`) |
| `wire-protocol` | [Completed] | Protobuf LogicalPlan + Arrow Flight (`query-engine-proto`, `query-server`/`client`) |
| `parallel-execution` | [Completed] | Data parallelism (range requests + row-group splitting) |
| `concurrency` | [Completed] | Tokio + Rayon + Streams (async-native execution, `spawn_blocking` offload) |
| `distributed-execution` | [Completed] | Distributed scheduler/worker, shuffle, resilience across multiple nodes |
| `observability` | [Completed] | Prometheus metrics + Axum dashboard (SSE/query history) + tracing integration |
The query engine follows a layered architecture where each module builds upon the previous ones:
- `datatypes` (foundation): Provides the type system (`Schema`, `Field`, `ScalarValue`, `RecordBatch`, `ColumnVector`) that all other modules use for data representation. Corresponds to book chapters: "Apache Arrow" and "Type System".
- `datasource`: Reads data from external sources (CSV, Parquet) and returns `RecordBatch` instances with associated `Schema`. Corresponds to book chapter: "Data Sources".
- `logical-plan`: Represents queries as logical operations (select, filter, join) that operate on `Schema` and `ScalarValue` expressions. Corresponds to book chapter: "Logical Plans". Includes join support within this module.
- `dataframe`: Higher-level convenience API built on top of logical plans, providing a more ergonomic interface for query construction. Corresponds to book chapter: "DataFrames". This is optional: the engine can work directly with logical plans. The book introduces DataFrames after Logical Plans as a user-friendly abstraction that builds logical plans behind the scenes.
- `physical-plan`: Implements concrete execution strategies for logical operations, producing `RecordBatch` outputs. Corresponds to book chapter: "Physical Plans".
- `query-planner`: Translates logical plans into physical plans, considering available operators and data sources. Corresponds to book chapter: "Query Planning".
- `joins`: Implements physical join algorithms (hash join, nested loop join, etc.) as physical plan operators. Builds on `physical-plan` and is used by `query-planner` when planning join operations. Corresponds to book chapter: "Joins".
- `optimizer`: Applies optimization rules to logical plans (predicate pushdown, projection pruning, join reordering, etc.). Corresponds to book chapter: "Query Optimizers".
- `execution`: Orchestrates physical plan execution, managing data flow between operators. Corresponds to book chapter: "Query Execution".
- `parallel-execution`: Extends the `execution` module to support parallel query execution within a single node using multiple threads.
- `distributed-execution`: Advanced extension for distributed query execution across multiple machines in a cluster. Requires significant architectural changes.
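The `query-planner` layer's core job (walking the logical plan tree and emitting a physical operator per node) can be sketched with two tiny enums. Type and variant names here are illustrative only, not the project's actual API:

```rust
// Illustrative stand-ins: the real project builds these on Apache Arrow.
#[derive(Debug)]
enum LogicalPlan {
    Scan { table: String },
    Filter { predicate: String, input: Box<LogicalPlan> },
    Projection { columns: Vec<String>, input: Box<LogicalPlan> },
}

#[derive(Debug, PartialEq)]
enum PhysicalPlan {
    ParquetScan { table: String },
    FilterExec { predicate: String, input: Box<PhysicalPlan> },
    ProjectionExec { columns: Vec<String>, input: Box<PhysicalPlan> },
}

/// A simple deterministic planner: each logical node maps to exactly one
/// physical operator, with no cost-based choices, mirroring the
/// "available operators" translation described above.
fn create_physical_plan(plan: &LogicalPlan) -> PhysicalPlan {
    match plan {
        LogicalPlan::Scan { table } => PhysicalPlan::ParquetScan { table: table.clone() },
        LogicalPlan::Filter { predicate, input } => PhysicalPlan::FilterExec {
            predicate: predicate.clone(),
            input: Box::new(create_physical_plan(input)),
        },
        LogicalPlan::Projection { columns, input } => PhysicalPlan::ProjectionExec {
            columns: columns.clone(),
            input: Box::new(create_physical_plan(input)),
        },
    }
}

fn main() {
    // SELECT name FROM users WHERE age > 21, as a logical tree.
    let logical = LogicalPlan::Projection {
        columns: vec!["name".into()],
        input: Box::new(LogicalPlan::Filter {
            predicate: "age > 21".into(),
            input: Box::new(LogicalPlan::Scan { table: "users".into() }),
        }),
    };
    println!("{:?}", create_physical_plan(&logical));
}
```

A cost-based planner would instead choose among several physical operators per logical node; that is exactly what this project de-scopes (see the limitations list below).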
The repository includes a single end-to-end driver script that provisions an EC2 cluster (scheduler + workers), deploys the binaries, generates/uploads TPC-H parquet data to S3, runs the engine in `--mode distributed`, and (by default) tears everything down.

Prerequisites:
- `terraform` and `aws` (AWS CLI) installed and configured with credentials
- An EC2 SSH keypair available locally (see `KEY_PATH`/`KEY_NAME`)
- Expect this to provision real infrastructure (potential cost)

Run it with `bash demo/benchmarks/run_distributed_benchmark.sh`. The script starts SSH tunnels and prints the URLs it forwards to your machine:
- Scheduler dashboard (Axum, HTTP): http://localhost:8080
- Scheduler log (text file): http://localhost:9999/scheduler.log
- Jaeger log (raw text, if needed): http://localhost:9999/jaeger.log

Results are written to `demo/BENCHMARKS_DISTRIBUTED.md` (fetched back to your laptop).
Runs the engine in `--mode local` on a provisioned EC2 instance, generating TPC-H parquet data on the instance and benchmarking against Pandas.

Prerequisites:
- `terraform` and `aws` (AWS CLI) installed and configured with credentials
- An EC2 SSH keypair available locally (see `KEY_PATH`/`KEY_NAME`)
- Expect this to provision real infrastructure (potential cost)

Run it with `bash demo/benchmarks/run_ec2_local_benchmark.sh`. Results are written to `demo/BENCHMARKS.md` (fetched back to your laptop).
The following concepts from the book were intentionally skipped to keep the implementation focused and educational:
- Aggregation logical expressions
- SQL support & tokenization
- Physical aggregation expressions & physical plans (e.g., hash aggregates)
- Logical-to-physical query planning optimizations (we use a simple, deterministic planner and implement only one join type)
- Subqueries
- Predicate push-down (logical and to storage)
- Cost-based optimization (including join ordering, physical-operator choice by cost)
- Constant folding
- Dead column elimination
- Limit push-down
- Partition pruning
- Join reordering
- Advanced join optimizations (spill, Bloom filters, etc.)
- Distributed optimizations (shuffle minimization, broadcast-join selection, stats-based planning)
- Limited file format support: distributed execution and MinIO/object-store integration are currently scoped to Parquet files only. CSV and in-memory sources are not supported in remote worker contexts.
- Tech debt (not intentionally de-scoped, but a consequence of poor architecture planning): real query engines serialize and ship the physical plan to workers, whereas this engine ships the logical plan.
- There is no DAG traversal; we assume the join stage always executes last.
- Tech debt: many of our classes carry deprecated methods kept for backward compatibility with our tests.
- Task 7.1 (Coordinator HA): Remove the Scheduler as a single point of failure. Use a lightweight consensus library or external store (like `etcd`) to elect a "Leader" scheduler.
- Task 7.2 (Lineage-based Recovery): If a Worker fails (or is scaled down) mid-query, the Scheduler marks its tasks as `Pending` and re-assigns them. Implement lineage-based re-computation.
- Task 7.3 (Elastic Autoscaling): Implement a reactive scaler. If the "Pending Task" queue grows beyond a threshold, the Scheduler automatically triggers a script to spin up new worker containers.
- Success Criteria: You can kill the leader Scheduler or a worker during a massive shuffle join, and the system elects a new leader and heals the data flow without failing the query.
- Task 8: Python API support
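The reactive scaler in Task 7.3 boils down to a small decision function the Scheduler can evaluate on each heartbeat tick. A hedged sketch, with an entirely made-up policy (one worker per `tasks_per_worker` pending tasks above the threshold, capped at `max_new_workers`):

```rust
/// Decide how many new workers to spawn from pending-queue depth.
/// Hypothetical policy, not part of the project: scale only above
/// `threshold`, one worker per `tasks_per_worker` excess tasks
/// (rounded up), never more than `max_new_workers` at once.
fn workers_to_spawn(
    pending: usize,
    threshold: usize,
    tasks_per_worker: usize, // must be > 0
    max_new_workers: usize,
) -> usize {
    if pending <= threshold {
        return 0;
    }
    let excess = pending - threshold;
    // Round up: even one excess task justifies a worker.
    let wanted = (excess + tasks_per_worker - 1) / tasks_per_worker;
    wanted.min(max_new_workers)
}

fn main() {
    // 130 pending tasks, threshold 100, 10 tasks per worker, cap 8.
    println!("spawn {} workers", workers_to_spawn(130, 100, 10, 8));
}
```

The cap matters in practice: it dampens oscillation when a burst of tasks drains faster than new workers can boot.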