From abd1932c6d6c7188ce1744d0a35c9542441fc2af Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Sun, 16 Nov 2025 21:52:13 -0800 Subject: [PATCH 1/3] Add method to shallow clone table Signed-off-by: Corwin Joy --- crates/core/src/operations/clone.rs | 362 ++++++++++++++++++++++++++++ crates/core/src/operations/mod.rs | 12 +- python/deltalake/_internal.pyi | 1 + python/deltalake/table.py | 43 ++++ python/src/lib.rs | 32 +++ python/tests/test_shallow_clone.py | 45 ++++ 6 files changed, 493 insertions(+), 2 deletions(-) create mode 100644 crates/core/src/operations/clone.rs create mode 100644 python/tests/test_shallow_clone.py diff --git a/crates/core/src/operations/clone.rs b/crates/core/src/operations/clone.rs new file mode 100644 index 0000000000..c44cdb7d82 --- /dev/null +++ b/crates/core/src/operations/clone.rs @@ -0,0 +1,362 @@ +//! Clone a Delta table from a source location into a target location by +//! creating a new table at the target and registering the source table's files. +use futures::TryStreamExt; +use std::path::Path; +use url::Url; + +use crate::kernel::transaction::CommitBuilder; +use crate::kernel::{resolve_snapshot, Action, EagerSnapshot}; +use crate::logstore::LogStoreRef; +use crate::operations::create::CreateBuilder; +use crate::protocol::{DeltaOperation, SaveMode}; +use crate::{DeltaResult, DeltaTable, DeltaTableError}; + +/// Builder for performing a shallow clone of a Delta table. +/// +/// Construct via `DeltaOps::clone_table()` and configure using the builder methods. +pub struct CloneBuilder { + /// Source table log store + log_store: LogStoreRef, + /// Optional snapshot representing the active state/version of the source table + snapshot: Option, + /// Target table location + target: Option, +} + +impl CloneBuilder { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { + Self { + log_store, + snapshot, + target: None, + } + } + + /// Set the target table location (must be a `file://` URL). + pub fn with_target(mut self, target: Url) -> Self { + self.target = Some(target); + self + } +} + +impl std::future::IntoFuture for CloneBuilder { + type Output = DeltaResult; + type IntoFuture = std::pin::Pin + Send>>; + + fn into_future(self) -> Self::IntoFuture { + Box::pin(async move { + let target = self + .target + .ok_or_else(|| DeltaTableError::Generic("CloneBuilder missing target".into()))?; + shallow_clone(self.log_store, self.snapshot, target).await + }) + } +} + +/// Shallow clone a Delta table from a source location to a target location. +/// This function creates a new Delta table at the target location that +/// references the same data files as the source table, +/// without copying the actual data files. +/// +/// # Arguments +/// * `target` - The URL where the cloned Delta table will be created. Must be a `file://` URL. +/// +/// The source table is the active table referenced by the `DeltaOps` that constructed this builder. +/// +/// # Returns +/// Returns a [`DeltaResult`]. On success, contains the cloned [`DeltaTable`] instance. +/// On error, returns a [`DeltaTableError`] describing the failure. +/// In the event of an error, the target directory may contain partial data and files. +/// +/// # Errors +/// This function returns an error if: +/// - `target` URL is not a `file://` URL. +/// - The source table cannot be loaded. +/// - The target table cannot be created. +/// - File path conversion fails. +/// - Symlink creation fails. +/// +/// # Example +/// ``` +/// use url::Url; +/// use deltalake_core::DeltaOps; +/// # async fn shallow_clone_example() -> Result<(), deltalake_core::DeltaTableError> { +/// let source = Url::parse("file:///path/to/source_table")?; +/// let target = Url::parse("file:///path/to/target_table")?; +/// let ops = DeltaOps::try_from_uri(source).await?; +/// let table = ops +/// .clone_table() +/// .with_target(target) +/// .await?; +/// # Ok(()) +/// # } +/// ``` + +async fn shallow_clone( + log_store: LogStoreRef, + snapshot: Option, + target: Url, +) -> DeltaResult { + // Validate that source and target are both filesystem Urls. If not, return an error. + // We need this because we use symlinks to create the target files. + // We hope to replace this once delta-rs supports absolute paths. + let src_url = log_store.config().location.clone(); + if src_url.scheme() != "file" || target.scheme() != "file" { + return Err(DeltaTableError::InvalidTableLocation(format!( + "shallow_clone() requires file:// URLs for both source and target; got source='{}' (scheme='{}'), target='{}' (scheme='{}')", + src_url, + src_url.scheme(), + target, + target.scheme() + ))); + } + + // 1) Resolve source snapshot from provided state or active version in DeltaOps + let src_snapshot: EagerSnapshot = resolve_snapshot(log_store.as_ref(), snapshot, true).await?; + let src_metadata = src_snapshot.metadata().clone(); + let src_schema = src_metadata.parse_schema()?; + let partition_columns = src_metadata.partition_columns().to_vec(); + let configuration = src_metadata.configuration().clone(); + let src_protocol = src_snapshot.protocol().clone(); + let src_log = log_store; + + // 2) Create a target table mirroring source metadata and protocol + let mut create = CreateBuilder::new() + .with_location(target.as_ref().to_string()) + .with_columns(src_schema.fields().cloned()) + .with_partition_columns(partition_columns.clone()) + .with_configuration( + configuration + .iter() + .map(|(k, v)| (k.clone(), Some(v.clone()))), + ); + + if let Some(n) = src_metadata.name() { + create = create.with_table_name(n.to_string()); + } + if let Some(d) = src_metadata.description() { + create = create.with_comment(d.to_string()); + } + + let mut target_table = create + .with_actions([Action::Protocol(src_protocol.clone())]) + .await?; + + // 3) Gather source files from src_snapshot + let file_views: Vec<_> = src_snapshot + .file_views(&src_log, None) + .try_collect() + .await?; + + let mut actions = Vec::with_capacity(file_views.len() + 1); + actions.push(Action::Metadata(src_metadata)); + + // 4) Add files to the target table + let target_root_path = target_table + .log_store() + .config() + .location + .to_file_path() + .map_err(|_| { + DeltaTableError::InvalidTableLocation(format!( + "Failed to convert target URL to file path: {}", + target.as_ref() + )) + })?; + + let source_root_path = src_log.config().location.to_file_path().map_err(|_| { + DeltaTableError::InvalidTableLocation(format!( + "Failed to convert source URL to file path: {}", + src_url.as_ref() + )) + })?; + + for view in file_views { + let mut add = view.add_action(); + add.data_change = true; + // Absolute paths are not supported for now, create symlinks instead. + add_symlink( + source_root_path.as_path(), + target_root_path.as_path(), + &add.path, + )?; + actions.push(Action::Add(add)); + } + + // 5) Commit ADD operations + let operation = DeltaOperation::Write { + mode: SaveMode::Append, + partition_by: if partition_columns.is_empty() { + None + } else { + Some(partition_columns) + }, + predicate: None, + }; + + let target_snapshot = target_table.snapshot()?.snapshot().clone(); + let prepared_commit = CommitBuilder::default() + .with_actions(actions) + .build(Some(&target_snapshot), target_table.log_store(), operation) + .into_prepared_commit_future() + .await?; + + let log_store = target_table.log_store(); + let commit_version = target_snapshot.version() + 1; + let commit_bytes = prepared_commit.commit_or_bytes(); + let operation_id = uuid::Uuid::new_v4(); + + log_store + .write_commit_entry(commit_version, commit_bytes.clone(), operation_id) + .await?; + + target_table.update().await?; + Ok(target_table) +} + +fn add_symlink(source_root: &Path, target_root: &Path, add_filename: &str) -> std::io::Result<()> { + let file_name = Path::new(add_filename); + let src_path = source_root.join(file_name); + let link_path = target_root.join(file_name); + + // Create parent directories if needed + if let Some(parent) = link_path.parent() { + std::fs::create_dir_all(parent)?; + } + + // Best-effort symlink creation: only when both source and target are local filesystems. + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::symlink_file; + symlink_file(&src_path, &link_path)?; + } + #[cfg(target_family = "unix")] + { + use std::os::unix::fs::symlink; + symlink(&src_path, &link_path)?; + } + Ok(()) +} + +#[cfg(all(test, feature = "datafusion"))] +mod tests { + use super::*; + use crate::operations::collect_sendable_stream; + use crate::DeltaOps; + use crate::DeltaTableBuilder; + use arrow::array::RecordBatch; + use datafusion::assert_batches_sorted_eq; + use datafusion::common::test_util::format_batches; + use tracing::debug; + + #[tokio::test] + async fn test_non_file_url_rejected() { + let target = Url::parse("file:///tmp/target").unwrap(); + // Using an in-memory ops means the source is memory:// which is not file:// and should be rejected + let ops = DeltaOps::new_in_memory(); + let result = ops.clone_table().with_target(target).await; + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + DeltaTableError::InvalidTableLocation(_) + )); + } + #[tokio::test] + async fn test_simple_table_clone_with_version() -> DeltaResult<()> { + let source_path = Path::new("../test/tests/data/simple_table"); + let version = Some(2); + test_shallow_clone(source_path, version).await + } + + #[tokio::test] + async fn test_simple_table_clone_no_version() -> DeltaResult<()> { + let source_path = Path::new("../test/tests/data/simple_table"); + let version = None; + test_shallow_clone(source_path, version).await + } + + #[tokio::test] + async fn test_table_with_simple_partition() -> DeltaResult<()> { + // This partition test ensures that directories are created as needed by symlink creation. + let source_path = Path::new("../test/tests/data/delta-0.8.0-partitioned"); + let version = None; + test_shallow_clone(source_path, version).await + } + + // For now, deletion vectors are not supported in shallow clones. + // This gives the error + // Error: Transaction { source: UnsupportedReaderFeatures([DeletionVectors]) } + #[ignore] + #[tokio::test] + async fn test_deletion_vector() -> DeltaResult<()> { + let source_path = Path::new("../test/tests/data/table-with-dv-small"); + let version = None; + test_shallow_clone(source_path, version).await + } + + async fn test_shallow_clone(source_path: &Path, maybe_version: Option) -> DeltaResult<()> { + let source_uri = Url::from_directory_path(std::fs::canonicalize(source_path)?).unwrap(); + let clone_path = tempfile::TempDir::new()?.path().to_owned(); + let clone_uri = Url::from_directory_path(clone_path).unwrap(); + + let mut ops = DeltaOps::try_from_uri(source_uri.clone()).await?; + if let Some(v) = maybe_version { + ops.0.load_version(v).await?; + } + let cloned_table = ops.clone_table().with_target(clone_uri.clone()).await?; + + let mut source_table = DeltaTableBuilder::from_uri(source_uri.clone())? + .load() + .await?; + if let Some(version) = maybe_version { + source_table.load_version(version).await?; + } + + let src_uris: Vec<_> = source_table.get_file_uris()?.collect(); + let cloned_uris: Vec<_> = cloned_table.get_file_uris()?.collect(); + + let mut src_files: Vec = src_uris + .into_iter() + .filter_map(|url| Some(String::from(Path::new(&url).file_name()?.to_str()?))) + .collect(); + let mut cloned_files: Vec = cloned_uris + .into_iter() + .filter_map(|url| Some(String::from(Path::new(&url).file_name()?.to_str()?))) + .collect(); + + src_files.sort(); + cloned_files.sort(); + debug!("Source files: {:#?}", src_files); + debug!("Cloned files: {:#?}", cloned_files); + assert_eq!( + src_files, cloned_files, + "Cloned table should reference the same files as the source" + ); + + let cloned_ops = DeltaOps::try_from_uri(clone_uri).await?; + let (_table, stream) = cloned_ops.load().await?; + let cloned_data: Vec = collect_sendable_stream(stream).await?; + + let pretty_cloned_data = format_batches(&*cloned_data)?.to_string(); + debug!(""); + debug!("Cloned data:"); + debug!("{pretty_cloned_data}"); + + let mut src_ops = DeltaOps::try_from_uri(source_uri).await?; + if let Some(version) = maybe_version { + src_ops.0.load_version(version).await?; + } + let (_table, stream) = src_ops.load().await?; + let source_data: Vec = collect_sendable_stream(stream).await?; + + let expected_lines = format_batches(&*source_data)?.to_string(); + let expected_lines_vec: Vec<&str> = expected_lines.trim().lines().collect(); + + assert_batches_sorted_eq!(&expected_lines_vec, &cloned_data); + + debug!(""); + debug!("Source data:"); + debug!("{expected_lines}"); + Ok(()) + } +} diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index e122ce5584..8eece996fb 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -19,8 +19,8 @@ use url::Url; use uuid::Uuid; use self::{ - add_column::AddColumnBuilder, add_feature::AddTableFeatureBuilder, create::CreateBuilder, - filesystem_check::FileSystemCheckBuilder, restore::RestoreBuilder, + add_column::AddColumnBuilder, add_feature::AddTableFeatureBuilder, clone::CloneBuilder, + create::CreateBuilder, filesystem_check::FileSystemCheckBuilder, restore::RestoreBuilder, set_tbl_properties::SetTablePropertiesBuilder, update_field_metadata::UpdateFieldMetadataBuilder, update_table_metadata::UpdateTableMetadataBuilder, vacuum::VacuumBuilder, @@ -40,6 +40,7 @@ use crate::DeltaTable; pub mod add_column; pub mod add_feature; +pub mod clone; pub mod convert_to_delta; pub mod create; pub mod drop_constraints; @@ -229,6 +230,13 @@ impl DeltaOps { GenerateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } + /// Shallow-clone a table from a source location into a target location via builder API + /// + /// Construct a [`CloneBuilder`], then set `target`. The source is the active table of this `DeltaOps`. + pub fn clone_table(self) -> CloneBuilder { + CloneBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) + } + /// Load data from a DeltaTable #[cfg(feature = "datafusion")] #[must_use] diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 22a1409260..b7143d9e3b 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -84,6 +84,7 @@ class RawDeltaTable: data_catalog_id: str | None = None, catalog_options: dict[str, str] | None = None, ) -> str: ... + def shallow_clone(self, table_uri: str) -> RawDeltaTable: ... @staticmethod def is_deltatable( table_uri: str, storage_options: dict[str, str] | None diff --git a/python/deltalake/table.py b/python/deltalake/table.py index d6cc196eb9..2f4b6940a4 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -267,6 +267,49 @@ def create( return cls(table_uri=table_uri, storage_options=storage_options) + def shallow_clone(self, target_uri: str | Path | os.PathLike[str]) -> "DeltaTable": + """ + Create a shallow clone of this Delta table at the given target location and + return a new `DeltaTable` instance for the cloned table. + + The shallow clone references the same data files as the source table and does + not copy data. Only metadata is created at the target table to reference the + existing data files. + + Limitations: + - Currently, both source and target locations must be `file://` URLs. + + Args: + target_uri: Destination URI for the cloned table. Can be a string path, + a `pathlib.Path`, or any `os.PathLike`. + + Returns: + DeltaTable: a new `DeltaTable` instance pointing at the cloned table located + at `target_uri`. + """ + # Normalize target_uri to a string without requiring a runtime import of os + target_str = target_uri + if not isinstance(target_str, str): + target_str = str(target_uri) + + # Execute the shallow clone via the Rust bindings; a new RawDeltaTable is returned + cloned_raw = self._table.shallow_clone(target_str) + + # Build a high-level DeltaTable for the target. We reuse storage options if possible. + # Note: constructing a new DeltaTable will load table state; alternatively we could + # set the internal _table, but construction keeps behavior consistent with other APIs. + new_dt = DeltaTable( + table_uri=target_str, + storage_options=self._storage_options, + ) + # Replace with already cloned raw table to avoid an extra reload when possible + try: + new_dt._table = cloned_raw + except Exception: + # Fallback: leave as loaded by the constructor + pass + return new_dt + def version(self) -> int: """ Get the version of the DeltaTable. diff --git a/python/src/lib.rs b/python/src/lib.rs index 8dc9158bfa..321f7d8e27 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -204,6 +204,38 @@ impl RawDeltaTable { }) } + /// Shallow-clone this table to a target location, returning a new RawDeltaTable for the target. + /// + /// The clone references the same data files as the source table and does not copy data files. + /// + /// Limitations: currently only `file://` URLs are supported for both source and target. + #[pyo3(signature = (target_uri))] + pub fn shallow_clone(&self, py: Python, target_uri: String) -> PyResult { + let options = self._config.options.clone(); + py.allow_threads(|| { + let table = self._table.lock().map_err(to_rt_err)?.clone(); + // Ensure target is a proper table URL + let target_url = deltalake::table::builder::ensure_table_uri(&target_uri) + .map_err(PythonError::from)?; + + let cmd = DeltaOps(table) + .clone_table() + .with_target(target_url); + + let target_table = rt() + .block_on(cmd.into_future()) + .map_err(PythonError::from)?; + + Ok(RawDeltaTable { + _table: Arc::new(Mutex::new(target_table)), + _config: FsConfig { + root_url: target_uri, + options, + }, + }) + }) + } + #[pyo3(signature = (table_uri, storage_options = None))] #[staticmethod] pub fn is_deltatable( diff --git a/python/tests/test_shallow_clone.py b/python/tests/test_shallow_clone.py new file mode 100644 index 0000000000..d90414803a --- /dev/null +++ b/python/tests/test_shallow_clone.py @@ -0,0 +1,45 @@ +import os +import pathlib +from urllib.parse import urlparse + +from arro3.core import Table + +from deltalake import ( + DeltaTable, + write_deltalake, +) + + +def to_names(uris): + result = [] + for u in uris: + p = urlparse(u) + path = p.path if p.scheme else u + result.append(os.path.basename(path)) + return result + + +def test_shallow_clone(tmp_path: pathlib.Path, sample_table: Table): + # Create table and shallow clone + tmp_table_path = tmp_path / "path" / "to" / "table" + tmp_table_clone = tmp_path / "path" / "to" / "clone" + write_deltalake(str(tmp_table_path), sample_table) + delta_table = DeltaTable(str(tmp_table_path)) + delta_clone = delta_table.shallow_clone(str(tmp_table_clone)) + + # Compare metadata + m_tbl = delta_table.metadata() + m_clone = delta_clone.metadata() + assert m_tbl.name == m_clone.name + assert m_tbl.description == m_clone.description + assert m_tbl.configuration == m_clone.configuration + + # Compare only file names because the cloned table has a different base path + src_uris = delta_table.file_uris() + clone_uris = delta_clone.file_uris() + assert to_names(src_uris) == to_names(clone_uris) + + # Compare data + table_data = delta_table.to_pyarrow_table() + clone_data = delta_clone.to_pyarrow_table() + assert table_data == clone_data From f291aedc7ec0a09909aeea0bff5357df7b48e926 Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Sun, 16 Nov 2025 22:26:03 -0800 Subject: [PATCH 2/3] Simplify shallow_clone function in python and correct interface argument name. Signed-off-by: Corwin Joy --- python/deltalake/_internal.pyi | 2 +- python/deltalake/table.py | 24 +++++++----------------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index b7143d9e3b..8c5f72ed6f 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -84,7 +84,7 @@ class RawDeltaTable: data_catalog_id: str | None = None, catalog_options: dict[str, str] | None = None, ) -> str: ... - def shallow_clone(self, table_uri: str) -> RawDeltaTable: ... + def shallow_clone(self, target_uri: str) -> RawDeltaTable: ... @staticmethod def is_deltatable( table_uri: str, storage_options: dict[str, str] | None diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 2f4b6940a4..e65a9f861d 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -287,27 +287,17 @@ def shallow_clone(self, target_uri: str | Path | os.PathLike[str]) -> "DeltaTabl DeltaTable: a new `DeltaTable` instance pointing at the cloned table located at `target_uri`. """ - # Normalize target_uri to a string without requiring a runtime import of os - target_str = target_uri - if not isinstance(target_str, str): - target_str = str(target_uri) + # Normalize target_uri to a string + target_str = str(target_uri) if not isinstance(target_uri, str) else target_uri # Execute the shallow clone via the Rust bindings; a new RawDeltaTable is returned cloned_raw = self._table.shallow_clone(target_str) - # Build a high-level DeltaTable for the target. We reuse storage options if possible. - # Note: constructing a new DeltaTable will load table state; alternatively we could - # set the internal _table, but construction keeps behavior consistent with other APIs. - new_dt = DeltaTable( - table_uri=target_str, - storage_options=self._storage_options, - ) - # Replace with already cloned raw table to avoid an extra reload when possible - try: - new_dt._table = cloned_raw - except Exception: - # Fallback: leave as loaded by the constructor - pass + # Construct DeltaTable directly with the cloned raw table + new_dt = DeltaTable.__new__(DeltaTable) + new_dt._table = cloned_raw + new_dt._storage_options = self._storage_options + return new_dt def version(self) -> int: From 129d41472cb69fba37bddd4240f6583c7653085d Mon Sep 17 00:00:00 2001 From: Corwin Joy Date: Tue, 2 Dec 2025 14:20:30 -0800 Subject: [PATCH 3/3] cargo fmt Signed-off-by: Corwin Joy --- python/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index 321f7d8e27..9f3414355d 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -218,9 +218,7 @@ impl RawDeltaTable { let target_url = deltalake::table::builder::ensure_table_uri(&target_uri) .map_err(PythonError::from)?; - let cmd = DeltaOps(table) - .clone_table() - .with_target(target_url); + let cmd = DeltaOps(table).clone_table().with_target(target_url); let target_table = rt() .block_on(cmd.into_future())