# Refactor InListExpr to support structs by re-using existing hashing infrastructure #18449
## Conversation
```rust
// TODO: serialize the inner ArrayRef directly to avoid materialization into literals
// by extending the protobuf definition to support both representations and adding a public
// accessor method to InListExpr to get the inner ArrayRef
```
I'll create a followup issue once we merge this
```diff
 05)--------ProjectionExec: expr=[]
 06)----------CoalesceBatchesExec: target_batch_size=8192
-07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
+07)------------FilterExec: substr(md5(CAST(value@0 AS Utf8View)), 1, 32) IN (SET) ([7f4b18de3cfeb9b4ac78c381ee2ad278, a, b, c])
```
This is because we now support Utf8View for building the sets 😄
```rust
let random_state = RandomState::with_seed(0);
let mut hashes_buf = vec![0u64; array.len()];
let Ok(hashes_buf) = create_hashes_from_arrays(
    &[array.as_ref()],
    &random_state,
    &mut hashes_buf,
) else {
    unreachable!("Failed to create hashes for InList array. This shouldn't happen because make_set should have succeeded earlier.");
};
hashes_buf.hash(state);
```
We could pre-compute and store a `hash: u64`, which would be both more performant when `Hash` is called and avoid this error path, but it would add more complexity and some overhead when building the InListExpr.
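A minimal sketch of the pre-computation trade-off described above, using std-only stand-in types (the names `PrecomputedHashSet` and `precomputed` are hypothetical, not the actual DataFusion code): the element hashes are folded into a single `u64` once at construction, so later `Hash` calls are O(1) instead of re-hashing the whole array.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical simplified stand-in for InListExpr's set.
struct PrecomputedHashSet {
    values: Vec<String>,
    // Computed once when the set is built.
    precomputed: u64,
}

impl PrecomputedHashSet {
    fn new(values: Vec<String>) -> Self {
        let mut hasher = DefaultHasher::new();
        values.hash(&mut hasher);
        let precomputed = hasher.finish();
        Self { values, precomputed }
    }
}

impl Hash for PrecomputedHashSet {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // O(1): replay the stored digest instead of re-hashing every element.
        self.precomputed.hash(state);
    }
}

fn main() {
    let a = PrecomputedHashSet::new(vec!["x".into(), "y".into()]);
    let b = PrecomputedHashSet::new(vec!["x".into(), "y".into()]);
    // DefaultHasher::new() uses fixed keys, so equal inputs hash equally.
    assert_eq!(a.precomputed, b.precomputed);
    assert_eq!(a.values, b.values);
}
```

The cost is the extra field plus one full pass over the values at build time, which is the overhead the comment is weighing.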
Force-pushed 4d4b797 to 9a0f6be

Force-pushed 9a0f6be to f1f3b66
## Background

This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in apache#17171. A "target state" is tracked in apache#18393.

There is a series of PRs to get us to this target state in smaller, more reviewable changes that are still valuable on their own:

- (This PR): apache#18448
- apache#18449 (depends on apache#18448)
- apache#18451

## Changes in this PR

Change `create_hashes` and related functions to work with `&dyn Array` references instead of requiring `ArrayRef` (Arc-wrapped arrays). This avoids unnecessary `Arc::clone()` calls and enables callers that only have an `&dyn Array` to use the hashing utilities.

- Add a `create_hashes_from_arrays(&[&dyn Array])` function
- Refactor `hash_dictionary`, `hash_list_array`, `hash_fixed_list_array` to use references instead of cloning
- Extract a `hash_single_array()` helper for common logic

Co-authored-by: Andrew Lamb <[email protected]>
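A small std-only illustration of the signature change described above, with simplified stand-ins for arrow's `Array` trait and `ArrayRef` alias (the stand-in bodies are hypothetical, not the real hashing logic): accepting `&[&dyn Array]` lets callers that only hold a plain reference use the function, while callers that hold Arcs pass `as_ref()` with no `Arc::clone()`.

```rust
use std::sync::Arc;

// Minimal stand-ins for arrow-rs types, for illustration only.
trait Array {
    fn len(&self) -> usize;
}

struct Int64Array(Vec<i64>);
impl Array for Int64Array {
    fn len(&self) -> usize {
        self.0.len()
    }
}

type ArrayRef = Arc<dyn Array>;

// Sketch of the new-style signature: borrows arrays instead of
// requiring Arc-wrapped ones. Here it just sizes one hash slot per row.
fn create_hashes_from_arrays(arrays: &[&dyn Array], hashes_buf: &mut Vec<u64>) {
    let rows = arrays.iter().map(|a| a.len()).max().unwrap_or(0);
    hashes_buf.resize(rows, 0);
}

fn main() {
    let owned = Int64Array(vec![1, 2, 3]);
    let shared: ArrayRef = Arc::new(Int64Array(vec![4, 5, 6]));
    let mut buf = Vec::new();
    // Both a plain reference and an Arc-backed array work, no clones needed.
    create_hashes_from_arrays(&[&owned as &dyn Array, shared.as_ref()], &mut buf);
    assert_eq!(buf.len(), 3);
}
```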
Force-pushed f1f3b66 to d1b9d05
```diff
 /// supported. Returns None otherwise. See [`LiteralGuarantee::analyze`] to
 /// create these structures from an predicate (boolean expression).
-fn new<'a>(
+fn new(
```
I think it's worth discussing in this review how far we propagate the changes.

In particular, InListExpr will now have two operation modes:

1. Was built with an `ArrayRef`, or was able to build an `ArrayRef` from a homogeneously typed `Vec<Arc<dyn PhysicalExpr>>` which are all literals.
2. Was built with a `Vec<Arc<dyn PhysicalExpr>>` which are not literals or not homogeneously typed.
If we restrict LiteralGuarantee to only operate on the first case, I think we could lift out a lot of computation: instead of transforming `ArrayRef -> Vec<Arc<dyn PhysicalExpr>> -> Vec<ScalarValue> -> HashSet<ScalarValue>`, which then gets fed into bloom filters that are per-column and don't really support heterogeneous ScalarValues, we could re-use the already deduplicated ArraySet that InListExpr has internally, or something similar.

The ultimate thing to do, though it would require even more work and changes, would be to make PruningPredicate::contains accept an `enum ArrayOrScalars { Array(ArrayRef), Scalars(Vec<ScalarValue>) }` so that we can push down and iterate over the Arc'ed ArrayRef the whole way down. I think we could make this backwards compatible.

I think that change is worth it, but it requires a bit more coordination (with arrow-rs) and a bigger change.
The end result would be that:

1. When the `InListExpr` operates in mode (1), we are able to push down into bloom filters with no data copies at all.
2. When the `InListExpr` operates in mode (2), we'd bail on the pushdown early (e.g. `list() -> Option<ArrayRef>`) and avoid building the `HashSet<ScalarValue>`, etc. that won't be used.
Wdyt @alamb ?
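A hypothetical std-only sketch of the `ArrayOrScalars` idea from the comment above: pruning code accepts either an already-built array (mode 1, handing the Arc down with no rebuild) or a list of scalars. `Int64Array` and `ScalarValue` here are simplified stand-ins for the real arrow/datafusion types.

```rust
use std::sync::Arc;

// Simplified stand-ins, for illustration only.
#[derive(Clone, Debug, PartialEq)]
enum ScalarValue {
    Int64(i64),
}

struct Int64Array(Vec<i64>);
type ArrayRef = Arc<Int64Array>;

// The enum proposed in the discussion, with stand-in types.
enum ArrayOrScalars {
    Array(ArrayRef),
    Scalars(Vec<ScalarValue>),
}

impl ArrayOrScalars {
    // A pruning check can iterate either representation uniformly.
    fn contains(&self, needle: i64) -> bool {
        match self {
            ArrayOrScalars::Array(a) => a.0.contains(&needle),
            ArrayOrScalars::Scalars(s) => s.contains(&ScalarValue::Int64(needle)),
        }
    }
}

fn main() {
    let arr = ArrayOrScalars::Array(Arc::new(Int64Array(vec![1, 2, 3])));
    let scalars = ArrayOrScalars::Scalars(vec![ScalarValue::Int64(7)]);
    assert!(arr.contains(2));
    assert!(!arr.contains(9));
    assert!(scalars.contains(7));
}
```

The `Array` variant is the zero-copy path; the `Scalars` variant keeps backwards compatibility for callers that only have scalar lists.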
Okay, I've looked into this and it is entirely possible; I think we should do it.

Basically the status quo is that we always try to build an ArrayHashSet, which is only possible if we can convert the `Vec<ScalarValue>` into an `ArrayRef`.

At that point the only reason to store the `Vec<ScalarValue>` is to later pass it into PruningPredicate -> bloom filters and LiteralGuarantee. If we can refactor those two to also handle an `ArrayRef`, we could probably avoid a lot of cloning and make things more efficient by using arrays. I don't even think we need to support `Vec<ScalarValue>` at all: the only reason to have it is if you could not build a homogeneously typed array, and in that case you probably can't do any sort of pushdown into a bloom filter. E.g. `select variant_get(col, 'abc') in (1, 2.0, 'c')` might make sense and work, but I don't think we could ever push that down into a bloom filter. So InListExpr needs to handle both modes, but I don't think the pruning machinery does.

So anyway, I think I may try to reduce this change to only be about using create_hashes and leave any remaining inefficiencies as a TODO for a followup issue. At the end of the day, if we can make HashJoinExec faster, even with some inefficiencies, I think that's okay and we can improve more later.
I'll record a preview of some of the changes I had made to explore this (by no means ready) just for future reference: https://github.com/pydantic/datafusion/compare/refactor-in-list...pydantic:datafusion:use-array-in-pruning?expand=1
```rust
pub trait Set: Send + Sync {
    fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
    fn has_nulls(&self) -> bool;
}
```
We get rid of the `Set` trait. The only implementer was `ArraySet`.
```rust
array => Arc::new(ArraySet::new(array, make_hash_set(array))),
DataType::Boolean => {
    let array = as_boolean_array(array)?;
    Arc::new(ArraySet::new(array, make_hash_set(array)))
},
```
We get rid of this type matching logic
```rust
trait IsEqual: HashValue {
    fn is_equal(&self, other: &Self) -> bool;
}

impl<T: IsEqual + ?Sized> IsEqual for &T {
    fn is_equal(&self, other: &Self) -> bool {
        T::is_equal(self, other)
    }
}
```
We get rid of these custom equality / hash traits
Force-pushed ab74641 to f412ead
alamb left a comment:
Thanks @adriangb
I looked through the code and the basic idea makes a lot of sense to me 👍
I kicked off some benchmarks to see what impact, if any, this change has on performance. Assuming it is the same or better, I think it would be good to merge
I do suggest adding some slt level logic for struct IN lists if we don't already have some, but I don't think it is necessary
```rust
        false => Some(negated),
    }
})
let mut hashes_buf = vec![0u64; v.len()];
```
As a follow-on PR we could potentially look into reusing this `hashes_buf`: rather than reallocating on each invocation of `contains`, make it a field (probably needs to be a Mutex or something) holding a `Vec`.
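A minimal sketch of that suggestion (the `HashScratch` name and shape are hypothetical, not the PR's code): a Mutex-guarded field holds the scratch `Vec`, so repeated `contains`-style calls reuse one allocation instead of creating a fresh buffer each time.

```rust
use std::sync::Mutex;

// Hypothetical wrapper holding a reusable hashes scratch buffer.
struct HashScratch {
    buf: Mutex<Vec<u64>>,
}

impl HashScratch {
    fn new() -> Self {
        Self { buf: Mutex::new(Vec::new()) }
    }

    // clear + resize keeps the existing capacity, so after warm-up no
    // allocation happens on the hot path for batches up to the max size seen.
    fn with_buf<R>(&self, len: usize, f: impl FnOnce(&mut [u64]) -> R) -> R {
        let mut buf = self.buf.lock().unwrap();
        buf.clear();
        buf.resize(len, 0);
        f(&mut buf)
    }
}

fn main() {
    let scratch = HashScratch::new();
    let sum = scratch.with_buf(4, |b| {
        for (i, h) in b.iter_mut().enumerate() {
            *h = i as u64;
        }
        b.iter().sum::<u64>()
    });
    assert_eq!(sum, 6);
    // Second call reuses the same allocation.
    let len = scratch.with_buf(2, |b| b.len());
    assert_eq!(len, 2);
}
```

The lock cost is the obvious downside the comment alludes to; it only pays off if allocation dominates contention.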
```rust
})
let mut hashes_buf = vec![0u64; v.len()];
create_hashes([v], &self.state, &mut hashes_buf)?;
let cmp = make_comparator(v, in_array, SortOptions::default())?;
```
The comparator is a dynamic function, so the overhead of dynamic dispatch in the critical path may be substantial.

If it turns out to be too slow, we can potentially create specializations for comparisons (aka make a specialized hash set for the different physical array types, and fall back to the dynamic comparator).
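A std-only illustration of the dispatch trade-off being discussed, with simplified stand-ins (these are not arrow's actual `make_comparator` / array types): the boxed closure incurs one indirect call per row, while a per-type specialization is monomorphized and inlinable.

```rust
use std::cmp::Ordering;

// Stand-in for arrow's DynComparator: a boxed closure, dispatched
// dynamically on every row comparison.
type DynComparator = Box<dyn Fn(usize, usize) -> Ordering + Send + Sync>;

fn make_i64_comparator(left: Vec<i64>, right: Vec<i64>) -> DynComparator {
    Box::new(move |i, j| left[i].cmp(&right[j]))
}

// Specialized path: monomorphized, no virtual call per row, and the
// compiler can inline and vectorize the comparison.
fn contains_i64(needle: i64, haystack: &[i64]) -> bool {
    haystack.iter().any(|&v| v == needle)
}

fn main() {
    let cmp = make_i64_comparator(vec![1, 5], vec![5, 9]);
    assert_eq!(cmp(1, 0), Ordering::Equal);
    assert_eq!(cmp(0, 1), Ordering::Less);
    assert!(contains_i64(5, &[1, 5, 9]));
}
```

The fallback scheme suggested above would take the specialized path for common physical types and route everything else through the dynamic comparator.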
```rust
///
/// The `list` field will be empty when using this constructor, as the array is stored
/// directly in the static filter.
pub fn in_list_from_array(
```
I wonder if it would be more discoverable if this was a method on InList rather than a free function. Something like:

```rust
impl InList {
    fn new_from_array(
        expr: Arc<dyn PhysicalExpr>,
        array: ArrayRef,
        negated: bool,
    ) -> Result<Self> {
        ...
    }
}
```
Yeah I agree, I was just following the existing patterns
```rust
}

#[test]
fn in_list_struct() -> Result<()> {
```
Can we also please add some .slt level tests for IN on a set?
🤖: Benchmark completed
It looks like there are indeed some regressions. I propose we do two things:

I think that will get us the broader type support and code re-use while avoiding any slowdown. Once we do the upstreaming into arrow it won't even be any more code than it is now (a bit more code in arrow, but not even that much). And we should be able to do it all in one PR here.
🤖: Benchmark completed

🤖: Benchmark completed
@alamb I'm not sure what's going on with these benchmarks. @friendlymatthew and I both ran it independently and got mostly big speedups as per above. Could you maybe try on your local machine? Here's how I've been running them:

```shell
git checkout main && git log -1 --oneline && \
  cargo bench --bench in_list -- --save-baseline before && \
  git checkout refactor-in-list && git log -1 --oneline && \
  cargo bench --bench in_list -- --baseline before
```
🤖: Benchmark completed
I don't know what is going on with the benchmarks 🤔 I ran this on my laptop:

```shell
gh pr checkout https://github.com/apache/datafusion/pull/18449 && git log -1 --oneline && \
  cargo bench --bench in_list -- --save-baseline in_list && \
  git checkout `git merge-base apache/main head` && git log -1 --oneline && \
  cargo bench --bench in_list -- --save-baseline before
```

Then critcmp reports:
So something is up with the automated benchmarks? If I'm reading your report correctly, this branch is faster in practically all cases, including those that the automated benchmarks reported it is worse in. Am I missing something here?
Could it somehow be architecture related? I assume we both have ARM MacBooks while the automated runner is x86?
That is my reading of the benchmarks too.

It is a good theory -- I am rerunning the same commands on my benchmarking machine (GCP c2-standard-8, 8 vCPUs, 32 GB memory) and will report results.
Here are the results from the GCP runner (aka I can reproduce my script, and it shows several cases much slower). I am also going to see if target-native makes a difference.

Results are the same when I used target-native.
Crazy stuff. So this is something like 70% faster on ARM but 70% slower on x86. I have not faced this sort of thing before. Will try to investigate.
> On Wed, Nov 12, 2025, alamb left a comment (apache/datafusion#18449):
>
> I could reproduce the slowdown on a local x86_64 dedicated machine (windows!) so I think it is reasonable to conclude this is something related to the platform.
>
> (screenshot attached)
Force-pushed 5958baf to 45e9e89
@NGA-TRAN and @gene-bordegaray - this PR may interest you. Specifically, this is a step towards a more efficient pushdown of join filters -- #18451. The idea is to push down dynamic join filters as InLists -- perhaps you can help move this one along.
@adriangb -- what is the status of this PR? Is it ready for another look? Do you need help trying to get the performance back? I sort of lost track with everything else that is going on.
I had a look at the perf regression on my local x86_64 dev box - I think the following might solve it: pydantic#43. It appears to be significantly better for me locally.
🤖: Benchmark completed
Yay, we got the performance back, and then some! @alamb I need to resolve a ton of conflicts now. And I'd like to take the opportunity to benchmark with/without:
One possibility is that you replace the hashes buffer with a "hasher" similar to the comparator and then hash the value inside the iteration in `contains`.
Refactor InListExpr to support structs by re-using existing hashing infrastructure

Co-authored-by: David Hewitt <[email protected]>
Force-pushed 2db9927 to eff65ad
Yeah, I think that makes a ton of sense here & in other places as well, but it would be a new thing to add into arrow.

I'd also guess that if there are few nulls (2) will beat (3), but if there are a lot of nulls (3) will beat (2)... So for now I'm going to stick with the current approach if perf is acceptable; this is already a tangent on a larger change, and we can make a ticket to consider doing (3) in the future.
I removed the enum comparator; benchmarks showed it was slower than the dynamic dispatch version. The thread-local hashing / buffer re-use seems to be a big win though. @alamb could you kick off benchmarks again? If they look good, are we good to merge this?
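A std-only sketch of the kind of thread-local buffer re-use credited above (the `HASHES_BUF` / `with_hashes_buf` names are hypothetical, not the actual change): each thread reuses its own `Vec`, so no lock sits on the hot path.

```rust
use std::cell::RefCell;

// One scratch buffer per thread; capacity is retained across calls.
thread_local! {
    static HASHES_BUF: RefCell<Vec<u64>> = RefCell::new(Vec::new());
}

fn with_hashes_buf<R>(len: usize, f: impl FnOnce(&mut [u64]) -> R) -> R {
    HASHES_BUF.with(|cell| {
        let mut buf = cell.borrow_mut();
        buf.clear();
        buf.resize(len, 0); // reuses capacity after the first call
        f(&mut buf)
    })
}

fn main() {
    let n = with_hashes_buf(8, |b| b.len());
    assert_eq!(n, 8);
    // A second call on the same thread reuses the allocation.
    let first = with_hashes_buf(3, |b| {
        b[0] = 42;
        b[0]
    });
    assert_eq!(first, 42);
}
```

Compared to a Mutex field, this trades a small amount of per-thread memory for lock-free access, which fits a hot `contains` path called from many threads.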

## Background

This PR is part of an EPIC to push down hash table references from HashJoinExec into scans. The EPIC is tracked in #17171.

A "target state" is tracked in #18393.

There is a series of PRs to get us to this target state in smaller, more reviewable changes that are still valuable on their own:

- … `HashJoinExec` and use CASE expressions for more precise filters (#18451)

## Changes in this PR

- … by adding an internal `InListStorage` enum with `Array` and `Exprs` variants
- Add `in_list_from_array(expr, list_array, negated)` for creating InList from arrays

Although the diff looks large, most of it is actually tests and docs. I think the actual code change is a negative LOC change, or at least negative complexity (it eliminates a trait, a macro, and matching on data types).