Skip to content

Commit 332179c

Browse files
committed
feat(optimize): add max_temp_directory_size parameter for z-order and compact operations for DataFusion
Signed-off-by: Florian Valeye <[email protected]>
1 parent acd75d6 commit 332179c

File tree

5 files changed

+97
-5
lines changed

5 files changed

+97
-5
lines changed

crates/core/src/operations/optimize.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use arrow_array::RecordBatch;
2929
use arrow_schema::SchemaRef as ArrowSchemaRef;
3030
use datafusion::catalog::Session;
3131
use datafusion::execution::context::SessionState;
32+
use datafusion::execution::disk_manager::DiskManagerBuilder;
3233
use datafusion::execution::memory_pool::FairSpillPool;
3334
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
3435
use datafusion::execution::SessionStateBuilder;
@@ -217,6 +218,8 @@ pub struct OptimizeBuilder<'a> {
217218
max_concurrent_tasks: usize,
218219
/// Maximum number of bytes allowed in memory before spilling to disk
219220
max_spill_size: usize,
221+
/// Maximum size, in bytes, of the temporary directory used for spilling (default 100GB)
222+
max_temp_directory_size: u64,
220223
/// Optimize type
221224
optimize_type: OptimizeType,
222225
/// Datafusion session state relevant for executing the input plan
@@ -247,6 +250,7 @@ impl<'a> OptimizeBuilder<'a> {
247250
preserve_insertion_order: false,
248251
max_concurrent_tasks: num_cpus::get(),
249252
max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
253+
max_temp_directory_size: 100 * 1024 * 1024 * 1024, // 100 GB.
250254
optimize_type: OptimizeType::Compact,
251255
min_commit_interval: None,
252256
session: None,
@@ -306,6 +310,16 @@ impl<'a> OptimizeBuilder<'a> {
306310
self
307311
}
308312

313+
/// Maximum size, in bytes, of the temporary directory used for spilling
314+
#[deprecated(
315+
since = "0.29.0",
316+
note = "Pass in a `SessionState` configured with a `RuntimeEnv` and a `DiskManager`"
317+
)]
318+
pub fn with_max_temp_directory_size(mut self, max_temp_directory_size: u64) -> Self {
319+
self.max_temp_directory_size = max_temp_directory_size;
320+
self
321+
}
322+
309323
/// Min commit interval
310324
pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
311325
self.min_commit_interval = Some(min_commit_interval);
@@ -351,8 +365,11 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
351365
.and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
352366
.unwrap_or_else(|| {
353367
let memory_pool = FairSpillPool::new(this.max_spill_size);
368+
let disk_manager_builder = DiskManagerBuilder::default()
369+
.with_max_temp_directory_size(this.max_temp_directory_size);
354370
let runtime = RuntimeEnvBuilder::new()
355371
.with_memory_pool(Arc::new(memory_pool))
372+
.with_disk_manager_builder(disk_manager_builder)
356373
.build_arc()
357374
.unwrap();
358375
SessionStateBuilder::new()

python/deltalake/_internal.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ class RawDeltaTable:
117117
partition_filters: PartitionFilterType | None,
118118
target_size: int | None,
119119
max_concurrent_tasks: int | None,
120+
max_spill_size: int | None,
121+
max_temp_directory_size: int | None,
120122
min_commit_interval: int | None,
121123
writer_properties: WriterProperties | None,
122124
commit_properties: CommitProperties | None,
@@ -129,6 +131,7 @@ class RawDeltaTable:
129131
target_size: int | None,
130132
max_concurrent_tasks: int | None,
131133
max_spill_size: int | None,
134+
max_temp_directory_size: int | None,
132135
min_commit_interval: int | None,
133136
writer_properties: WriterProperties | None,
134137
commit_properties: CommitProperties | None,

python/deltalake/table.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1934,6 +1934,8 @@ def compact(
19341934
partition_filters: FilterConjunctionType | None = None,
19351935
target_size: int | None = None,
19361936
max_concurrent_tasks: int | None = None,
1937+
max_spill_size: int | None = None,
1938+
max_temp_directory_size: int | None = None,
19371939
min_commit_interval: int | timedelta | None = None,
19381940
writer_properties: WriterProperties | None = None,
19391941
post_commithook_properties: PostCommitHookProperties | None = None,
@@ -1956,6 +1958,8 @@ def compact(
19561958
max_concurrent_tasks: the maximum number of concurrent tasks to use for
19571959
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
19581960
faster, but will also use more memory.
1961+
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. Defaults to 20GB if not specified.
1962+
max_temp_directory_size: the maximum size in bytes of the temporary directory used for spilling to disk. Defaults to 100GB if not specified.
19591963
min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
19601964
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
19611965
want a commit per partition.
@@ -1989,6 +1993,8 @@ def compact(
19891993
self.table._stringify_partition_values(partition_filters),
19901994
target_size,
19911995
max_concurrent_tasks,
1996+
max_spill_size,
1997+
max_temp_directory_size,
19921998
min_commit_interval,
19931999
writer_properties,
19942000
commit_properties,
@@ -2003,7 +2009,8 @@ def z_order(
20032009
partition_filters: FilterConjunctionType | None = None,
20042010
target_size: int | None = None,
20052011
max_concurrent_tasks: int | None = None,
2006-
max_spill_size: int = 20 * 1024 * 1024 * 1024,
2012+
max_spill_size: int | None = None,
2013+
max_temp_directory_size: int | None = None,
20072014
min_commit_interval: int | timedelta | None = None,
20082015
writer_properties: WriterProperties | None = None,
20092016
post_commithook_properties: PostCommitHookProperties | None = None,
@@ -2023,7 +2030,8 @@ def z_order(
20232030
max_concurrent_tasks: the maximum number of concurrent tasks to use for
20242031
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
20252032
faster, but will also use more memory.
2026-
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. Defaults to 20GB.
2033+
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. Defaults to 20GB if not specified.
2034+
max_temp_directory_size: the maximum size in bytes of the temporary directory used for spilling to disk. Defaults to 100GB if not specified.
20272035
min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
20282036
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
20292037
want a commit per partition.
@@ -2059,6 +2067,7 @@ def z_order(
20592067
target_size,
20602068
max_concurrent_tasks,
20612069
max_spill_size,
2070+
max_temp_directory_size,
20622071
min_commit_interval,
20632072
writer_properties,
20642073
commit_properties,

python/src/lib.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,8 @@ impl RawDeltaTable {
592592
partition_filters = None,
593593
target_size = None,
594594
max_concurrent_tasks = None,
595+
max_spill_size = None,
596+
max_temp_directory_size = None,
595597
min_commit_interval = None,
596598
writer_properties=None,
597599
commit_properties=None,
@@ -604,6 +606,8 @@ impl RawDeltaTable {
604606
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
605607
target_size: Option<u64>,
606608
max_concurrent_tasks: Option<usize>,
609+
max_spill_size: Option<usize>,
610+
max_temp_directory_size: Option<u64>,
607611
min_commit_interval: Option<u64>,
608612
writer_properties: Option<PyWriterProperties>,
609613
commit_properties: Option<PyCommitProperties>,
@@ -615,6 +619,12 @@ impl RawDeltaTable {
615619
if let Some(size) = target_size {
616620
cmd = cmd.with_target_size(size);
617621
}
622+
if let Some(spill_size) = max_spill_size {
623+
cmd = cmd.with_max_spill_size(spill_size);
624+
}
625+
if let Some(directory_size) = max_temp_directory_size {
626+
cmd = cmd.with_max_temp_directory_size(directory_size);
627+
}
618628
if let Some(commit_interval) = min_commit_interval {
619629
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
620630
}
@@ -654,7 +664,8 @@ impl RawDeltaTable {
654664
partition_filters = None,
655665
target_size = None,
656666
max_concurrent_tasks = None,
657-
max_spill_size = 20 * 1024 * 1024 * 1024,
667+
max_spill_size = None,
668+
max_temp_directory_size = None,
658669
min_commit_interval = None,
659670
writer_properties=None,
660671
commit_properties=None,
@@ -666,7 +677,8 @@ impl RawDeltaTable {
666677
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
667678
target_size: Option<u64>,
668679
max_concurrent_tasks: Option<usize>,
669-
max_spill_size: usize,
680+
max_spill_size: Option<usize>,
681+
max_temp_directory_size: Option<u64>,
670682
min_commit_interval: Option<u64>,
671683
writer_properties: Option<PyWriterProperties>,
672684
commit_properties: Option<PyCommitProperties>,
@@ -675,11 +687,16 @@ impl RawDeltaTable {
675687
let (table, metrics) = py.allow_threads(|| {
676688
let mut cmd = OptimizeBuilder::new(self.log_store()?, self.cloned_state()?)
677689
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
678-
.with_max_spill_size(max_spill_size)
679690
.with_type(OptimizeType::ZOrder(z_order_columns));
680691
if let Some(size) = target_size {
681692
cmd = cmd.with_target_size(size);
682693
}
694+
if let Some(spill_size) = max_spill_size {
695+
cmd = cmd.with_max_spill_size(spill_size);
696+
}
697+
if let Some(directory_size) = max_temp_directory_size {
698+
cmd = cmd.with_max_temp_directory_size(directory_size);
699+
}
683700
if let Some(commit_interval) = min_commit_interval {
684701
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
685702
}

python/tests/test_optimize.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,3 +266,49 @@ def test_optimize_schema_evolved_3185(tmp_path):
266266
assert dt.version() == 2
267267
last_action = dt.history(1)[0]
268268
assert last_action["operation"] == "OPTIMIZE"
269+
270+
271+
def test_compact_with_spill_parameters(
272+
tmp_path: pathlib.Path,
273+
sample_table: Table,
274+
):
275+
write_deltalake(tmp_path, sample_table, mode="append")
276+
write_deltalake(tmp_path, sample_table, mode="append")
277+
write_deltalake(tmp_path, sample_table, mode="append")
278+
279+
dt = DeltaTable(tmp_path)
280+
old_version = dt.version()
281+
old_num_files = len(dt.file_uris())
282+
283+
dt.optimize.compact(
284+
max_spill_size=100 * 1024 * 1024 * 1024, # 100 GB
285+
max_temp_directory_size=500 * 1024 * 1024 * 1024, # 500 GB
286+
)
287+
288+
last_action = dt.history(1)[0]
289+
assert last_action["operation"] == "OPTIMIZE"
290+
assert dt.version() == old_version + 1
291+
assert len(dt.file_uris()) <= old_num_files
292+
293+
294+
def test_z_order_with_spill_parameters(
295+
tmp_path: pathlib.Path,
296+
sample_table: Table,
297+
):
298+
write_deltalake(tmp_path, sample_table, mode="append")
299+
write_deltalake(tmp_path, sample_table, mode="append")
300+
write_deltalake(tmp_path, sample_table, mode="append")
301+
302+
dt = DeltaTable(tmp_path)
303+
old_version = dt.version()
304+
305+
dt.optimize.z_order(
306+
columns=["sold", "price"],
307+
max_spill_size=100 * 1024 * 1024 * 1024, # 100 GB
308+
max_temp_directory_size=500 * 1024 * 1024 * 1024, # 500 GB
309+
)
310+
311+
last_action = dt.history(1)[0]
312+
assert last_action["operation"] == "OPTIMIZE"
313+
assert dt.version() == old_version + 1
314+
assert len(dt.file_uris()) == 1

0 commit comments

Comments
 (0)