euanlimzx/distributed-query-engine

What this project is

A Rust implementation of a query engine, following the architectural patterns from How Query Engines Work. It also includes its own distributed execution and observability layers.

I wrote an article about it on my blog! Read it here

Beyond your typical query engine components, it introduces:

  • Rayon parallelism — parallel scans across Parquet row groups
  • Tokio offloading — CPU-heavy work like vectorized Arrow kernels and hash table construction runs on blocking threads, keeping the async executor free
  • S3-backed data sources — workers read directly from S3/MinIO using range-request I/O, fetching only Parquet footers and needed column chunks, never the full file
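The engine itself uses Rayon's parallel iterators for the per-row-group fan-out; the same pattern can be sketched with std threads alone. A minimal sketch, where `scan_row_group` is an illustrative stand-in for the real Parquet scan, not the crate's API:

```rust
use std::thread;

// Hypothetical stand-in for scanning one Parquet row group:
// here a row group is just a slice of values, and "scanning"
// produces a partial aggregate.
fn scan_row_group(rows: &[i64]) -> i64 {
    rows.iter().sum()
}

// Fan out one scan per row group, then combine the partial results.
// Rayon would express this as row_groups.par_iter().map(...).sum().
fn parallel_scan(row_groups: &[Vec<i64>]) -> i64 {
    thread::scope(|s| {
        // Spawn all scans first so they run concurrently...
        let handles: Vec<_> = row_groups
            .iter()
            .map(|rg| s.spawn(move || scan_row_group(rg)))
            .collect();
        // ...then join and merge the per-row-group results.
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}
```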

The distributed layer follows a Scheduler–Worker architecture coordinated over gRPC and Arrow Flight. It features:

  • Intra-file parallelism — each query is expanded into per-row-group sub-tasks
  • Shuffle-based joins — workers hash-partition data by join key, synchronize through a barrier, then pull partitions from peers for local hash joins
  • Worker health — managed through heartbeats and a reaper, with all RPCs using exponential backoff with jitter
  • Distributed observability — OpenTelemetry traces propagated across gRPC boundaries via W3C TraceContext, exported to Jaeger; Prometheus metrics (counters, histograms, gauges) for cluster health
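The shuffle step above hinges on hash partitioning: every worker applies the same hash function to the join key, so matching rows from both sides of the join land in the same partition and can be joined locally. A minimal sketch of that partitioning (types and row layout are illustrative, not the engine's actual API):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Route each (key, value) row to a partition by hashing the join key.
// Because the hash is deterministic and shared by all workers, rows
// with equal keys always land in the same partition.
fn partition_by_key(rows: &[(u64, &str)], num_partitions: u64) -> Vec<Vec<(u64, String)>> {
    let mut parts = vec![Vec::new(); num_partitions as usize];
    for &(key, val) in rows {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        let p = (h.finish() % num_partitions) as usize;
        parts[p].push((key, val.to_string()));
    }
    parts
}
```

After the barrier, each worker pulls "its" partition index from every peer and builds a local hash table over one side before probing with the other.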

If you'd like to implement your own version of this engine, feel free to follow the steps outlined here!

Project Philosophy

| Principle | Description |
| --- | --- |
| Educational Growth | Learning Rust & distributed systems through implementation. Document learning outcomes in /markdown_references as we encounter them. |
| Incremental Abstraction | Building layer by layer. Each layer is designed with awareness of potential future layers, but without premature implementation; we plan and implement one layer at a time. The architecture diagram can be found in /markdown_references. |
| LLM-driven Development | Read more code than you write. Have the LLM drive implementation. More details here. |

Module Roadmap

| Module | Status | Description |
| --- | --- | --- |
| datatypes | Completed | Type system built on Apache Arrow |
| datasource | Completed | Data source abstractions and CSV/Parquet readers |
| logical-plan | Completed | Logical plans and expressions |
| dataframe | Completed | Higher-level DataFrame API (convenience layer) |
| physical-plan | Completed | Physical plans and expression evaluation |
| query-planner | Completed | Translation from logical to physical plans |
| joins | Completed | Physical join implementations (hash join, etc.) |
| optimizer | Completed | Query optimization rules (projection pushdown only) |
| execution | Completed | Query execution (via DataFrame::execute / DataFrame::collect in dataframe) |
| wire-protocol | Completed | Protobuf LogicalPlan + Arrow Flight (query-engine-proto, query-server/client) |
| parallel-execution | Completed | Data parallelism (range requests + row-group splitting) |
| concurrency | Completed | Tokio + Rayon + streams (async-native execution, spawn_blocking offload) |
| distributed-execution | Completed | Distributed scheduler/worker, shuffle, resilience across multiple nodes |
| observability | Completed | Prometheus metrics + Axum dashboard (SSE/query history) + tracing integration |

Module Dependencies and Data Flow

The query engine follows a layered architecture where each module builds upon the previous ones:

  1. datatypes (foundation): Provides the type system (Schema, Field, ScalarValue, RecordBatch, ColumnVector) that all other modules use for data representation. Corresponds to book chapters: "Apache Arrow" and "Type System".

  2. datasource: Reads data from external sources (CSV, Parquet) and returns RecordBatch instances with associated Schema. Corresponds to book chapter: "Data Sources".

  3. logical-plan: Represents queries as logical operations (select, filter, join) that operate on Schema and ScalarValue expressions. Corresponds to book chapter: "Logical Plans". Includes join support within this module.

  4. dataframe: Higher-level convenience API built on top of logical plans, providing a more ergonomic interface for query construction. Corresponds to book chapter: "DataFrames". This layer is optional: the engine can work directly with logical plans. The book introduces DataFrames after logical plans as a user-friendly abstraction that builds logical plans behind the scenes.

  5. physical-plan: Implements concrete execution strategies for logical operations, producing RecordBatch outputs. Corresponds to book chapter: "Physical Plans".

  6. query-planner: Translates logical plans into physical plans, considering available operators and data sources. Corresponds to book chapter: "Query Planning".

  7. joins: Implements physical join algorithms (hash join, nested loop join, etc.) as physical plan operators. Builds on physical-plan and is used by query-planner when planning join operations. Corresponds to book chapter: "Joins".

  8. optimizer: Applies optimization rules to logical plans (predicate pushdown, projection pruning, join reordering, etc.). Corresponds to book chapter: "Query Optimizers".

  9. execution: Orchestrates physical plan execution, managing data flow between operators. Corresponds to book chapter: "Query Execution".

  10. parallel-execution: Extends the execution module to support parallel query execution within a single node using multiple threads.

  11. distributed-execution: Advanced extension for distributed query execution across multiple machines in a cluster. Requires significant architectural changes.
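The logical-to-physical translation in step 6 can be illustrated with a toy planner. This is a minimal sketch using invented enums, not the crate's actual types: the logical plan describes *what* to compute, the physical plan describes *how*, and the planner walks the tree picking a concrete operator for each node (here, always a Parquet scan).

```rust
// Illustrative-only types, not the repository's real API.
#[derive(Debug)]
enum LogicalPlan {
    Scan { table: String },
    Filter { input: Box<LogicalPlan>, predicate: String },
    Projection { input: Box<LogicalPlan>, columns: Vec<String> },
}

#[derive(Debug, PartialEq)]
enum PhysicalPlan {
    ParquetScan { table: String },
    FilterExec { input: Box<PhysicalPlan>, predicate: String },
    ProjectionExec { input: Box<PhysicalPlan>, columns: Vec<String> },
}

// Recursively map each logical node to a physical operator. A real
// planner would also consult data sources and available operators.
fn create_physical_plan(plan: &LogicalPlan) -> PhysicalPlan {
    match plan {
        LogicalPlan::Scan { table } => PhysicalPlan::ParquetScan { table: table.clone() },
        LogicalPlan::Filter { input, predicate } => PhysicalPlan::FilterExec {
            input: Box::new(create_physical_plan(input)),
            predicate: predicate.clone(),
        },
        LogicalPlan::Projection { input, columns } => PhysicalPlan::ProjectionExec {
            input: Box::new(create_physical_plan(input)),
            columns: columns.clone(),
        },
    }
}
```

The deterministic one-rule-per-node shape mirrors the "simple, deterministic planner" the project describes; cost-based alternatives would choose among several physical operators per logical node.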

Run Distributed Benchmark (EC2 + SSH log dashboard)

The repository includes a single end-to-end driver script that provisions an EC2 cluster (scheduler + workers), deploys the binaries, generates/uploads TPC-H parquet data to S3, runs the engine in --mode distributed, and (by default) tears everything down.

Prerequisites

  • terraform and aws (AWS CLI) installed and configured with credentials
  • An EC2 SSH keypair available locally (see KEY_PATH / KEY_NAME)
  • Expect this to provision real AWS infrastructure (it may incur costs)

Run distributed test with 4 workers

```bash
bash demo/benchmarks/run_distributed_benchmark.sh
```

View logs from your laptop (localhost)

The script starts SSH tunnels and prints the URLs it forwards to your machine.

  • Scheduler dashboard (Axum, HTTP): http://localhost:8080
  • Scheduler log (text file): http://localhost:9999/scheduler.log
  • Jaeger log (raw text, if needed): http://localhost:9999/jaeger.log

Results

  • Written to demo/BENCHMARKS_DISTRIBUTED.md (fetched back to your laptop)

Run Local Benchmark (EC2)

Runs the engine in --mode local on a provisioned EC2 instance, generating TPC-H parquet data on the instance and benchmarking vs Pandas.

Prerequisites

  • terraform and aws (AWS CLI) installed and configured with credentials
  • An EC2 SSH keypair available locally (see KEY_PATH / KEY_NAME)
  • Expect this to provision real AWS infrastructure (it may incur costs)

Run local test (single node)

```bash
bash demo/benchmarks/run_ec2_local_benchmark.sh
```

Results

  • Written to demo/BENCHMARKS.md (fetched back to your laptop)

References

Intentionally Skipped Concepts; Things I Wish I Had Done but Didn't

The following concepts from the book were intentionally skipped to keep the implementation focused and educational:

Core Features

  • Aggregation logical expressions
  • SQL support & tokenization
  • Physical aggregation expressions & physical plans (e.g., hash aggregates)
  • Logical-to-physical query planning optimizations (we use a simple, deterministic planner and implement only one join type)
  • Subqueries

Query Optimizations

  • Predicate push-down (logical and to storage)
  • Cost-based optimization (including join ordering, physical-operator choice by cost)
  • Constant folding
  • Dead column elimination
  • Limit push-down
  • Partition pruning
  • Join reordering
  • Advanced join optimizations (spill, Bloom filters, etc.)
  • Distributed optimizations (shuffle minimization, broadcast-join selection, stats-based planning)

Distributed & Object Storage

  • Limited File Format Support: Distributed execution and MinIO/Object Store integration are currently scoped to Parquet files only. CSV and in-memory sources are currently not supported in remote worker contexts.
  • Not intentionally de-scoped, but tech debt from poor architectural planning: in real query engines, the physical plan is serialized and sent to workers, not the logical one
  • There is no DAG traversal; we assume the join stage always executes last
  • Tech debt: many of our classes retain deprecated methods for backward compatibility with our tests

Future Roadmap

  • Task 7.1 (Coordinator HA): Remove the Scheduler as a single point of failure. Use a lightweight consensus library or external store (like etcd) to elect a "Leader" scheduler.

  • Task 7.2 (Lineage-based Recovery): If a Worker fails (or is scaled down) mid-query, the Scheduler marks its tasks as Pending and re-assigns them. Implement lineage-based re-computation.

  • Task 7.3 (Elastic Autoscaling): Implement a reactive scaler. If the "Pending Task" queue grows beyond a threshold, the Scheduler automatically triggers a script to spin up new worker containers.

  • Success Criteria: You can kill the leader Scheduler or a worker during a massive shuffle join, and the system elects a new leader and heals the data flow without failing the query.

  • Task 8: Python API support
