Skip to content

Commit 90c9a89

Browse files
authored
engine: combine plan and exec mods (#173)
1 parent 0f2e152 commit 90c9a89

33 files changed

+506
-531
lines changed

crates/toasty/src/engine.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
mod plan;
2-
use plan::Plan;
3-
41
mod eval;
52
mod exec;
63
mod index;
@@ -48,7 +45,7 @@ impl Engine {
4845

4946
// The plan is called once (single entry record stream) with no arguments
5047
// (empty record).
51-
self.exec_plan(&plan.pipeline, plan.vars).await
48+
self.exec_plan(plan).await
5249
}
5350

5451
/// Returns a new ExprContext

crates/toasty/src/engine/exec.rs

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,82 @@
1+
mod action;
2+
pub(crate) use action::Action;
3+
14
mod delete_by_key;
5+
pub(crate) use delete_by_key::DeleteByKey;
6+
27
mod exec_statement;
8+
pub(crate) use exec_statement::{ExecStatement, ExecStatementOutput};
9+
310
mod filter;
11+
pub(crate) use filter::Filter;
12+
413
mod find_pk_by_index;
14+
pub(crate) use find_pk_by_index::FindPkByIndex;
15+
516
mod get_by_key;
17+
pub(crate) use get_by_key::GetByKey;
18+
619
mod nested_merge;
20+
pub(crate) use nested_merge::{MergeQualification, NestedChild, NestedLevel, NestedMerge};
21+
22+
mod output;
23+
pub(crate) use output::Output;
24+
25+
mod plan;
26+
pub(crate) use plan::ExecPlan;
27+
728
mod project;
29+
pub(crate) use project::Project;
30+
831
mod query_pk;
32+
pub(crate) use query_pk::QueryPk;
33+
934
mod rmw;
35+
pub(crate) use rmw::ReadModifyWrite;
36+
1037
mod set_var;
38+
pub(crate) use set_var::SetVar;
39+
1140
mod update_by_key;
41+
pub(crate) use update_by_key::UpdateByKey;
1242

13-
mod var_store;
14-
pub(crate) use var_store::VarStore;
43+
mod var;
44+
pub(crate) use var::{VarId, VarStore};
1545

16-
use crate::{
17-
engine::{
18-
plan::{self, Action, VarId},
19-
Engine,
20-
},
21-
Result,
46+
use crate::{engine::Engine, Result};
47+
use toasty_core::{
48+
driver::Rows,
49+
stmt::{self, ValueStream},
2250
};
23-
use toasty_core::stmt::ValueStream;
24-
use toasty_core::{driver::Rows, stmt};
2551

2652
struct Exec<'a> {
2753
engine: &'a Engine,
2854
vars: VarStore,
2955
}
3056

3157
impl Engine {
32-
pub(crate) async fn exec_plan(
33-
&self,
34-
pipeline: &plan::Pipeline,
35-
vars: VarStore,
36-
) -> Result<ValueStream> {
37-
Exec { engine: self, vars }.exec_pipeline(pipeline).await
38-
}
39-
}
58+
pub(crate) async fn exec_plan(&self, plan: ExecPlan) -> Result<ValueStream> {
59+
let mut exec = Exec {
60+
engine: self,
61+
vars: plan.vars,
62+
};
4063

41-
impl Exec<'_> {
42-
async fn exec_pipeline(&mut self, pipeline: &plan::Pipeline) -> Result<ValueStream> {
43-
for step in &pipeline.actions {
44-
self.exec_step(step).await?;
64+
for step in &plan.actions {
65+
exec.exec_step(step).await?;
4566
}
4667

47-
Ok(if let Some(returning) = pipeline.returning {
48-
match self.vars.load(returning).await? {
68+
Ok(if let Some(returning) = plan.returning {
69+
match exec.vars.load(returning).await? {
4970
Rows::Count(_) => ValueStream::default(),
5071
Rows::Values(value_stream) => value_stream,
5172
}
5273
} else {
5374
ValueStream::default()
5475
})
5576
}
77+
}
5678

79+
impl Exec<'_> {
5780
async fn exec_step(&mut self, action: &Action) -> Result<()> {
5881
match action {
5982
Action::DeleteByKey(action) => self.action_delete_by_key(action).await,
@@ -70,7 +93,7 @@ impl Exec<'_> {
7093
}
7194
}
7295

73-
async fn collect_input2(&mut self, input: &[VarId]) -> Result<Vec<stmt::Value>> {
96+
async fn collect_input(&mut self, input: &[VarId]) -> Result<Vec<stmt::Value>> {
7497
let mut ret = Vec::new();
7598

7699
for var_id in input {

crates/toasty/src/engine/plan/action.rs renamed to crates/toasty/src/engine/exec/action.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::engine::plan::{
1+
use crate::engine::exec::{
22
DeleteByKey, ExecStatement, Filter, FindPkByIndex, GetByKey, NestedMerge, Project, QueryPk,
33
ReadModifyWrite, SetVar, UpdateByKey,
44
};

crates/toasty/src/engine/exec/delete_by_key.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,31 @@
1-
use toasty_core::driver::{operation, Rows};
1+
use toasty_core::{
2+
driver::{operation, Rows},
3+
schema::db::TableId,
4+
stmt,
5+
};
26

3-
use super::{plan, Exec, Result};
7+
use crate::engine::exec::{Action, Output, VarId};
8+
9+
use super::{Exec, Result};
10+
11+
/// Input is the key to delete
12+
#[derive(Debug)]
13+
pub(crate) struct DeleteByKey {
14+
/// How to access input from the variable table.
15+
pub input: VarId,
16+
17+
/// Where to store the output (impacted row count)
18+
pub output: Output,
19+
20+
/// Which model to get
21+
pub table: TableId,
22+
23+
/// Only delete keys that match the filter
24+
pub filter: Option<stmt::Expr>,
25+
}
426

527
impl Exec<'_> {
6-
pub(super) async fn action_delete_by_key(&mut self, action: &plan::DeleteByKey) -> Result<()> {
28+
pub(super) async fn action_delete_by_key(&mut self, action: &DeleteByKey) -> Result<()> {
729
let keys = self
830
.vars
931
.load(action.input)
@@ -37,3 +59,9 @@ impl Exec<'_> {
3759
Ok(())
3860
}
3961
}
62+
63+
impl From<DeleteByKey> for Action {
64+
fn from(src: DeleteByKey) -> Self {
65+
Self::DeleteByKey(src)
66+
}
67+
}

crates/toasty/src/engine/exec/exec_statement.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,38 @@
1-
use super::{plan, Exec, Result};
21
use toasty_core::{
32
driver::{operation, Rows},
43
stmt,
54
};
65

6+
use crate::{
7+
engine::exec::{Action, Exec, Output, VarId},
8+
Result,
9+
};
10+
11+
#[derive(Debug)]
12+
pub(crate) struct ExecStatement {
13+
/// Where to get arguments for this action.
14+
pub input: Vec<VarId>,
15+
16+
/// How to handle output
17+
pub output: ExecStatementOutput,
18+
19+
/// The query to execute. This may require input to generate the query.
20+
pub stmt: stmt::Statement,
21+
22+
/// When true, the statement is a conditional update without any returning.
23+
pub conditional_update_with_no_returning: bool,
24+
}
25+
26+
#[derive(Debug)]
27+
pub(crate) struct ExecStatementOutput {
28+
/// Databases always return rows as a vec of values. This specifies the type
29+
/// of each value.
30+
pub ty: Option<Vec<stmt::Type>>,
31+
pub output: Output,
32+
}
33+
734
impl Exec<'_> {
8-
pub(super) async fn action_exec_statement(
9-
&mut self,
10-
action: &plan::ExecStatement,
11-
) -> Result<()> {
35+
pub(super) async fn action_exec_statement(&mut self, action: &ExecStatement) -> Result<()> {
1236
let mut stmt = action.stmt.clone();
1337

1438
// Collect input values and substitute into the statement
@@ -110,3 +134,9 @@ impl Exec<'_> {
110134
Ok(())
111135
}
112136
}
137+
138+
impl From<ExecStatement> for Action {
139+
fn from(value: ExecStatement) -> Self {
140+
Self::ExecStatement(value)
141+
}
142+
}

crates/toasty/src/engine/exec/filter.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,26 @@
11
use crate::{
2-
engine::{exec::Exec, plan},
2+
engine::{
3+
eval,
4+
exec::{Action, Exec, Output, VarId},
5+
},
36
Result,
47
};
58
use toasty_core::driver::Rows;
69

10+
#[derive(Debug)]
11+
pub(crate) struct Filter {
12+
/// Source of the input
13+
pub(crate) input: VarId,
14+
15+
/// Where to store the output
16+
pub(crate) output: Output,
17+
18+
/// How to project it before storing
19+
pub(crate) filter: eval::Func,
20+
}
21+
722
impl Exec<'_> {
8-
pub(super) async fn action_filter(&mut self, action: &plan::Filter) -> Result<()> {
23+
pub(super) async fn action_filter(&mut self, action: &Filter) -> Result<()> {
924
// Load the input variable
1025
let mut input_stream = self.vars.load(action.input).await?.into_values();
1126

@@ -30,3 +45,9 @@ impl Exec<'_> {
3045
Ok(())
3146
}
3247
}
48+
49+
impl From<Filter> for Action {
50+
fn from(value: Filter) -> Self {
51+
Action::Filter(value)
52+
}
53+
}

crates/toasty/src/engine/exec/find_pk_by_index.rs

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,44 @@
11
use toasty_core::{
22
driver::{operation, Rows},
3+
schema::db::{IndexId, TableId},
34
stmt,
45
};
56

6-
use super::{plan, Exec, Result};
7-
use crate::engine::simplify;
7+
use crate::{
8+
engine::{
9+
exec::{Action, Exec, Output, VarId},
10+
simplify,
11+
},
12+
Result,
13+
};
14+
15+
/// Schema: `self` references [index-fields, input-fields] flattened
16+
#[derive(Debug)]
17+
pub(crate) struct FindPkByIndex {
18+
/// How to access input from the variable table.
19+
pub input: Vec<VarId>,
20+
21+
/// Where to store the output
22+
pub output: Output,
23+
24+
/// Table to query
25+
pub table: TableId,
26+
27+
/// Index to use
28+
pub index: IndexId,
29+
30+
/// Filter to apply to index
31+
pub filter: stmt::Expr,
32+
}
833

934
impl Exec<'_> {
10-
pub(super) async fn action_find_pk_by_index(
11-
&mut self,
12-
action: &plan::FindPkByIndex,
13-
) -> Result<()> {
35+
pub(super) async fn action_find_pk_by_index(&mut self, action: &FindPkByIndex) -> Result<()> {
1436
let mut filter = action.filter.clone();
1537

1638
// Collect input values and substitute into the statement
1739
if !action.input.is_empty() {
1840
assert!(action.input.len() == 1);
19-
let input = self.collect_input2(&action.input).await?;
41+
let input = self.collect_input(&action.input).await?;
2042

2143
filter.substitute(&input);
2244

@@ -50,3 +72,9 @@ impl Exec<'_> {
5072
Ok(())
5173
}
5274
}
75+
76+
impl From<FindPkByIndex> for Action {
77+
fn from(src: FindPkByIndex) -> Self {
78+
Self::FindPkByIndex(src)
79+
}
80+
}

crates/toasty/src/engine/exec/get_by_key.rs

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,32 @@
1-
use super::{plan, Exec, Result};
2-
use crate::driver::Rows;
3-
use toasty_core::{driver::operation, stmt::ValueStream};
1+
use crate::{
2+
driver::Rows,
3+
engine::exec::{Action, Exec, Output, VarId},
4+
Result,
5+
};
6+
use toasty_core::{
7+
driver::operation,
8+
schema::db::{ColumnId, TableId},
9+
stmt::ValueStream,
10+
};
11+
12+
/// Get a model by key
13+
#[derive(Debug)]
14+
pub(crate) struct GetByKey {
15+
/// Where to get the keys to load
16+
pub input: VarId,
17+
18+
/// Where to store the result
19+
pub output: Output,
20+
21+
/// Table to query
22+
pub table: TableId,
23+
24+
/// Columns to get
25+
pub columns: Vec<ColumnId>,
26+
}
427

528
impl Exec<'_> {
6-
pub(super) async fn action_get_by_key(&mut self, action: &plan::GetByKey) -> Result<()> {
29+
pub(super) async fn action_get_by_key(&mut self, action: &GetByKey) -> Result<()> {
730
let keys = self
831
.vars
932
.load(action.input)
@@ -34,3 +57,9 @@ impl Exec<'_> {
3457
Ok(())
3558
}
3659
}
60+
61+
impl From<GetByKey> for Action {
62+
fn from(src: GetByKey) -> Self {
63+
Self::GetByKey(src)
64+
}
65+
}

0 commit comments

Comments
 (0)