Skip to content

Commit fb0dcd7

Browse files
authored
[query-engine] Expand expressions to support user-defined functions (#1478)
Relates to #1479.
## Changes
* Make it possible to declare and invoke user-defined functions in the query expression tree.
## Details
Implementation and KQL parsing will come as follow-ups.
1 parent b071617 commit fb0dcd7

File tree

7 files changed

+638
-3
lines changed

7 files changed

+638
-3
lines changed

rust/experimental/query_engine/engine-recordset/src/scalars/scalar_expressions.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,8 @@ where
460460
}
461461
}
462462
}
463+
ScalarExpression::Argument(_) => todo!(),
464+
ScalarExpression::InvokeFunction(_) => todo!(),
463465
};
464466

465467
execution_context.add_diagnostic_if_enabled(

rust/experimental/query_engine/expressions/src/pipeline_expression.rs

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub struct PipelineExpression {
88
query: Box<str>,
99
query_location: QueryLocation,
1010
constants: Vec<StaticScalarExpression>,
11+
functions: Vec<PipelineFunction>,
1112
initializations: Vec<PipelineInitialization>,
1213
expressions: Vec<DataExpression>,
1314
}
@@ -18,6 +19,7 @@ impl PipelineExpression {
1819
query: query.into(),
1920
query_location: QueryLocation::new(0, query.len(), 1, 1).unwrap(),
2021
constants: Vec::new(),
22+
functions: Vec::new(),
2123
initializations: Vec::new(),
2224
expressions: Vec::new(),
2325
}
@@ -38,6 +40,11 @@ impl PipelineExpression {
3840
self.constants.len() - 1
3941
}
4042

43+
/// Appends `value` to the pipeline's function list and returns the index at
/// which it was stored.
pub(crate) fn push_function(&mut self, value: PipelineFunction) -> usize {
    // The new entry lands at the current length, so capture it before pushing.
    let function_id = self.functions.len();
    self.functions.push(value);
    function_id
}
47+
4148
pub fn get_constants(&self) -> &[StaticScalarExpression] {
4249
&self.constants
4350
}
@@ -46,6 +53,14 @@ impl PipelineExpression {
4653
self.constants.get(constant_id)
4754
}
4855

56+
/// Returns all functions stored on this pipeline as a slice.
pub fn get_functions(&self) -> &[PipelineFunction] {
    self.functions.as_slice()
}
59+
60+
/// Looks up a function by index.
///
/// Returns `None` when `function_id` is out of range.
pub fn get_function(&self, function_id: usize) -> Option<&PipelineFunction> {
    let functions: &[PipelineFunction] = &self.functions;
    functions.get(function_id)
}
63+
4964
pub fn get_expressions(&self) -> &[DataExpression] {
5065
&self.expressions
5166
}
@@ -69,12 +84,14 @@ impl PipelineExpression {
6984
pub fn get_resolution_scope(&self) -> PipelineResolutionScope<'_> {
7085
PipelineResolutionScope {
7186
constants: &self.constants,
87+
functions: &self.functions,
7288
}
7389
}
7490

7591
pub(crate) fn optimize(&mut self) -> Result<(), Vec<ExpressionError>> {
7692
let scope = PipelineResolutionScope {
7793
constants: &self.constants,
94+
functions: &self.functions,
7895
};
7996

8097
let mut errors = Vec::new();
@@ -125,6 +142,22 @@ impl std::fmt::Display for PipelineExpression {
125142
}
126143
}
127144

145+
if self.functions.is_empty() {
146+
writeln!(f, "├── Functions: []")?;
147+
} else {
148+
writeln!(f, "├── Functions:")?;
149+
let last_idx = self.functions.len() - 1;
150+
for (i, func) in self.functions.iter().enumerate() {
151+
if i == last_idx {
152+
writeln!(f, "│ └── {i}")?;
153+
func.fmt_with_indent(f, "│ ")?;
154+
} else {
155+
writeln!(f, "│ ├── {i}")?;
156+
func.fmt_with_indent(f, "│ │ ")?;
157+
}
158+
}
159+
}
160+
128161
if self.initializations.is_empty() {
129162
writeln!(f, "├── Initializations: []")?;
130163
} else {
@@ -163,12 +196,17 @@ impl std::fmt::Display for PipelineExpression {
163196

164197
pub struct PipelineResolutionScope<'a> {
165198
constants: &'a Vec<StaticScalarExpression>,
199+
functions: &'a Vec<PipelineFunction>,
166200
}
167201

168202
impl<'a> PipelineResolutionScope<'a> {
169203
pub fn get_constant(&self, constant_id: usize) -> Option<&'a StaticScalarExpression> {
170204
self.constants.get(constant_id)
171205
}
206+
207+
pub fn get_function(&self, function_id: usize) -> Option<&'a PipelineFunction> {
208+
self.functions.get(function_id)
209+
}
172210
}
173211

174212
#[derive(Debug, Clone, PartialEq)]
@@ -220,6 +258,14 @@ impl PipelineExpressionBuilder {
220258
self
221259
}
222260

261+
/// Registers every function in `functions`, in order, and returns the
/// builder so calls can be chained.
pub fn with_functions(mut self, functions: Vec<PipelineFunction>) -> PipelineExpressionBuilder {
    functions.into_iter().for_each(|function| {
        // The returned index is not needed in the chaining form.
        self.push_function(function);
    });

    self
}
268+
223269
pub fn with_global_variables(
224270
mut self,
225271
variables: Vec<(&str, ScalarExpression)>,
@@ -246,6 +292,10 @@ impl PipelineExpressionBuilder {
246292
self.pipeline.push_constant(value)
247293
}
248294

295+
/// Registers `value` on the pipeline being built and returns the index the
/// pipeline reports for it.
pub fn push_function(&mut self, value: PipelineFunction) -> usize {
    let pipeline = &mut self.pipeline;
    pipeline.push_function(value)
}
298+
249299
pub fn push_global_variable(&mut self, name: &str, value: ScalarExpression) {
250300
self.pipeline.push_global_variable(name, value)
251301
}
@@ -267,6 +317,210 @@ impl AsRef<PipelineExpression> for PipelineExpressionBuilder {
267317
}
268318
}
269319

320+
#[derive(Debug, Clone, PartialEq)]
321+
pub enum PipelineFunctionExpression {
322+
Transform(TransformExpression),
323+
Return(ScalarExpression),
324+
}
325+
326+
impl PipelineFunctionExpression {
327+
pub(crate) fn fmt_with_indent(
328+
&self,
329+
f: &mut std::fmt::Formatter<'_>,
330+
indent: &str,
331+
) -> std::fmt::Result {
332+
match self {
333+
PipelineFunctionExpression::Transform(t) => {
334+
write!(f, "Transform: ")?;
335+
t.fmt_with_indent(f, format!("{indent} ").as_str())
336+
}
337+
PipelineFunctionExpression::Return(s) => {
338+
write!(f, "Return: ")?;
339+
s.fmt_with_indent(f, format!("{indent} ").as_str())
340+
}
341+
}
342+
}
343+
}
344+
345+
#[derive(Debug, Clone, PartialEq)]
346+
pub struct PipelineFunction {
347+
query_location: QueryLocation,
348+
parameters: Vec<PipelineFunctionParameter>,
349+
return_value_type: Option<ValueType>,
350+
implementation: PipelineFunctionImplementation,
351+
}
352+
353+
#[derive(Debug, Clone, PartialEq)]
354+
pub enum PipelineFunctionImplementation {
355+
Expressions(Vec<PipelineFunctionExpression>),
356+
External(Box<str>),
357+
}
358+
359+
impl PipelineFunctionImplementation {
360+
pub(crate) fn fmt_with_indent(
361+
&self,
362+
f: &mut std::fmt::Formatter<'_>,
363+
indent: &str,
364+
) -> std::fmt::Result {
365+
match self {
366+
PipelineFunctionImplementation::Expressions(e) => {
367+
if e.is_empty() {
368+
writeln!(f, "(Expressions): None")?;
369+
} else {
370+
writeln!(f, "(Expressions):")?;
371+
let last_idx = e.len() - 1;
372+
for (i, p) in e.iter().enumerate() {
373+
if i == last_idx {
374+
write!(f, "{indent} └── ")?;
375+
p.fmt_with_indent(f, format!("{indent} ").as_str())?;
376+
} else {
377+
write!(f, "{indent} ├── ")?;
378+
p.fmt_with_indent(f, format!("{indent} │ ").as_str())?;
379+
}
380+
}
381+
}
382+
}
383+
PipelineFunctionImplementation::External(id) => {
384+
writeln!(f, "(External): {id}")?;
385+
}
386+
}
387+
388+
Ok(())
389+
}
390+
}
391+
392+
impl PipelineFunction {
393+
pub fn new_with_expressions(
394+
query_location: QueryLocation,
395+
parameters: Vec<PipelineFunctionParameter>,
396+
return_value_type: Option<ValueType>,
397+
expressions: Vec<PipelineFunctionExpression>,
398+
) -> PipelineFunction {
399+
Self {
400+
query_location,
401+
parameters,
402+
return_value_type,
403+
implementation: PipelineFunctionImplementation::Expressions(expressions),
404+
}
405+
}
406+
407+
pub fn new_external(
408+
name: &str,
409+
parameters: Vec<PipelineFunctionParameter>,
410+
return_value_type: Option<ValueType>,
411+
) -> PipelineFunction {
412+
Self {
413+
query_location: QueryLocation::new(0, 0, 1, 1).unwrap(),
414+
parameters,
415+
return_value_type,
416+
implementation: PipelineFunctionImplementation::External(name.into()),
417+
}
418+
}
419+
420+
pub fn get_parameters(&self) -> &[PipelineFunctionParameter] {
421+
&self.parameters
422+
}
423+
424+
pub fn get_return_value_type(&self) -> Option<ValueType> {
425+
self.return_value_type.clone()
426+
}
427+
428+
pub fn get_implementation(&self) -> &PipelineFunctionImplementation {
429+
&self.implementation
430+
}
431+
432+
pub(crate) fn fmt_with_indent(
433+
&self,
434+
f: &mut std::fmt::Formatter<'_>,
435+
indent: &str,
436+
) -> std::fmt::Result {
437+
if self.parameters.is_empty() {
438+
writeln!(f, "{indent}├── Parameters: None")?;
439+
} else {
440+
writeln!(f, "{indent}├── Parameters: ")?;
441+
let last_idx = self.parameters.len() - 1;
442+
for (i, p) in self.parameters.iter().enumerate() {
443+
if i == last_idx {
444+
p.fmt_with_indent(f, format!("{indent}│ └── ").as_str())?;
445+
} else {
446+
p.fmt_with_indent(f, format!("{indent}│ ├── ").as_str())?;
447+
}
448+
}
449+
}
450+
451+
match &self.return_value_type {
452+
Some(t) => writeln!(f, "{indent}├── ReturnType: {t}")?,
453+
None => writeln!(f, "{indent}├── ReturnType: None")?,
454+
}
455+
456+
write!(f, "{indent}└── Implementation")?;
457+
self.implementation.fmt_with_indent(f, indent)?;
458+
459+
Ok(())
460+
}
461+
}
462+
463+
#[derive(Debug, Clone, PartialEq)]
464+
pub struct PipelineFunctionParameter {
465+
query_location: QueryLocation,
466+
name: Box<str>,
467+
parameter_type: PipelineFunctionParameterType,
468+
}
469+
470+
impl PipelineFunctionParameter {
471+
pub fn new(
472+
query_location: QueryLocation,
473+
name: &str,
474+
parameter_type: PipelineFunctionParameterType,
475+
) -> PipelineFunctionParameter {
476+
Self {
477+
query_location,
478+
name: name.into(),
479+
parameter_type,
480+
}
481+
}
482+
483+
pub fn get_name(&self) -> &str {
484+
&self.name
485+
}
486+
487+
pub fn get_parameter_type(&self) -> PipelineFunctionParameterType {
488+
self.parameter_type.clone()
489+
}
490+
491+
pub(crate) fn fmt_with_indent(
492+
&self,
493+
f: &mut std::fmt::Formatter<'_>,
494+
indent: &str,
495+
) -> std::fmt::Result {
496+
write!(f, "{indent}{} = ", &self.name)?;
497+
498+
let value_type = match &self.parameter_type {
499+
PipelineFunctionParameterType::Scalar(v) => {
500+
write!(f, "Scalar(")?;
501+
v
502+
}
503+
PipelineFunctionParameterType::MutableValue(v) => {
504+
write!(f, "MutableValue(")?;
505+
v
506+
}
507+
};
508+
509+
match value_type {
510+
Some(v) => writeln!(f, "{v:?})")?,
511+
None => writeln!(f, "Any)")?,
512+
}
513+
514+
Ok(())
515+
}
516+
}
517+
518+
#[derive(Debug, Clone, PartialEq)]
519+
pub enum PipelineFunctionParameterType {
520+
Scalar(Option<ValueType>),
521+
MutableValue(Option<ValueType>),
522+
}
523+
270524
#[cfg(test)]
271525
mod tests {
272526
use super::*;

0 commit comments

Comments
 (0)