Skip to content

Commit 0a934fc

Browse files
committed
Refactor state management in HashJoinExec
and use CASE expressions to evaluate pushed down filters only for the given partition.
1 parent f57da83 commit 0a934fc

File tree

6 files changed

+492
-225
lines changed

6 files changed

+492
-225
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
278278
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
279279
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
280280
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
281-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
281+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= aa AND d@0 <= ab ELSE false END ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
282282
"
283283
);
284284
}
@@ -1309,7 +1309,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
13091309
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
13101310
- CoalesceBatchesExec: target_batch_size=8192
13111311
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1312-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
1312+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 12 WHEN 2 THEN a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb WHEN 4 THEN a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ELSE false END ]
13131313
"
13141314
);
13151315

@@ -1326,7 +1326,7 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
13261326
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
13271327
- CoalesceBatchesExec: target_batch_size=8192
13281328
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1329-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
1329+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ELSE false END ]
13301330
"
13311331
);
13321332

@@ -1671,8 +1671,8 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
16711671
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
16721672
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
16731673
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
1674-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab ]
1675-
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb ]
1674+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN b@0 >= aa AND b@0 <= ab ELSE false END ]
1675+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ CASE hash_repartition % 1 WHEN 0 THEN d@0 >= ca AND d@0 <= cb ELSE false END ]
16761676
"
16771677
);
16781678
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 38 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use crate::filter_pushdown::{
2626
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
2727
FilterPushdownPropagation,
2828
};
29-
use crate::joins::hash_join::shared_bounds::{ColumnBounds, SharedBoundsAccumulator};
29+
use crate::joins::hash_join::shared_bounds::{
30+
ColumnBounds, PartitionBounds, SharedBuildAccumulator,
31+
};
3032
use crate::joins::hash_join::stream::{
3133
BuildSide, BuildSideInitialState, HashJoinStream, HashJoinStreamState,
3234
};
@@ -87,7 +89,8 @@ const HASH_JOIN_SEED: RandomState =
8789
/// HashTable and input data for the left (build side) of a join
8890
pub(super) struct JoinLeftData {
8991
/// The hash table with indices into `batch`
90-
pub(super) hash_map: Box<dyn JoinHashMapType>,
92+
/// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown
93+
pub(super) hash_map: Arc<dyn JoinHashMapType>,
9194
/// The input rows for the build side
9295
batch: RecordBatch,
9396
/// The build side on expressions values
@@ -102,32 +105,13 @@ pub(super) struct JoinLeftData {
102105
/// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption.
103106
/// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle.
104107
_reservation: MemoryReservation,
105-
/// Bounds computed from the build side for dynamic filter pushdown
106-
pub(super) bounds: Option<Vec<ColumnBounds>>,
108+
/// Bounds computed from the build side for dynamic filter pushdown.
109+
/// If the partition is empty (no rows) this will be None.
110+
/// If the partition has some rows this will be Some with the bounds for each join key column.
111+
pub(super) bounds: Option<PartitionBounds>,
107112
}
108113

109114
impl JoinLeftData {
110-
/// Create a new `JoinLeftData` from its parts
111-
pub(super) fn new(
112-
hash_map: Box<dyn JoinHashMapType>,
113-
batch: RecordBatch,
114-
values: Vec<ArrayRef>,
115-
visited_indices_bitmap: SharedBitmapBuilder,
116-
probe_threads_counter: AtomicUsize,
117-
reservation: MemoryReservation,
118-
bounds: Option<Vec<ColumnBounds>>,
119-
) -> Self {
120-
Self {
121-
hash_map,
122-
batch,
123-
values,
124-
visited_indices_bitmap,
125-
probe_threads_counter,
126-
_reservation: reservation,
127-
bounds,
128-
}
129-
}
130-
131115
/// return a reference to the hash map
132116
pub(super) fn hash_map(&self) -> &dyn JoinHashMapType {
133117
&*self.hash_map
@@ -364,9 +348,9 @@ pub struct HashJoinExec {
364348
struct HashJoinExecDynamicFilter {
365349
/// Dynamic filter that we'll update with the results of the build side once that is done.
366350
filter: Arc<DynamicFilterPhysicalExpr>,
367-
/// Bounds accumulator to keep track of the min/max bounds on the join keys for each partition.
351+
/// Build accumulator to collect build-side information (hash maps and/or bounds) from each partition.
368352
/// It is lazily initialized during execution to make sure we use the actual execution time partition counts.
369-
bounds_accumulator: OnceLock<Arc<SharedBoundsAccumulator>>,
353+
build_accumulator: OnceLock<Arc<SharedBuildAccumulator>>,
370354
}
371355

372356
impl fmt::Debug for HashJoinExec {
@@ -977,8 +961,10 @@ impl ExecutionPlan for HashJoinExec {
977961

978962
let batch_size = context.session_config().batch_size();
979963

980-
// Initialize bounds_accumulator lazily with runtime partition counts (only if enabled)
981-
let bounds_accumulator = enable_dynamic_filter_pushdown
964+
// Initialize build_accumulator lazily with runtime partition counts (only if enabled)
965+
// Use RepartitionExec's random state (seeds: 0,0,0,0) for partition routing
966+
let repartition_random_state = RandomState::with_seeds(0, 0, 0, 0);
967+
let build_accumulator = enable_dynamic_filter_pushdown
982968
.then(|| {
983969
self.dynamic_filter.as_ref().map(|df| {
984970
let filter = Arc::clone(&df.filter);
@@ -987,13 +973,14 @@ impl ExecutionPlan for HashJoinExec {
987973
.iter()
988974
.map(|(_, right_expr)| Arc::clone(right_expr))
989975
.collect::<Vec<_>>();
990-
Some(Arc::clone(df.bounds_accumulator.get_or_init(|| {
991-
Arc::new(SharedBoundsAccumulator::new_from_partition_mode(
976+
Some(Arc::clone(df.build_accumulator.get_or_init(|| {
977+
Arc::new(SharedBuildAccumulator::new_from_partition_mode(
992978
self.mode,
993979
self.left.as_ref(),
994980
self.right.as_ref(),
995981
filter,
996982
on_right,
983+
repartition_random_state,
997984
))
998985
})))
999986
})
@@ -1036,7 +1023,7 @@ impl ExecutionPlan for HashJoinExec {
10361023
batch_size,
10371024
vec![],
10381025
self.right.output_ordering().is_some(),
1039-
bounds_accumulator,
1026+
build_accumulator,
10401027
self.mode,
10411028
)))
10421029
}
@@ -1197,7 +1184,7 @@ impl ExecutionPlan for HashJoinExec {
11971184
cache: self.cache.clone(),
11981185
dynamic_filter: Some(HashJoinExecDynamicFilter {
11991186
filter: dynamic_filter,
1200-
bounds_accumulator: OnceLock::new(),
1187+
build_accumulator: OnceLock::new(),
12011188
}),
12021189
});
12031190
result = result.with_updated_node(new_node as Arc<dyn ExecutionPlan>);
@@ -1346,7 +1333,7 @@ impl BuildSideState {
13461333
/// When `should_compute_bounds` is true, this function computes the min/max bounds
13471334
/// for each join key column but does NOT update the dynamic filter. Instead, the
13481335
/// bounds are stored in the returned `JoinLeftData` and later coordinated by
1349-
/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds
1336+
/// `SharedBuildAccumulator` to ensure all partitions contribute their bounds
13501337
/// before updating the filter exactly once.
13511338
///
13521339
/// # Returns
@@ -1417,6 +1404,7 @@ async fn collect_left_input(
14171404

14181405
// Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the
14191406
`u64` index variant
1407+
// The map is built as a Box here and converted to an Arc below so it can be shared with SharedBuildAccumulator for hash map pushdown
14201408
let mut hashmap: Box<dyn JoinHashMapType> = if num_rows > u32::MAX as usize {
14211409
let estimated_hashtable_size =
14221410
estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?;
@@ -1452,15 +1440,15 @@ async fn collect_left_input(
14521440
offset += batch.num_rows();
14531441
}
14541442
// Merge all batches into a single batch, so we can directly index into the arrays
1455-
let single_batch = concat_batches(&schema, batches_iter)?;
1443+
let batch = concat_batches(&schema, batches_iter)?;
14561444

14571445
// Reserve additional memory for visited indices bitmap and create shared builder
14581446
let visited_indices_bitmap = if with_visited_indices_bitmap {
1459-
let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8);
1447+
let bitmap_size = bit_util::ceil(batch.num_rows(), 8);
14601448
reservation.try_grow(bitmap_size)?;
14611449
metrics.build_mem_used.add(bitmap_size);
14621450

1463-
let mut bitmap_buffer = BooleanBufferBuilder::new(single_batch.num_rows());
1451+
let mut bitmap_buffer = BooleanBufferBuilder::new(batch.num_rows());
14641452
bitmap_buffer.append_n(num_rows, false);
14651453
bitmap_buffer
14661454
} else {
@@ -1469,10 +1457,7 @@ async fn collect_left_input(
14691457

14701458
let left_values = on_left
14711459
.iter()
1472-
.map(|c| {
1473-
c.evaluate(&single_batch)?
1474-
.into_array(single_batch.num_rows())
1475-
})
1460+
.map(|c| c.evaluate(&batch)?.into_array(batch.num_rows()))
14761461
.collect::<Result<Vec<_>>>()?;
14771462

14781463
// Compute bounds for dynamic filter if enabled
@@ -1482,20 +1467,23 @@ async fn collect_left_input(
14821467
.into_iter()
14831468
.map(CollectLeftAccumulator::evaluate)
14841469
.collect::<Result<Vec<_>>>()?;
1485-
Some(bounds)
1470+
Some(PartitionBounds::new(bounds))
14861471
}
14871472
_ => None,
14881473
};
14891474

1490-
let data = JoinLeftData::new(
1491-
hashmap,
1492-
single_batch,
1493-
left_values.clone(),
1494-
Mutex::new(visited_indices_bitmap),
1495-
AtomicUsize::new(probe_threads_count),
1496-
reservation,
1475+
// Convert Box to Arc for sharing with SharedBuildAccumulator
1476+
let hash_map: Arc<dyn JoinHashMapType> = hashmap.into();
1477+
1478+
let data = JoinLeftData {
1479+
hash_map,
1480+
batch,
1481+
values: left_values,
1482+
visited_indices_bitmap: Mutex::new(visited_indices_bitmap),
1483+
probe_threads_counter: AtomicUsize::new(probe_threads_count),
1484+
_reservation: reservation,
14971485
bounds,
1498-
);
1486+
};
14991487

15001488
Ok(data)
15011489
}

datafusion/physical-plan/src/joins/hash_join/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@
2020
pub use exec::HashJoinExec;
2121

2222
mod exec;
23+
mod partitioned_hash_eval;
2324
mod shared_bounds;
2425
mod stream;

0 commit comments

Comments
 (0)