Description
In #1342, filtering by attributes was added, supporting statements like:
```
logs | where attributes["x"] == "y"
logs | where attributes["x"] == "y" and attributes["x2"] == "y2"
logs | where attributes["x"] == "y" or resource.attributes["x2"] != "y2"
// etc.
```
However, in OTAP, attributes are stored in a separate record batch that must be joined with the parent batch. That means the three queries above are actually implemented with plans like:
```
// logs | where attributes["x"] == "y"
HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, parent_id@0)]
  DataSourceExec: partitions=1, partition_sizes=[1] // Logs
  FilterExec: key@1 = x AND str@2 = Y, projection=[parent_id@0]
    DataSourceExec: partitions=1, partition_sizes=[1] // LogAttrs

// logs | where attributes["x"] == "y" and attributes["x2"] == "y2"
HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, parent_id@0)]
  DataSourceExec: partitions=1, partition_sizes=[1] // Logs
  HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, parent_id@0)], projection=[_row_number@1]
    HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, parent_id@0)]
      DataSourceExec: partitions=1, partition_sizes=[1]
      FilterExec: key@1 = x AND str@2 = Y, projection=[parent_id@0]
        DataSourceExec: partitions=1, partition_sizes=[1] // LogAttrs
    FilterExec: key@1 = x2 AND str@2 = Y2, projection=[parent_id@0]
      DataSourceExec: partitions=1, partition_sizes=[1] // LogAttrs

// logs | where attributes["x"] == "y" or resource.attributes["x2"] != "y2"
HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@8, parent_id@0)]
  DataSourceExec: partitions=1, partition_sizes=[1] // Logs
  AggregateExec: mode=Partial, gby=[_row_number@0 as _row_number], aggr=[first_value(_row_number)]
    UnionExec:
      FilterExec: key@1 = x AND str@2 = Y, projection=[parent_id@0]
        DataSourceExec: partitions=1, partition_sizes=[1] // LogAttrs
      FilterExec: key@1 = x2 AND str@2 != Y2, projection=[parent_id@0]
        DataSourceExec: partitions=1, partition_sizes=[1] // ResourceAttrs
```
Even though these are fairly simple filters, the resulting plans are quite complicated. Furthermore, although DataFusion's optimizer is quite good, it is not a panacea and does little to simplify these plans.
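Conceptually, each attribute predicate yields a set of matching parent IDs. The chained LeftSemi joins in the `and` plan keep only parents present in every set (an intersection), while the UnionExec followed by a grouping AggregateExec in the `or` plan computes, roughly, a deduplicated union. The std-only sketch below illustrates just that set logic. It assumes both predicates target the same attributes batch and uses plain slices instead of Arrow arrays. `matching_parent_ids` is a hypothetical helper, not part of the codebase.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper: IDs of parent rows that have at least one attribute
/// row where `key == k` and the string value satisfies `pred`.
fn matching_parent_ids(
    parent_ids: &[u16],
    keys: &[&str],
    values: &[&str],
    k: &str,
    pred: impl Fn(&str) -> bool,
) -> BTreeSet<u16> {
    (0..parent_ids.len())
        .filter(|&i| keys[i] == k && pred(values[i]))
        .map(|i| parent_ids[i])
        .collect() // the set deduplicates repeated parent IDs
}

fn main() {
    // Toy attributes batch: one row per (parent_id, key, value).
    let parent_ids = [0u16, 0, 1, 2, 2];
    let keys = ["x", "x2", "x", "x", "x2"];
    let values = ["y", "y2", "y", "z", "y2"];

    let a = matching_parent_ids(&parent_ids, &keys, &values, "x", |v| v == "y");
    let b = matching_parent_ids(&parent_ids, &keys, &values, "x2", |v| v == "y2");

    // `and`: a parent must satisfy both predicates (like chained LeftSemi joins).
    let and_ids: BTreeSet<u16> = a.intersection(&b).copied().collect();
    // `or`: a parent may satisfy either predicate (union, then deduplicate).
    let or_ids: BTreeSet<u16> = a.union(&b).copied().collect();

    println!("and: {:?}", and_ids); // prints "and: {0}"
    println!("or: {:?}", or_ids); // prints "or: {0, 1, 2}"
}
```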
There's an opportunity here to simplify these plans and get better performance.
Once we've implemented #1409, we'll be able to build a custom filtering ExecutionPlan implementation that has access to both the root record batch (Logs) and the various child batches (LogAttrs, ResourceAttrs). It would be better to use this to create a filter that:
- filters the attributes according to the predicate
- creates a unique list of parent IDs from the filtered child record batch
- uses this list to create a mask for selected rows from the parent batch
- filters the parent batch
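The four steps above can be sketched with plain Rust vectors standing in for Arrow columns. This is a minimal std-only sketch under stated assumptions, not the partial implementation referenced in this issue. The column names (`id` on the parent, `parent_id`/`key`/`str` on the attributes) mirror the plans above, while `ParentBatch`, `AttrsBatch`, `filter_by_attribute` and `apply_mask` are hypothetical names. A real ExecutionPlan would operate on Arrow record batches and apply the mask with a boolean filter kernel.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the parent batch (e.g. Logs).
struct ParentBatch {
    id: Vec<u16>,
    body: Vec<String>,
}

/// Hypothetical stand-in for an attributes batch (e.g. LogAttrs).
struct AttrsBatch {
    parent_id: Vec<u16>,
    key: Vec<String>,
    str_value: Vec<String>,
}

/// Keep the elements of `values` whose mask entry is `true`.
fn apply_mask<T: Clone>(values: &[T], mask: &[bool]) -> Vec<T> {
    (0..values.len()).filter(|&i| mask[i]).map(|i| values[i].clone()).collect()
}

/// Sketch of the proposed filter for `attributes[key] == value`.
fn filter_by_attribute(parent: &ParentBatch, attrs: &AttrsBatch, key: &str, value: &str) -> ParentBatch {
    // Steps 1 and 2: filter the attribute rows by the predicate and collect
    // the unique parent IDs of the matching rows.
    let selected_ids: HashSet<u16> = (0..attrs.parent_id.len())
        .filter(|&i| attrs.key[i] == key && attrs.str_value[i] == value)
        .map(|i| attrs.parent_id[i])
        .collect();

    // Step 3: build a selection mask over the parent rows.
    let mask: Vec<bool> = parent.id.iter().map(|id| selected_ids.contains(id)).collect();

    // Step 4: filter every parent column with the same mask.
    ParentBatch {
        id: apply_mask(&parent.id, &mask),
        body: apply_mask(&parent.body, &mask),
    }
}

fn main() {
    let logs = ParentBatch {
        id: vec![0, 1, 2, 3],
        body: vec!["a".into(), "b".into(), "c".into(), "d".into()],
    };
    let log_attrs = AttrsBatch {
        parent_id: vec![0, 0, 2, 3],
        key: vec!["x".into(), "x2".into(), "x".into(), "x".into()],
        str_value: vec!["y".into(), "y2".into(), "y".into(), "z".into()],
    };

    // logs | where attributes["x"] == "y"
    let out = filter_by_attribute(&logs, &log_attrs, "x", "y");
    println!("{:?} {:?}", out.id, out.body); // prints `[0, 2] ["a", "c"]`
}
```

One design consequence, offered as an assumption rather than something this issue specifies: because each predicate reduces to a boolean mask over the parent rows, compound `and`/`or` predicates could be evaluated by combining masks element-wise instead of nesting joins.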
This is similar to what we already do in the current Filter Processor implementation, which was added in [otap-dataflow] basic filter processor logs followup #1341.
I added a partial implementation of this here: https://github.com/albertlockett/otel-arrow/blob/840293f189d2226a0e64d1662d4aeb5b24f083be/rust/experimental/query_engine/engine-columnar/src/attributes/filter.rs#L250-L310. With it, I saw a ~30-40% improvement in query performance when filtering by a single attribute, compared with the HashJoinExec-based plan.