@albertlockett albertlockett commented Oct 23, 2025

Proof of concept for a columnar query engine that processes OTAP batches using DataFusion.

The "engine" is the data_engine_columnar::engine::OtapBatchEngine struct. Its process method takes a data_engine_expressions::PipelineExpression and applies the specified transformations to a batch of OtapArrowRecords. The method first builds a datafusion::logical_plan::LogicalPlan from the pipeline's DataExpressions, then applies the plan to the root batch, and finally updates the child record batches.
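To picture the three stages (build a plan, apply it to the root batch, update the child batches), here is a minimal stdlib-only sketch. Everything in it is hypothetical: RootRow, AttrRow, Plan, build_plan and this process function are illustrative stand-ins, not the OtapBatchEngine, OtapArrowRecords or DataFusion types. It assumes child rows reference their root row by a parent id, and that "updating" a child batch means dropping rows whose parent was filtered out.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for one row of the root (logs) batch.
#[derive(Debug, Clone, PartialEq)]
pub struct RootRow {
    pub id: u16,
    pub severity_text: String,
}

/// Hypothetical stand-in for one row of a child (attributes) batch.
#[derive(Debug, Clone, PartialEq)]
pub struct AttrRow {
    pub parent_id: u16,
    pub key: String,
    pub value: String,
}

/// Stage 1 stand-in: the "plan" is just a boxed predicate over root rows.
pub type Plan = Box<dyn Fn(&RootRow) -> bool>;

/// Builds a plan equivalent to `where severity_text == <wanted_severity>`.
pub fn build_plan(wanted_severity: &str) -> Plan {
    let wanted = wanted_severity.to_string();
    Box::new(move |row| row.severity_text == wanted)
}

/// Stages 2 and 3: apply the plan to the root batch, then drop child rows
/// whose parent row no longer exists.
pub fn process(
    plan: &Plan,
    root: Vec<RootRow>,
    attrs: Vec<AttrRow>,
) -> (Vec<RootRow>, Vec<AttrRow>) {
    let kept_root: Vec<RootRow> = root.into_iter().filter(|r| plan(r)).collect();
    let kept_ids: HashSet<u16> = kept_root.iter().map(|r| r.id).collect();
    let kept_attrs: Vec<AttrRow> = attrs
        .into_iter()
        .filter(|a| kept_ids.contains(&a.parent_id))
        .collect();
    (kept_root, kept_attrs)
}
```

Calling process(&build_plan("WARN"), root, attrs) on this toy model keeps only the WARN rows and the attribute rows that belong to them.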

This work is still very much a proof of concept. Functionality is quite limited and there are some known issues. I'll probably continue to work on this and update the list below as fixes/features are added:

Supported Functionality:

Payload Types

  • logs only

Filtering

  • filtering using binary expressions like <left> <operator> <right>
    • supported operators: ==, > and >=
    • left support includes:
      • column names (e.g. severity_text, event_name)
      • resource/scope columns (resource.schema_url, scope.name)
      • attributes on the root record, resource & scope (attributes["X"], resource.attributes["Y"])
    • right must be a literal (putting the column on the right is not yet supported)
  • the logical operators or, and, and not are supported
  • limited support for missing optional columns
    • handles when columns are missing on the root batch

Known filtering gaps:

  • filtering by log.body is not yet supported
  • doesn't yet handle filtering by attributes if the attributes column/batch is missing
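The filtering rules above (column on the left, literal on the right, the operators ==, > and >=, combined with and/or/not) can be modelled as a small expression tree. This is a hedged sketch, not the engine's actual representation: Value, Op, Predicate, Row, cmp and eval are hypothetical names, and rows are evaluated one at a time here rather than column-wise. It also assumes that a comparison against a missing column evaluates to false, which is only one possible reading of "limited support for missing optional columns".

```rust
use std::collections::HashMap;

/// Hypothetical literal / cell value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Str(String),
    Int(i64),
}

/// The three comparison operators listed as supported.
#[derive(Debug, Clone, Copy)]
pub enum Op {
    Eq,
    Gt,
    Ge,
}

/// Hypothetical predicate tree: `<column> <op> <literal>` leaves combined
/// with and / or / not.
pub enum Predicate {
    Compare { column: String, op: Op, literal: Value },
    And(Box<Predicate>, Box<Predicate>),
    Or(Box<Predicate>, Box<Predicate>),
    Not(Box<Predicate>),
}

/// One row, keyed by column name. A missing key models a missing column.
pub type Row = HashMap<String, Value>;

/// Convenience constructor for a comparison leaf.
pub fn cmp(column: &str, op: Op, literal: Value) -> Predicate {
    Predicate::Compare { column: column.to_string(), op, literal }
}

fn compare<T: PartialOrd>(left: &T, right: &T, op: Op) -> bool {
    match op {
        Op::Eq => left == right,
        Op::Gt => left > right,
        Op::Ge => left >= right,
    }
}

pub fn eval(pred: &Predicate, row: &Row) -> bool {
    match pred {
        Predicate::Compare { column, op, literal } => match (row.get(column), literal) {
            (Some(Value::Str(l)), Value::Str(r)) => compare(l, r, *op),
            (Some(Value::Int(l)), Value::Int(r)) => compare(l, r, *op),
            // Assumption: a missing column or a type mismatch never matches.
            _ => false,
        },
        Predicate::And(a, b) => eval(a, row) && eval(b, row),
        Predicate::Or(a, b) => eval(a, row) || eval(b, row),
        // Note: under the assumption above, `not` over a missing column is true,
        // which differs from SQL-style null semantics.
        Predicate::Not(inner) => !eval(inner, row),
    }
}
```

Keeping the literal on the right-hand side of every leaf mirrors the stated restriction that a column on the right is not yet supported.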

Transforms

  • using extend to set the value of a column is supported
    • not yet supported for attributes
    • known issue: the assigned column is written as a native array type instead of going through the dynamic dictionary builder.
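To illustrate why the dictionary issue matters for extend, the sketch below contrasts the two layouts for a column that is set to a single constant. It is a stdlib-only model, not Arrow code: DictColumn, extend_native and extend_dictionary are hypothetical names, and the u8 key width is an arbitrary assumption.

```rust
/// Native layout: one owned string per row.
pub fn extend_native(num_rows: usize, value: &str) -> Vec<String> {
    vec![value.to_string(); num_rows]
}

/// Dictionary layout: each distinct value is stored once and rows hold
/// small integer keys into the values list.
#[derive(Debug, PartialEq)]
pub struct DictColumn {
    pub values: Vec<String>,
    pub keys: Vec<u8>,
}

/// Setting every row to the same constant needs exactly one dictionary entry.
pub fn extend_dictionary(num_rows: usize, value: &str) -> DictColumn {
    DictColumn {
        values: vec![value.to_string()],
        keys: vec![0; num_rows],
    }
}

impl DictColumn {
    /// Expands the dictionary back into the native layout.
    pub fn decode(&self) -> Vec<String> {
        self.keys
            .iter()
            .map(|&k| self.values[k as usize].clone())
            .collect()
    }
}
```

Both layouts decode to the same logical column; the dictionary form stores the string once regardless of the number of rows, which is the property a constant assignment such as extend severity_text = "INFO" would benefit from most.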

Conditional Logic
Adds a ConditionalDataExpression, which represents an if/else if/else expression that applies transforms only to the rows matching each branch's condition. In a hypothetical pipeline syntax, it could be thought of as:

source 
| if (<logical expression>)
    (<data expression> | <data expression> | ...)
  
  // else if branch would be optional
  else if (<logical expression>)
    (<data expression> | ...)
    
  // else branch would be optional too
  else
    (<data expression> | ...)
  
| // output is union of branches
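The branching semantics sketched above can be modelled row by row: each row is routed to the first branch whose condition holds, that branch's transforms are applied, and the results of all branches together form the output. The code below is an illustration only. LogRow, Branch, apply_conditional and example_branches are hypothetical names, not the ConditionalDataExpression API. Two assumptions are made that the description does not confirm: rows matching no branch (when there is no else) pass through unchanged, and the original row order is preserved.

```rust
/// Hypothetical stand-in for a log row.
#[derive(Debug, Clone, PartialEq)]
pub struct LogRow {
    pub event_name: String,
    pub severity_text: String,
}

pub type Condition = fn(&LogRow) -> bool;
pub type Transform = fn(&mut LogRow);

/// One `if` / `else if` branch, or the final `else` when `condition` is None.
pub struct Branch {
    pub condition: Option<Condition>,
    pub transforms: Vec<Transform>,
}

/// Routes each row to the first matching branch and applies its transforms.
/// Assumption: unmatched rows pass through unchanged.
pub fn apply_conditional(rows: Vec<LogRow>, branches: &[Branch]) -> Vec<LogRow> {
    rows.into_iter()
        .map(|mut row| {
            let matched = branches
                .iter()
                .find(|b| b.condition.map_or(true, |c| c(&row)));
            if let Some(branch) = matched {
                for transform in &branch.transforms {
                    transform(&mut row);
                }
            }
            row
        })
        .collect()
}

fn is_error(row: &LogRow) -> bool { row.event_name == "error" }
fn is_warn(row: &LogRow) -> bool { row.event_name == "warn" }
fn set_error(row: &mut LogRow) { row.severity_text = "ERROR".to_string(); }
fn set_warn(row: &mut LogRow) { row.severity_text = "WARN".to_string(); }
fn set_debug(row: &mut LogRow) { row.severity_text = "DEBUG".to_string(); }
fn set_not_important(row: &mut LogRow) { row.event_name = "not important".to_string(); }

/// Branches for an if (error) / else if (warn) / else pipeline.
pub fn example_branches() -> Vec<Branch> {
    vec![
        Branch { condition: Some(is_error as Condition), transforms: vec![set_error as Transform] },
        Branch { condition: Some(is_warn as Condition), transforms: vec![set_warn as Transform] },
        Branch {
            condition: None,
            transforms: vec![set_debug as Transform, set_not_important as Transform],
        },
    ]
}
```

Because a row is consumed by the first matching branch, later branches implicitly see only the rows that earlier conditions rejected, which is what makes the union of branches non-overlapping.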

Examples

TL;DR examples of currently supported pipelines:

// filtering by columns:
logs | where severity_text == "WARN"
logs | where instrumentation_scope.name == "otel-rust sdk"

// filtering by attributes
logs | where attributes["X"] == "Y"
logs | where attributes["num_attr"] > 2
logs | where attributes["num_attr"] >= 2
logs | where resource.attributes["X"] == "Y"
logs | where instrumentation_scope.attributes["X"] == "Y"

// filtering with logical expressions and, or, not
logs | where severity_text == "WARN" or log.attributes["X"] == "Y"
logs | where severity_text == "WARN" and log.attributes["X"] == "Y"
logs | where not(severity_text == "WARN")
logs | where (severity_text == "WARN" or attributes["X"] == "Y") and (severity_text == "DEBUG" or attributes["X"] == "B")
logs | where not(severity_text == "WARN" or log.attributes["X"] == "Y")
// etc..

// set a field value
logs | extend severity_text = "INFO"

// conditionally set fields
logs | if (event_name == "error") (extend severity_text = "ERROR")

// conditionally set fields with if/else if/else branching logic:
logs 
| if (event_name == "error")
    (extend severity_text = "ERROR")
  else if (event_name == "warn")
    (extend severity_text = "WARN")
  else
    (extend severity_text = "DEBUG" | extend event_name = "not important")
| // union of the branches is output

@github-actions bot added the rust (Pull requests that update Rust code) and query-engine (Query Engine / Transform related tasks) labels on Oct 23, 2025

codecov bot commented Oct 23, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 79.08%. Comparing base (f28d58b) to head (283aa8d).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1342      +/-   ##
==========================================
- Coverage   82.84%   79.08%   -3.77%     
==========================================
  Files         392      307      -85     
  Lines      106402    70808   -35594     
==========================================
- Hits        88153    56000   -32153     
+ Misses      17715    14274    -3441     
  Partials      534      534              
Components Coverage Δ
otap-dataflow 80.64% <ø> (+0.06%) ⬆️
otel-arrow-rust 89.97% <100.00%> (+0.27%) ⬆️
query_abstraction 80.61% <ø> (ø)
query_engine ∅ <ø> (∅)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)

albertlockett commented Oct 23, 2025

As an immediate next step, I plan to do some performance analysis and testing on this.
