diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java index 7281862481dd..db8da944c03d 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalogProperties.java @@ -40,9 +40,9 @@ private RESTCatalogProperties() {} public static final String NAMESPACE_SEPARATOR = "namespace-separator"; - // Enable planning on the REST server side - public static final String REST_SCAN_PLANNING_ENABLED = "rest-scan-planning-enabled"; - public static final boolean REST_SCAN_PLANNING_ENABLED_DEFAULT = false; + // Configure scan planning mode + // Can be set by server in LoadTableResponse.config() for table-level override + public static final String SCAN_PLANNING_MODE = "scan-planning-mode"; public static final String REST_SCAN_PLAN_ID = "rest-scan-plan-id"; @@ -59,4 +59,38 @@ public enum SnapshotMode { ALL, REFS } + + /** + * Enum to represent scan planning mode. + * + * <ul> + * <li>{@code CLIENT} - scans are planned on the client side + * <li>{@code SERVER} - scans are planned by the REST server + * </ul> + */ + public enum ScanPlanningMode { + CLIENT("client"), + SERVER("server"); + + private final String modeName; + + ScanPlanningMode(String modeName) { + this.modeName = modeName; + } + + public String modeName() { + return modeName; + } + + public static ScanPlanningMode fromString(String mode) { + for (ScanPlanningMode planningMode : values()) { + if (planningMode.modeName.equalsIgnoreCase(mode)) { + return planningMode; + } + } + + throw new IllegalArgumentException( + String.format("Invalid scan planning mode: %s. 
Valid values are: client, server", mode)); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index cda71fccda3a..8f89afdfb09d 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -166,7 +166,6 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private MetricsReporter reporter = null; private boolean reportingViaRestEnabled; private Integer pageSize = null; - private boolean restScanPlanningEnabled; private CloseableGroup closeables = null; private Set<Endpoint> endpoints; private Supplier<Map<String, String>> mutationHeaders = Map::of; @@ -281,12 +280,6 @@ public void initialize(String name, Map unresolved) { RESTCatalogProperties.NAMESPACE_SEPARATOR, RESTUtil.NAMESPACE_SEPARATOR_URLENCODED_UTF_8); - this.restScanPlanningEnabled = - PropertyUtil.propertyAsBoolean( - mergedProps, - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED_DEFAULT); - this.tableCache = createTableCache(mergedProps); this.closeables.addCloseable(this.tableCache); @@ -584,7 +577,7 @@ private Supplier createTableSupplier( trackFileIO(ops); - RESTTable table = restTableForScanPlanning(ops, identifier, tableClient); + RESTTable table = restTableForScanPlanning(ops, identifier, tableClient, tableConf); if (table != null) { return table; } @@ -595,9 +588,40 @@ private Supplier createTableSupplier( } private RESTTable restTableForScanPlanning( - TableOperations ops, TableIdentifier finalIdentifier, RESTClient restClient) { - // server supports remote planning endpoint and server / client wants to do server side planning - if (endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN) && restScanPlanningEnabled) { + TableOperations ops, + TableIdentifier finalIdentifier, + RESTClient restClient, + Map<String, String> tableConf) { + // Get client-side and server-side scan 
planning modes + String planningModeClientConfig = properties().get(RESTCatalogProperties.SCAN_PLANNING_MODE); + String planningModeServerConfig = tableConf.get(RESTCatalogProperties.SCAN_PLANNING_MODE); + + // Validate that client and server configs don't conflict + // Only validate if BOTH are explicitly set (not null) + if (planningModeClientConfig != null && planningModeServerConfig != null) { + Preconditions.checkState( + planningModeClientConfig.equalsIgnoreCase(planningModeServerConfig), + "Scan planning mode mismatch for table %s: client config=%s, server config=%s", + finalIdentifier, + planningModeClientConfig, + planningModeServerConfig); + } + + // Determine effective mode: prefer server config if present, otherwise use client config + String effectiveModeConfig = + planningModeServerConfig != null ? planningModeServerConfig : planningModeClientConfig; + RESTCatalogProperties.ScanPlanningMode effectiveMode = + effectiveModeConfig != null + ? RESTCatalogProperties.ScanPlanningMode.fromString(effectiveModeConfig) + : RESTCatalogProperties.ScanPlanningMode.CLIENT; + + if (effectiveMode == RESTCatalogProperties.ScanPlanningMode.SERVER) { + Preconditions.checkState( + endpoints.contains(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN), + "Server requires server-side scan planning for table %s but does not support endpoint %s", + finalIdentifier, + Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN); + return new RESTTable( ops, fullTableName(finalIdentifier), @@ -610,6 +634,8 @@ private RESTTable restTableForScanPlanning( properties(), conf); } + + // Default to client-side planning return null; } @@ -683,7 +709,7 @@ public Table registerTable( trackFileIO(ops); - RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient); + RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient, tableConf); if (restTable != null) { return restTable; } @@ -952,7 +978,7 @@ public Table create() { trackFileIO(ops); - RESTTable restTable = restTableForScanPlanning(ops, ident, 
tableClient); + RESTTable restTable = restTableForScanPlanning(ops, ident, tableClient, tableConf); if (restTable != null) { return restTable; } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestBaseWithRESTServer.java b/core/src/test/java/org/apache/iceberg/rest/TestBaseWithRESTServer.java index f1a172a4237c..a79977c2464e 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestBaseWithRESTServer.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestBaseWithRESTServer.java @@ -81,7 +81,7 @@ public void before() throws Exception { httpServer.setHandler(servletContext); httpServer.start(); - restCatalog = initCatalog(catalogName(), ImmutableMap.of()); + restCatalog = initCatalog(catalogName(), additionalCatalogProperties()); } @AfterEach @@ -119,6 +119,10 @@ public T execute( protected abstract String catalogName(); + protected Map additionalCatalogProperties() { + return ImmutableMap.of(); + } + @SuppressWarnings("unchecked") protected T roundTripSerialize(T payload, String description) { if (payload != null) { diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java index ab0e1d9c56d0..c37c3e2a7400 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java @@ -34,15 +34,12 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectReader; import java.io.IOException; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.ContentFile; -import org.apache.iceberg.ContentScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import 
org.apache.iceberg.DeleteFile; @@ -64,6 +61,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.ErrorResponse; +import org.apache.iceberg.rest.responses.LoadTableResponse; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -81,22 +79,22 @@ public T execute( Class responseType, Consumer errorHandler, Consumer> responseHeaders) { - if (ResourcePaths.config().equals(request.path())) { - return castResponse( - responseType, - ConfigResponse.builder() - .withEndpoints( - Arrays.stream(Route.values()) - .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) - .collect(Collectors.toList())) - .withOverrides( - ImmutableMap.of(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true")) - .build()); - } + // roundTripSerialize before intercepting so we modify the deserialized response Object body = roundTripSerialize(request.body(), "request"); HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); T response = super.execute(req, responseType, errorHandler, responseHeaders); - return roundTripSerialize(response, "response"); + response = roundTripSerialize(response, "response"); + + // Add scan planning mode to table config for LoadTableResponse + if (response instanceof LoadTableResponse) { + return castResponse( + responseType, + withPlanningMode( + (LoadTableResponse) response, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())); + } + + return response; } }); } @@ -106,8 +104,24 @@ protected String catalogName() { return "prod-with-scan-planning"; } + @Override + protected Map additionalCatalogProperties() { + return ImmutableMap.of( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()); + } + // ==================== Helper Methods ==================== + private static 
LoadTableResponse withPlanningMode(LoadTableResponse response, String mode) { + return LoadTableResponse.builder() + .withTableMetadata(response.tableMetadata()) + .addAllConfig(response.config()) + .addConfig(RESTCatalogProperties.SCAN_PLANNING_MODE, mode) + .addAllCredentials(response.credentials()) + .build(); + } + @Override @SuppressWarnings("unchecked") protected T roundTripSerialize(T payload, String description) { @@ -805,6 +819,41 @@ private static class CatalogWithAdapter { } } + private CatalogWithAdapter catalogWithModes(String clientMode, String serverMode) { + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + T response = super.execute(request, responseType, errorHandler, responseHeaders); + if (response instanceof LoadTableResponse && serverMode != null) { + return castResponse( + responseType, withPlanningMode((LoadTableResponse) response, serverMode)); + } + + return response; + } + }); + + RESTCatalog catalog = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + + ImmutableMap.Builder configBuilder = + ImmutableMap.builder() + .put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"); + + if (clientMode != null) { + configBuilder.put(RESTCatalogProperties.SCAN_PLANNING_MODE, clientMode); + } + + catalog.initialize("test-scan-planning-modes", configBuilder.build()); + return new CatalogWithAdapter(catalog, adapter); + } + // Helper: Create base catalog endpoints (namespace and table operations) private List baseCatalogEndpoints() { return ImmutableList.of( @@ -840,7 +889,18 @@ public T execute( return castResponse( responseType, ConfigResponse.builder().withEndpoints(endpoints).build()); } - return super.execute(request, responseType, errorHandler, responseHeaders); + T response = super.execute(request, responseType, 
errorHandler, responseHeaders); + + // Add scan planning mode to table config for LoadTableResponse + if (response instanceof LoadTableResponse) { + return castResponse( + responseType, + withPlanningMode( + (LoadTableResponse) response, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())); + } + + return response; } }); @@ -855,27 +915,24 @@ public T execute( ImmutableMap.of( CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO", - RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, - "true")); + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName())); return new CatalogWithAdapter(catalog, adapter); } @Test public void serverDoesNotSupportPlanningEndpoint() throws IOException { - // Server doesn't support scan planning at all - should fall back to client-side planning + // Server requires server-side planning but doesn't support the endpoint - should fail CatalogWithAdapter catalogWithAdapter = catalogWithEndpoints(baseCatalogEndpoints(), null); RESTCatalog catalog = catalogWithAdapter.catalog; - Table table = createTableWithScanPlanning(catalog, "no_planning_support"); - assertThat(table).isNotInstanceOf(RESTTable.class); - table.newAppend().appendFile(FILE_A).commit(); - // Should fall back to client-side planning when endpoint is not supported - assertThat(table.newScan().planFiles()) - .hasSize(1) - .first() - .extracting(ContentScanTask::file) - .extracting(ContentFile::location) - .isEqualTo(FILE_A.location()); + catalog.createNamespace(NS); + assertThatThrownBy( + () -> + catalog.buildTable(TableIdentifier.of(NS, "no_planning_support"), SCHEMA).create()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Server requires server-side scan planning") + .hasMessageContaining(Endpoint.V1_SUBMIT_TABLE_SCAN_PLAN.toString()); } @Test @@ -953,4 +1010,78 @@ public void serverSupportsPlanningButNotCancellation() throws IOException { // Verify no exception was thrown - 
cancelPlan returns false when endpoint not supported assertThat(cancelled).isFalse(); } + + @Test + public void catalogAndTableConfigMismatch() { + CatalogWithAdapter catalogWithAdapter = + catalogWithModes( + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName(), + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName()); + catalogWithAdapter.catalog.createNamespace(NS); + + assertThatThrownBy( + () -> + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "mismatch_test"), SCHEMA) + .create()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Scan planning mode mismatch") + .hasMessageContaining("client config=server") + .hasMessageContaining("server config=client"); + } + + @Test + public void clientExplicitlyRequestsClientSidePlanning() { + CatalogWithAdapter catalogWithAdapter = + catalogWithModes( + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName(), + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName()); + catalogWithAdapter.catalog.createNamespace(NS); + + Table table = + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "client_explicit_test"), SCHEMA) + .create(); + + assertThat(table).isNotInstanceOf(RESTTable.class); + assertThat(table).isInstanceOf(BaseTable.class); + } + + @Test + public void clientAndServerModeConflict() { + CatalogWithAdapter catalogWithAdapter = + catalogWithModes( + RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName(), + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()); + catalogWithAdapter.catalog.createNamespace(NS); + + assertThatThrownBy( + () -> + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "client_override_rejected_test"), SCHEMA) + .create()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Scan planning mode mismatch") + .hasMessageContaining("client config=client") + .hasMessageContaining("server config=server"); + } + + @Test + public void clientRequestsClientAndServerReturnsNothing() { + 
CatalogWithAdapter catalogWithAdapter = + catalogWithModes(RESTCatalogProperties.ScanPlanningMode.CLIENT.modeName(), null); + catalogWithAdapter.catalog.createNamespace(NS); + + Table table = + catalogWithAdapter + .catalog + .buildTable(TableIdentifier.of(NS, "client_server_null_test"), SCHEMA) + .create(); + + assertThat(table).isNotInstanceOf(RESTTable.class); + assertThat(table).isInstanceOf(BaseTable.class); + } } diff --git a/open-api/rest-catalog-open-api.py b/open-api/rest-catalog-open-api.py index 411881cb31ab..60363f5819c8 100644 --- a/open-api/rest-catalog-open-api.py +++ b/open-api/rest-catalog-open-api.py @@ -1468,6 +1468,9 @@ class LoadTableResult(BaseModel): ## General Configurations - `token`: Authorization bearer token to use for table requests if OAuth2 security is enabled + - `scan-planning-mode`: Communicates to clients the supported scan planning mode. Clients should use this value to fail fast if the supported scan planning mode is not available on the client. Valid values: + - `client`: Clients MUST use client-side scan planning + - `server`: Clients MUST use server-side scan planning via the `planTableScan` endpoint ## AWS Configurations diff --git a/open-api/rest-catalog-open-api.yaml b/open-api/rest-catalog-open-api.yaml index fff71128e5e5..c295d46975bd 100644 --- a/open-api/rest-catalog-open-api.yaml +++ b/open-api/rest-catalog-open-api.yaml @@ -3468,6 +3468,9 @@ components: ## General Configurations - `token`: Authorization bearer token to use for table requests if OAuth2 security is enabled + - `scan-planning-mode`: Communicates to clients the supported scan planning mode. Clients should use this value to fail fast if the supported scan planning mode is not available on the client. 
Valid values: + - `client`: Clients MUST use client-side scan planning + - `server`: Clients MUST use server-side scan planning via the `planTableScan` endpoint ## AWS Configurations diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..9c31eb970b56 100644 --- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..9c31eb970b56 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + 
RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..9c31eb970b56 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" } diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java index ed90da7fd4a8..9c31eb970b56 100644 --- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java +++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoteScanPlanning.java @@ -40,7 +40,9 @@ protected static Object[][] parameters() { .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) // this flag is typically only set by the server, but we set it from the client for // testing - .put(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true") + .put( + 
RESTCatalogProperties.SCAN_PLANNING_MODE, + RESTCatalogProperties.ScanPlanningMode.SERVER.modeName()) .build(), SparkCatalogConfig.REST.catalogName() + ".default.binary_table" }