Custom ExecutionPlan impl for filtering by attributes #1410

@albertlockett

In #1342, filtering by attributes is supported, including statements like:

logs | where attributes["x"] == "y"
logs | where attributes["x"] == "y" and attributes["x2"] == "y2"
logs | where attributes["x"] == "y" or resource.attributes["x2"] != "y2"
// etc.

However, in OTAP, attributes are stored in a separate record batch that must be joined to the parent batch. That means the three queries above are actually executed with plans like:

// logs | where attributes["x"] == "y"
HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, parent_id@0)]
  DataSourceExec: partitions=1, partition_sizes=[1] // Logs
  FilterExec: key@1 = x AND str@2 = Y, projection=[parent_id@0]
    DataSourceExec: partitions=1, partition_sizes=[1] // LogAttrs

// logs | where attributes["x"] == "y" and attributes["x2"] == "y2"
HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, parent_id@0)]
  DataSourceExec: partitions=1, partition_sizes=[1] // Logs
    HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, parent_id@0)], projection=[_row_number@1]
      HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, parent_id@0)]
          DataSourceExec: partitions=1, partition_sizes=[1]
          FilterExec: key@1 = x AND str@2 = Y, projection=[parent_id@0]
            DataSourceExec: partitions=1, partition_sizes=[1] // LogAttrs
    FilterExec: key@1 = x2 AND str@2 = Y2, projection=[parent_id@0]
      DataSourceExec: partitions=1, partition_sizes=[1] // LogAttrs

// logs | where attributes["x"] == "y" or resource.attributes["x2"] != "y2"
HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@8, parent_id@0)]
  DataSourceExec: partitions=1, partition_sizes=[1] // Logs
  AggregateExec: mode=Partial, gby=[_row_number@0 as _row_number], aggr=[first_value(_row_number)]
    UnionExec:
      FilterExec: key@1 = x AND str@2 = Y, projection=[parent_id@0]
        DataSourceExec: partitions=1, partition_sizes=[1] // LogAttrs
      FilterExec: key@1 = x2 AND str@2 != Y2, projection=[parent_id@0]
        DataSourceExec: partitions=1, partition_sizes=[1] // ResourceAttrs

In all of these cases, which are fairly simple filters, the resulting plans are quite complicated. Furthermore, although DataFusion's optimizer is quite good, it is not a panacea and does little to optimize these plans.

There's an opportunity here to simplify things and get better performance.

Once we've implemented #1409, we'll have the opportunity to build a custom filtering ExecutionPlan implementation that has access to both the root record batch (Logs) and the various child batches (LogAttrs, ResourceAttrs). It would be better to use this to create a filter that:

  • filters the attributes according to the predicate
  • creates a unique list of parent IDs from the filtered child record batch
  • uses this list to create a mask for selected rows from the parent batch
  • filters the parent batch

This is actually similar to what we do in the current Filter Processor implementation that was added in "[otap-dataflow] basic filter processor logs followup" #1341.
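A minimal sketch of the four steps above, using plain Rust vectors in place of Arrow record batches. All struct, field, and function names here (`AttrBatch`, `RootBatch`, `matching_parent_ids`, and so on) are hypothetical and are not taken from the linked implementation or from DataFusion; the `u16` ID type is also an assumption. It only illustrates the idea that `and` / `or` between attribute predicates can become element-wise mask operations instead of nested joins.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for an attributes child batch (e.g. LogAttrs),
/// stored column-wise: one entry per attribute row.
pub struct AttrBatch {
    pub parent_id: Vec<u16>,
    pub key: Vec<String>,
    pub str_val: Vec<String>,
}

/// Hypothetical stand-in for the root batch (e.g. Logs).
pub struct RootBatch {
    pub id: Vec<u16>,
    pub body: Vec<String>,
}

/// Steps 1 and 2: filter the attribute rows by the predicate and collect
/// the unique parent IDs of the rows that matched.
pub fn matching_parent_ids(attrs: &AttrBatch, key: &str, value: &str) -> HashSet<u16> {
    attrs
        .parent_id
        .iter()
        .zip(attrs.key.iter().zip(attrs.str_val.iter()))
        .filter(|(_, (k, v))| k.as_str() == key && v.as_str() == value)
        .map(|(pid, _)| *pid)
        .collect()
}

/// Step 3: turn the set of parent IDs into a boolean mask over the root rows.
pub fn selection_mask(root_ids: &[u16], selected: &HashSet<u16>) -> Vec<bool> {
    root_ids.iter().map(|id| selected.contains(id)).collect()
}

/// Step 4: keep only the root rows whose mask entry is true.
pub fn filter_root(root: &RootBatch, mask: &[bool]) -> RootBatch {
    let mut out = RootBatch { id: Vec::new(), body: Vec::new() };
    for (i, keep) in mask.iter().enumerate() {
        if *keep {
            out.id.push(root.id[i]);
            out.body.push(root.body[i].clone());
        }
    }
    out
}

/// `a and b` over two attribute predicates: element-wise AND of their masks.
pub fn and_masks(a: &[bool], b: &[bool]) -> Vec<bool> {
    a.iter().zip(b).map(|(x, y)| *x && *y).collect()
}

/// `a or b` over two attribute predicates: element-wise OR of their masks.
pub fn or_masks(a: &[bool], b: &[bool]) -> Vec<bool> {
    a.iter().zip(b).map(|(x, y)| *x || *y).collect()
}
```

With this shape, `attributes["x"] == "y" and attributes["x2"] == "y2"` would be two calls to `matching_parent_ids` and `selection_mask`, one `and_masks`, and a single `filter_root`. A real implementation would presumably operate on Arrow arrays and a boolean array mask rather than `Vec`s.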

I added a partial implementation of this here: https://github.com/albertlockett/otel-arrow/blob/840293f189d2226a0e64d1662d4aeb5b24f083be/rust/experimental/query_engine/engine-columnar/src/attributes/filter.rs#L250-L310. With it, I was able to get a ~30-40% improvement in query performance when filtering by a single attribute, compared with using HashJoinExec.

Labels: query-engine (Query Engine / Transform related tasks), query-engine-columnar (Columnar query engine which uses DataFusion to process OTAP Batches)
