Skip to content

Commit 857af0e

Browse files
committed
Update interfaces to better reflect CCv2 spec
1 parent 581633a commit 857af0e

File tree

6 files changed

+140
-325
lines changed

6 files changed

+140
-325
lines changed

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/CatalogManagedSnapshotManager.java

Lines changed: 0 additions & 102 deletions
This file was deleted.

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/ManagedCatalogAdapter.java

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,49 +15,66 @@
1515
*/
1616
package io.delta.kernel.spark.snapshot;
1717

18-
import io.delta.kernel.CommitRange;
19-
import io.delta.kernel.Snapshot;
20-
import io.delta.kernel.engine.Engine;
21-
import io.delta.kernel.internal.files.ParsedLogData;
22-
import java.util.List;
18+
import io.delta.storage.commit.GetCommitsResponse;
2319
import java.util.Optional;
2420

2521
/**
26-
* Adapter for catalog-managed tables that knows how to load snapshots and commit ranges for a
27-
* specific table.
22+
* Adapter interface for catalog-managed Delta tables.
23+
*
24+
* <p>This is a thin, protocol-aligned interface that adapters implement to fetch commit metadata
25+
* from a catalog's commit coordinator API. Adapters are responsible only for communication with the
26+
* catalog - they don't know about Delta snapshots or Kernel internals.
27+
*
28+
* <p>The {@link CatalogManagedSnapshotManager} uses this interface to retrieve commits and then
29+
* builds Delta snapshots and commit ranges using Kernel's TableManager APIs.
30+
*
31+
* <p>Implementations should be catalog-specific (e.g., UnityCatalogAdapter, GlueCatalogAdapter) but
32+
* share this common interface so the snapshot manager can work with any catalog.
2833
*/
2934
public interface ManagedCatalogAdapter extends AutoCloseable {
3035

31-
Snapshot loadSnapshot(Engine engine, Optional<Long> versionOpt, Optional<Long> timestampOpt);
36+
/**
37+
* Returns the unique identifier for this table in the catalog.
38+
*
39+
* @return the catalog-assigned table identifier
40+
*/
41+
String getTableId();
3242

33-
CommitRange loadCommitRange(
34-
Engine engine,
35-
Optional<Long> startVersionOpt,
36-
Optional<Long> startTimestampOpt,
37-
Optional<Long> endVersionOpt,
38-
Optional<Long> endTimestampOpt);
43+
/**
44+
* Returns the storage path for this table.
45+
*
46+
* @return the filesystem path to the Delta table root
47+
*/
48+
String getTablePath();
3949

4050
/**
41-
* Gets the ratified commits from the catalog up to the specified version.
51+
* Retrieves commits from the catalog's commit coordinator.
4252
*
43-
* <p>The returned list contains {@link ParsedLogData} representing each ratified commit, sorted
44-
* by version in ascending order. These are typically {@code ParsedCatalogCommitData} instances
45-
* for catalog-managed tables.
53+
* <p>This is the primary method that adapters must implement. It calls the catalog's API to get
54+
* the list of ratified commits within the specified version range.
4655
*
47-
* @param endVersionOpt optional end version (inclusive); if empty, returns commits up to latest
48-
* @return list of parsed log data representing ratified commits, sorted by version ascending
56+
* @param startVersion the starting version (inclusive), typically 0 for initial load
57+
* @param endVersion optional ending version (inclusive); if empty, returns up to latest
58+
* @return response containing the list of commits and the latest ratified table version
4959
*/
50-
List<ParsedLogData> getRatifiedCommits(Optional<Long> endVersionOpt);
60+
GetCommitsResponse getCommits(long startVersion, Optional<Long> endVersion);
5161

5262
/**
53-
* Gets the latest ratified table version from the catalog.
63+
* Returns the latest ratified table version from the catalog.
64+
*
65+
* <p>For catalog-managed tables, this is the highest version that has been successfully ratified
66+
* by the catalog coordinator. Returns -1 if the catalog hasn't registered any commits yet (which
67+
* can happen when version 0 exists but hasn't been ratified).
5468
*
55-
* <p>For catalog-managed tables, this is the highest version that has been ratified by the
56-
* catalog coordinator.
69+
* <p>Default implementation calls {@link #getCommits} with no end version and extracts the latest
70+
* version from the response. Implementations may override for efficiency if the catalog provides
71+
* a dedicated API.
5772
*
58-
* @return the latest version ratified by the catalog, or 0 if only the initial commit exists
73+
* @return the latest version ratified by the catalog, or -1 if none registered
5974
*/
60-
long getLatestRatifiedVersion();
75+
default long getLatestRatifiedVersion() {
76+
return getCommits(0, Optional.empty()).getLatestTableVersion();
77+
}
6178

6279
@Override
6380
void close();

kernel-spark/src/main/java/io/delta/kernel/spark/snapshot/unitycatalog/UnityCatalogAdapter.java

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,21 @@
1717

1818
import static java.util.Objects.requireNonNull;
1919

20-
import io.delta.kernel.CommitRange;
21-
import io.delta.kernel.Snapshot;
22-
import io.delta.kernel.engine.Engine;
23-
import io.delta.kernel.internal.files.ParsedLogData;
2420
import io.delta.kernel.spark.snapshot.ManagedCatalogAdapter;
25-
import java.util.List;
21+
import io.delta.storage.commit.GetCommitsResponse;
2622
import java.util.Optional;
2723
import org.apache.spark.sql.SparkSession;
2824
import org.apache.spark.sql.catalyst.catalog.CatalogTable;
2925

3026
/**
31-
* UC-backed implementation shell of {@link ManagedCatalogAdapter}.
27+
* Unity Catalog implementation of {@link ManagedCatalogAdapter}.
3228
*
33-
* <p>Methods are intentionally stubbed in this wireframe PR and will be implemented in a follow-up
34-
* once UC operations are enabled.
29+
* <p>This adapter is responsible only for fetching commit metadata from Unity Catalog's commit
30+
* coordinator API. It does not contain any Delta/Kernel snapshot building logic - that
31+
* responsibility belongs to the snapshot manager layer.
32+
*
33+
* <p>Methods are stubbed in this wireframe PR and will be implemented in a follow-up once UC
34+
* operations are enabled.
3535
*/
3636
public final class UnityCatalogAdapter implements ManagedCatalogAdapter {
3737

@@ -48,16 +48,21 @@ public UnityCatalogAdapter(String tableId, String tablePath, String endpoint, St
4848
}
4949

5050
/**
51-
* Creates adapter from Spark catalog table (convenience method).
51+
* Creates adapter from Spark catalog table.
52+
*
53+
* <p>Extracts UC connection info from Spark metadata and creates the adapter.
5254
*
53-
* <p>Extracts UC connection info from Spark metadata and delegates to {@link
54-
* #fromConnectionInfo}.
55+
* @param catalogTable the catalog table metadata
56+
* @param spark the active SparkSession
57+
* @return Optional containing the adapter if this is a UC-managed table, empty otherwise
5558
*/
5659
public static Optional<ManagedCatalogAdapter> fromCatalog(
5760
CatalogTable catalogTable, SparkSession spark) {
5861
requireNonNull(catalogTable, "catalogTable is null");
5962
requireNonNull(spark, "spark is null");
60-
throw new UnsupportedOperationException("UC wiring deferred to implementation PR");
63+
64+
return SparkUnityCatalogUtils.extractConnectionInfo(catalogTable, spark)
65+
.map(UnityCatalogAdapter::fromConnectionInfo);
6166
}
6267

6368
/** Creates adapter from connection info (no Spark dependency). */
@@ -67,50 +72,39 @@ public static ManagedCatalogAdapter fromConnectionInfo(UnityCatalogConnectionInf
6772
info.getTableId(), info.getTablePath(), info.getEndpoint(), info.getToken());
6873
}
6974

75+
@Override
7076
public String getTableId() {
7177
return tableId;
7278
}
7379

80+
@Override
7481
public String getTablePath() {
7582
return tablePath;
7683
}
7784

85+
/** Returns the UC endpoint URL. */
7886
public String getEndpoint() {
7987
return endpoint;
8088
}
8189

90+
/** Returns the UC authentication token. */
8291
public String getToken() {
8392
return token;
8493
}
8594

8695
@Override
87-
public Snapshot loadSnapshot(
88-
Engine engine, Optional<Long> versionOpt, Optional<Long> timestampOpt) {
89-
throw new UnsupportedOperationException("UC snapshot loading not implemented yet");
90-
}
91-
92-
@Override
93-
public CommitRange loadCommitRange(
94-
Engine engine,
95-
Optional<Long> startVersionOpt,
96-
Optional<Long> startTimestampOpt,
97-
Optional<Long> endVersionOpt,
98-
Optional<Long> endTimestampOpt) {
99-
throw new UnsupportedOperationException("UC commit range loading not implemented yet");
100-
}
101-
102-
@Override
103-
public List<ParsedLogData> getRatifiedCommits(Optional<Long> endVersionOpt) {
104-
throw new UnsupportedOperationException("UC commit listing not implemented yet");
96+
public GetCommitsResponse getCommits(long startVersion, Optional<Long> endVersion) {
97+
requireNonNull(endVersion, "endVersion is null");
98+
throw new UnsupportedOperationException("UC getCommits not implemented yet");
10599
}
106100

107101
@Override
108102
public long getLatestRatifiedVersion() {
109-
throw new UnsupportedOperationException("UC ratified version lookup not implemented yet");
103+
throw new UnsupportedOperationException("UC getLatestRatifiedVersion not implemented yet");
110104
}
111105

112106
@Override
113107
public void close() {
114-
// no-op in wireframe
108+
// no-op in wireframe; will close UCClient in implementation
115109
}
116110
}

kernel-spark/src/test/java/io/delta/kernel/spark/snapshot/DeltaSnapshotManagerFactoryTest.java

Lines changed: 0 additions & 49 deletions
This file was deleted.

0 commit comments

Comments
 (0)