Skip to content

Commit 8508d21

Browse files
committed
version proper field
1 parent 77c139c commit 8508d21

File tree

5 files changed

+389
-46
lines changed

5 files changed

+389
-46
lines changed

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManager.java

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,7 @@ public CommitRange getTableChanges(Engine engine, long startVersion, Optional<Lo
182182
endVersion.ifPresent(v -> checkArgument(v >= 0, "endVersion must be non-negative"));
183183

184184
return loadCommitRangeInternal(
185-
engine,
186-
Optional.of(startVersion),
187-
Optional.empty(),
188-
endVersion,
189-
Optional.empty());
185+
engine, Optional.of(startVersion), Optional.empty(), endVersion, Optional.empty());
190186
}
191187

192188
/**
@@ -210,8 +206,8 @@ public void close() {
210206
/**
211207
* Internal method to load a snapshot at a specific version or timestamp.
212208
*
213-
* <p>This method fetches commits from the catalog adapter, converts them to Kernel's ParsedLogData
214-
* format, and uses TableManager to build the snapshot.
209+
* <p>This method fetches commits from the catalog adapter, converts them to Kernel's
210+
* ParsedLogData format, and uses TableManager to build the snapshot.
215211
*/
216212
private Snapshot loadSnapshotInternal(Optional<Long> versionOpt, Optional<Long> timestampOpt) {
217213
checkArgument(
@@ -243,17 +239,21 @@ private Snapshot loadSnapshotInternal(Optional<Long> versionOpt, Optional<Long>
243239
if (timestampOpt.isPresent()) {
244240
// For timestamp queries, first build the latest snapshot for resolution
245241
Snapshot latestSnapshot =
246-
snapshotBuilder.withLogData(logData).withMaxCatalogVersion(catalogVersion).build(kernelEngine);
247-
snapshotBuilder = TableManager.loadSnapshot(tablePath)
248-
.atTimestamp(timestampOpt.get(), latestSnapshot);
242+
snapshotBuilder
243+
.withLogData(logData)
244+
.withMaxCatalogVersion(catalogVersion)
245+
.build(kernelEngine);
246+
snapshotBuilder =
247+
TableManager.loadSnapshot(tablePath).atTimestamp(timestampOpt.get(), latestSnapshot);
249248
}
250249

251-
return snapshotBuilder.withLogData(logData).withMaxCatalogVersion(catalogVersion).build(kernelEngine);
250+
return snapshotBuilder
251+
.withLogData(logData)
252+
.withMaxCatalogVersion(catalogVersion)
253+
.build(kernelEngine);
252254
}
253255

254-
/**
255-
* Internal method to load a commit range with version or timestamp boundaries.
256-
*/
256+
/** Internal method to load a commit range with version or timestamp boundaries. */
257257
private CommitRange loadCommitRangeInternal(
258258
Engine engine,
259259
Optional<Long> startVersionOpt,
@@ -286,58 +286,56 @@ private CommitRange loadCommitRangeInternal(
286286
CommitRangeBuilder builder = TableManager.loadCommitRange(tablePath);
287287

288288
if (startVersionOpt.isPresent()) {
289-
builder = builder.withStartBoundary(
290-
CommitRangeBuilder.CommitBoundary.atVersion(startVersionOpt.get()));
289+
builder =
290+
builder.withStartBoundary(
291+
CommitRangeBuilder.CommitBoundary.atVersion(startVersionOpt.get()));
291292
}
292293
if (startTimestampOpt.isPresent()) {
293294
Snapshot latestSnapshot = loadLatestSnapshot();
294-
builder = builder.withStartBoundary(
295-
CommitRangeBuilder.CommitBoundary.atTimestamp(startTimestampOpt.get(), latestSnapshot));
295+
builder =
296+
builder.withStartBoundary(
297+
CommitRangeBuilder.CommitBoundary.atTimestamp(
298+
startTimestampOpt.get(), latestSnapshot));
296299
}
297300
if (endVersionOpt.isPresent()) {
298-
builder = builder.withEndBoundary(
299-
CommitRangeBuilder.CommitBoundary.atVersion(endVersionOpt.get()));
301+
builder =
302+
builder.withEndBoundary(CommitRangeBuilder.CommitBoundary.atVersion(endVersionOpt.get()));
300303
}
301304
if (endTimestampOpt.isPresent()) {
302305
Snapshot latestSnapshot = loadLatestSnapshot();
303-
builder = builder.withEndBoundary(
304-
CommitRangeBuilder.CommitBoundary.atTimestamp(endTimestampOpt.get(), latestSnapshot));
306+
builder =
307+
builder.withEndBoundary(
308+
CommitRangeBuilder.CommitBoundary.atTimestamp(endTimestampOpt.get(), latestSnapshot));
305309
}
306310

307311
return builder.withLogData(logData).build(engine);
308312
}
309313

310-
/**
311-
* Converts catalog commits to Kernel's ParsedLogData format.
312-
*/
314+
/** Converts catalog commits to Kernel's ParsedLogData format. */
313315
private List<ParsedLogData> convertToKernelLogData(List<Commit> commits) {
314316
return commits.stream()
315317
.sorted(Comparator.comparingLong(Commit::getVersion))
316-
.map(commit -> ParsedCatalogCommitData.forFileStatus(
317-
hadoopFileStatusToKernelFileStatus(commit.getFileStatus())))
318+
.map(
319+
commit ->
320+
ParsedCatalogCommitData.forFileStatus(
321+
hadoopFileStatusToKernelFileStatus(commit.getFileStatus())))
318322
.collect(Collectors.toList());
319323
}
320324

321-
/**
322-
* Converts Hadoop FileStatus to Kernel FileStatus.
323-
*/
325+
/** Converts Hadoop FileStatus to Kernel FileStatus. */
324326
private static io.delta.kernel.utils.FileStatus hadoopFileStatusToKernelFileStatus(
325327
org.apache.hadoop.fs.FileStatus hadoopFS) {
326328
return io.delta.kernel.utils.FileStatus.of(
327329
hadoopFS.getPath().toString(), hadoopFS.getLen(), hadoopFS.getModificationTime());
328330
}
329331

330-
/**
331-
* Gets the true catalog version, handling the -1 case for newly created tables.
332-
*/
332+
/** Gets the true catalog version, handling the -1 case for newly created tables. */
333333
private long getCatalogVersion(long rawVersion) {
334334
// UC returns -1 when only 0.json exists but hasn't been registered with UC
335335
return rawVersion == -1 ? 0 : rawVersion;
336336
}
337337

338-
/**
339-
* Validates that the requested version exists.
340-
*/
338+
/** Validates that the requested version exists. */
341339
private void validateVersionExists(long version, long maxVersion) {
342340
if (version > maxVersion) {
343341
throw new IllegalArgumentException(
@@ -347,7 +345,8 @@ private void validateVersionExists(long version, long maxVersion) {
347345
}
348346
}
349347

350-
private String getVersionOrTimestampString(Optional<Long> versionOpt, Optional<Long> timestampOpt) {
348+
private String getVersionOrTimestampString(
349+
Optional<Long> versionOpt, Optional<Long> timestampOpt) {
351350
if (versionOpt.isPresent()) {
352351
return "version=" + versionOpt.get();
353352
} else if (timestampOpt.isPresent()) {

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
*
3535
* <p>This adapter is responsible only for fetching commit metadata from Unity Catalog's commit
3636
* coordinator API. It does not contain any Delta/Kernel snapshot building logic - that
37-
* responsibility belongs to the {@link io.delta.kernel.spark.snapshot.CatalogManagedSnapshotManager}
38-
* layer.
37+
* responsibility belongs to the {@link
38+
* io.delta.kernel.spark.snapshot.CatalogManagedSnapshotManager} layer.
3939
*/
4040
public final class UnityCatalogAdapter implements ManagedCatalogAdapter {
4141

kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManagerSuite.scala

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,4 +242,131 @@ class CatalogManagedSnapshotManagerSuite extends AnyFunSuite with UCCatalogManag
242242
manager.close()
243243
}
244244
}
245+
246+
// Time-travel tests for getActiveCommitAtTime
247+
248+
test("getActiveCommitAtTime returns commit at exact timestamp") {
249+
withUCClientAndTestTable { (ucClient, tablePath, _) =>
250+
val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient)
251+
val manager = new CatalogManagedSnapshotManager(
252+
adapter,
253+
testUcTableId,
254+
tablePath,
255+
new Configuration())
256+
257+
try {
258+
// Timestamp at v1 should return version 1
259+
val commit = manager.getActiveCommitAtTime(
260+
v1Ts,
261+
/* canReturnLastCommit = */ false,
262+
/* mustBeRecreatable = */ true,
263+
/* canReturnEarliestCommit = */ false)
264+
265+
assert(commit != null, "Commit should not be null")
266+
assert(commit.getVersion == 1L, s"Expected version 1, got ${commit.getVersion}")
267+
} finally {
268+
manager.close()
269+
}
270+
}
271+
}
272+
273+
test("getActiveCommitAtTime returns latest when canReturnLastCommit is true") {
274+
withUCClientAndTestTable { (ucClient, tablePath, maxRatifiedVersion) =>
275+
val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient)
276+
val manager = new CatalogManagedSnapshotManager(
277+
adapter,
278+
testUcTableId,
279+
tablePath,
280+
new Configuration())
281+
282+
try {
283+
// Timestamp after all commits with canReturnLastCommit=true
284+
val futureTs = v2Ts + 1000000L
285+
val commit = manager.getActiveCommitAtTime(
286+
futureTs,
287+
/* canReturnLastCommit = */ true,
288+
/* mustBeRecreatable = */ true,
289+
/* canReturnEarliestCommit = */ false)
290+
291+
assert(commit != null, "Commit should not be null")
292+
assert(commit.getVersion == maxRatifiedVersion,
293+
s"Expected version $maxRatifiedVersion, got ${commit.getVersion}")
294+
} finally {
295+
manager.close()
296+
}
297+
}
298+
}
299+
300+
test("getActiveCommitAtTime returns earliest when canReturnEarliestCommit is true") {
301+
withUCClientAndTestTable { (ucClient, tablePath, _) =>
302+
val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient)
303+
val manager = new CatalogManagedSnapshotManager(
304+
adapter,
305+
testUcTableId,
306+
tablePath,
307+
new Configuration())
308+
309+
try {
310+
// Timestamp before all commits with canReturnEarliestCommit=true
311+
val pastTs = v0Ts - 1000000L
312+
val commit = manager.getActiveCommitAtTime(
313+
pastTs,
314+
/* canReturnLastCommit = */ false,
315+
/* mustBeRecreatable = */ true,
316+
/* canReturnEarliestCommit = */ true)
317+
318+
assert(commit != null, "Commit should not be null")
319+
// Should return the earliest available version
320+
assert(commit.getVersion >= 0L, s"Expected version >= 0, got ${commit.getVersion}")
321+
} finally {
322+
manager.close()
323+
}
324+
}
325+
}
326+
327+
test("getActiveCommitAtTime returns commit between versions") {
328+
withUCClientAndTestTable { (ucClient, tablePath, _) =>
329+
val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient)
330+
val manager = new CatalogManagedSnapshotManager(
331+
adapter,
332+
testUcTableId,
333+
tablePath,
334+
new Configuration())
335+
336+
try {
337+
// Timestamp between v1 and v2 should return v1
338+
val betweenTs = (v1Ts + v2Ts) / 2
339+
val commit = manager.getActiveCommitAtTime(
340+
betweenTs,
341+
/* canReturnLastCommit = */ false,
342+
/* mustBeRecreatable = */ true,
343+
/* canReturnEarliestCommit = */ false)
344+
345+
assert(commit != null, "Commit should not be null")
346+
assert(commit.getVersion == 1L, s"Expected version 1, got ${commit.getVersion}")
347+
} finally {
348+
manager.close()
349+
}
350+
}
351+
}
352+
353+
test("loadSnapshotAt loads version 0") {
354+
withUCClientAndTestTable { (ucClient, tablePath, _) =>
355+
val adapter = new UnityCatalogAdapter(testUcTableId, tablePath, ucClient)
356+
val manager = new CatalogManagedSnapshotManager(
357+
adapter,
358+
testUcTableId,
359+
tablePath,
360+
new Configuration())
361+
362+
try {
363+
val snapshot = manager.loadSnapshotAt(0L)
364+
365+
assert(snapshot != null, "Snapshot should not be null")
366+
assert(snapshot.getVersion == 0L, "Should load version 0")
367+
} finally {
368+
manager.close()
369+
}
370+
}
371+
}
245372
}

kernel-spark/src/test/scala/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactorySuite.scala

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,52 @@ class DeltaSnapshotManagerFactorySuite extends AnyFunSuite {
7575
spark.stop()
7676
}
7777
}
78+
79+
// Null parameter validation tests
80+
81+
test("fromPath throws on null tablePath") {
82+
assertThrows[NullPointerException] {
83+
DeltaSnapshotManagerFactory.fromPath(null, new Configuration())
84+
}
85+
}
86+
87+
test("fromPath throws on null hadoopConf") {
88+
assertThrows[NullPointerException] {
89+
DeltaSnapshotManagerFactory.fromPath("/tmp/test", null)
90+
}
91+
}
92+
93+
test("fromCatalogTable throws on null catalogTable") {
94+
val spark = SparkSession.builder().master("local[1]").appName("factory-null-table").getOrCreate()
95+
try {
96+
assertThrows[NullPointerException] {
97+
DeltaSnapshotManagerFactory.fromCatalogTable(null, spark, new Configuration())
98+
}
99+
} finally {
100+
spark.stop()
101+
}
102+
}
103+
104+
test("fromCatalogTable throws on null spark") {
105+
val table = nonUcTable("file:/tmp/test")
106+
assertThrows[NullPointerException] {
107+
DeltaSnapshotManagerFactory.fromCatalogTable(table, null, new Configuration())
108+
}
109+
}
110+
111+
test("fromCatalogTable throws on null hadoopConf") {
112+
val spark = SparkSession.builder().master("local[1]").appName("factory-null-conf").getOrCreate()
113+
try {
114+
val table = nonUcTable("file:/tmp/test")
115+
assertThrows[NullPointerException] {
116+
DeltaSnapshotManagerFactory.fromCatalogTable(table, spark, null)
117+
}
118+
} finally {
119+
spark.stop()
120+
}
121+
}
122+
123+
// NOTE: Testing fromCatalogTable returning CatalogManagedSnapshotManager for valid UC tables
124+
// requires full SparkSession integration with UC catalog configuration (UCSingleCatalog,
125+
// endpoint, token). This is covered by integration tests rather than unit tests.
78126
}

0 commit comments

Comments
 (0)