From 07d24768cb9f8511dbd89f054a808f8450c9b97d Mon Sep 17 00:00:00 2001 From: manuzhang Date: Mon, 28 Apr 2025 09:55:59 +0800 Subject: [PATCH 1/7] Core: Remove deprecated RemoveSnapshot --- .palantir/revapi.yml | 3 +++ .../org/apache/iceberg/MetadataUpdate.java | 26 +++---------------- .../apache/iceberg/MetadataUpdateParser.java | 19 ++------------ .../org/apache/iceberg/TableMetadata.java | 6 +---- .../iceberg/TestMetadataUpdateParser.java | 22 ++++++---------- .../iceberg/TestUpdateRequirements.java | 2 +- .../TestCommitTransactionRequestParser.java | 2 +- 7 files changed, 20 insertions(+), 60 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index a19898f443e0..9daf9fd4c589 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1178,6 +1178,9 @@ acceptedBreaks: new: "class org.apache.iceberg.Metrics" justification: "Java serialization across versions is not guaranteed" org.apache.iceberg:iceberg-core: + - code: "java.class.removed" + old: "class org.apache.iceberg.MetadataUpdate.RemoveSnapshot" + justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\ \ org.apache.hadoop.conf.Configuration)" diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index a63bff81209e..526cbf589730 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -329,31 +329,13 @@ public void applyTo(TableMetadata.Builder metadataBuilder) { } } - /** - * @deprecated since 1.9.0, will be removed in 1.10.0; Use {@link MetadataUpdate.RemoveSnapshots} - * instead. 
- */ - @Deprecated - class RemoveSnapshot implements MetadataUpdate { - private final long snapshotId; - - public RemoveSnapshot(long snapshotId) { - this.snapshotId = snapshotId; - } - - public long snapshotId() { - return snapshotId; - } - - @Override - public void applyTo(TableMetadata.Builder metadataBuilder) { - metadataBuilder.removeSnapshots(ImmutableSet.of(snapshotId)); - } - } - class RemoveSnapshots implements MetadataUpdate { private final Set snapshotIds; + public RemoveSnapshots(Long snapshotId) { + this.snapshotIds = ImmutableSet.of(snapshotId); + } + public RemoveSnapshots(Set snapshotIds) { this.snapshotIds = snapshotIds; } diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java index d0503d996378..595781e51fd6 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java @@ -26,8 +26,6 @@ import java.util.Set; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.util.JsonUtil; import org.apache.iceberg.view.ViewVersionParser; @@ -157,7 +155,6 @@ private MetadataUpdateParser() {} .put(MetadataUpdate.SetPartitionStatistics.class, SET_PARTITION_STATISTICS) .put(MetadataUpdate.RemovePartitionStatistics.class, REMOVE_PARTITION_STATISTICS) .put(MetadataUpdate.AddSnapshot.class, ADD_SNAPSHOT) - .put(MetadataUpdate.RemoveSnapshot.class, REMOVE_SNAPSHOTS) .put(MetadataUpdate.RemoveSnapshots.class, REMOVE_SNAPSHOTS) .put(MetadataUpdate.RemoveSnapshotRef.class, REMOVE_SNAPSHOT_REF) .put(MetadataUpdate.SetSnapshotRef.class, SET_SNAPSHOT_REF) @@ -239,12 +236,7 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator break; case REMOVE_SNAPSHOTS: MetadataUpdate.RemoveSnapshots removeSnapshots; - if (metadataUpdate instanceof MetadataUpdate.RemoveSnapshot) { - Long snapshotId = ((MetadataUpdate.RemoveSnapshot) metadataUpdate).snapshotId(); - removeSnapshots = new MetadataUpdate.RemoveSnapshots(ImmutableSet.of(snapshotId)); - } else { - removeSnapshots = (MetadataUpdate.RemoveSnapshots) metadataUpdate; - } + removeSnapshots = (MetadataUpdate.RemoveSnapshots) metadataUpdate; writeRemoveSnapshots(removeSnapshots, generator); break; case REMOVE_SNAPSHOT_REF: @@ -591,14 +583,7 @@ private static MetadataUpdate readRemoveSnapshots(JsonNode node) { snapshotIds != null, "Invalid set of snapshot ids to remove: must be non-null", snapshotIds); - MetadataUpdate metadataUpdate; - if (snapshotIds.size() == 1) { - Long snapshotId = Iterables.getOnlyElement(snapshotIds); - metadataUpdate = new MetadataUpdate.RemoveSnapshot(snapshotId); - } else { - metadataUpdate = new MetadataUpdate.RemoveSnapshots(snapshotIds); - } - return metadataUpdate; + return new MetadataUpdate.RemoveSnapshots(snapshotIds); } private static MetadataUpdate readSetSnapshotRef(JsonNode node) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 6ae792d7657a..ab03d6d4f9b4 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -1882,11 +1882,7 @@ private static List updateSnapshotLog( Set 
intermediateSnapshotIds = intermediateSnapshotIdSet(changes, currentSnapshotId); boolean hasIntermediateSnapshots = !intermediateSnapshotIds.isEmpty(); boolean hasRemovedSnapshots = - changes.stream() - .anyMatch( - change -> - change instanceof MetadataUpdate.RemoveSnapshots - || change instanceof MetadataUpdate.RemoveSnapshot); + changes.stream().anyMatch(change -> change instanceof MetadataUpdate.RemoveSnapshots); if (!hasIntermediateSnapshots && !hasRemovedSnapshots) { return snapshotLog; diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index 7fd6eca68a07..48241d8e5aa1 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -428,7 +428,7 @@ public void testRemoveSnapshotFromJson() { String action = MetadataUpdateParser.REMOVE_SNAPSHOTS; long snapshotId = 2L; String json = String.format("{\"action\":\"%s\",\"snapshot-ids\":[2]}", action); - MetadataUpdate expected = new MetadataUpdate.RemoveSnapshot(snapshotId); + MetadataUpdate expected = new MetadataUpdate.RemoveSnapshots(snapshotId); assertEquals(action, expected, MetadataUpdateParser.fromJson(json)); } @@ -437,7 +437,7 @@ public void testRemoveSnapshotToJson() { String action = MetadataUpdateParser.REMOVE_SNAPSHOTS; long snapshotId = 2L; String expected = String.format("{\"action\":\"%s\",\"snapshot-ids\":[2]}", action); - MetadataUpdate update = new MetadataUpdate.RemoveSnapshot(snapshotId); + MetadataUpdate update = new MetadataUpdate.RemoveSnapshots(snapshotId); String actual = MetadataUpdateParser.toJson(update); assertThat(actual) .as("Remove snapshots should serialize to the correct JSON value") @@ -1065,15 +1065,9 @@ public void assertEquals( (MetadataUpdate.AddSnapshot) expectedUpdate, (MetadataUpdate.AddSnapshot) actualUpdate); break; case MetadataUpdateParser.REMOVE_SNAPSHOTS: - if (actualUpdate instanceof MetadataUpdate.RemoveSnapshot) { - assertEqualsRemoveSnapshot( - (MetadataUpdate.RemoveSnapshot) expectedUpdate, - (MetadataUpdate.RemoveSnapshot) actualUpdate); - } else { - assertEqualsRemoveSnapshots( - (MetadataUpdate.RemoveSnapshots) expectedUpdate, - (MetadataUpdate.RemoveSnapshots) actualUpdate); - } + assertEqualsRemoveSnapshot( + (MetadataUpdate.RemoveSnapshots) expectedUpdate, + (MetadataUpdate.RemoveSnapshots) actualUpdate); break; case MetadataUpdateParser.REMOVE_SNAPSHOT_REF: assertEqualsRemoveSnapshotRef( @@ -1285,10 +1279,10 @@ private static void assertEqualsAddSnapshot( } private static void assertEqualsRemoveSnapshot( - MetadataUpdate.RemoveSnapshot expected, MetadataUpdate.RemoveSnapshot actual) { - assertThat(actual.snapshotId()) + MetadataUpdate.RemoveSnapshots expected, MetadataUpdate.RemoveSnapshots actual) { + assertThat(actual.snapshotIds()) .as("Snapshot to remove should be the same") - .isEqualTo(expected.snapshotId()); + .isEqualTo(expected.snapshotIds()); } private static void assertEqualsRemoveSnapshots( diff --git a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java index 8e95d48068ca..2c6757092f70 100644 --- a/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java +++ b/core/src/test/java/org/apache/iceberg/TestUpdateRequirements.java @@ -704,7 +704,7 @@ public void addAndRemoveSnapshot() { requirements = UpdateRequirements.forUpdateTable( - metadata, ImmutableList.of(new 
MetadataUpdate.RemoveSnapshot(0L))); + metadata, ImmutableList.of(new MetadataUpdate.RemoveSnapshots(0L))); assertThat(requirements) .hasSize(1) diff --git a/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTransactionRequestParser.java b/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTransactionRequestParser.java index 97bc5bba5cf0..b4617a86f3f6 100644 --- a/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTransactionRequestParser.java +++ b/core/src/test/java/org/apache/iceberg/rest/requests/TestCommitTransactionRequestParser.java @@ -97,7 +97,7 @@ public void roundTripSerde() { new UpdateRequirement.AssertDefaultSpecID(4), new UpdateRequirement.AssertCurrentSchemaID(24)), ImmutableList.of( - new MetadataUpdate.RemoveSnapshot(101L), new MetadataUpdate.SetCurrentSchema(25))); + new MetadataUpdate.RemoveSnapshots(101L), new MetadataUpdate.SetCurrentSchema(25))); CommitTransactionRequest request = new CommitTransactionRequest( From 1ed876589382e43560980753eb7847f65d598089 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Mon, 28 Apr 2025 23:15:56 +0800 Subject: [PATCH 2/7] Remove unused classes and methods --- .palantir/revapi.yml | 63 +++++++ .../apache/iceberg/aws/RESTSigV4Signer.java | 160 ------------------ .../org/apache/iceberg/MetadataUpdate.java | 13 -- .../org/apache/iceberg/PartitionStats.java | 8 - .../org/apache/iceberg/SnapshotProducer.java | 33 ---- .../org/apache/iceberg/TableMetadata.java | 26 --- .../org/apache/iceberg/TableProperties.java | 7 - .../data/parquet/GenericParquetReaders.java | 13 -- .../data/parquet/GenericParquetWriter.java | 21 --- .../iceberg/data/parquet/InternalReader.java | 14 -- .../iceberg/data/parquet/InternalWriter.java | 21 --- .../iceberg/parquet/ParquetValueWriters.java | 12 -- 12 files changed, 63 insertions(+), 328 deletions(-) delete mode 100644 aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 9daf9fd4c589..bdd935657b38 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1178,13 +1178,25 @@ acceptedBreaks: new: "class org.apache.iceberg.Metrics" justification: "Java serialization across versions is not guaranteed" org.apache.iceberg:iceberg-core: + - code: "java.class.removed" + old: "class org.apache.iceberg.MetadataUpdate.EnableRowLineage" + justification: "Removing deprecations for 1.10.0" - code: "java.class.removed" old: "class org.apache.iceberg.MetadataUpdate.RemoveSnapshot" justification: "Removing deprecations for 1.10.0" + - code: "java.field.removedWithConstant" + old: "field org.apache.iceberg.TableProperties.ROW_LINEAGE" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method boolean org.apache.iceberg.TableMetadata::rowLineageEnabled()" + justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\ \ org.apache.hadoop.conf.Configuration)" justification: "Removing deprecated code" + - code: "java.method.removed" + old: "method long org.apache.iceberg.PartitionStats::totalRecordCount()" + justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method org.apache.iceberg.TableMetadata org.apache.iceberg.TableMetadata::updateSchema(org.apache.iceberg.Schema,\ \ int)" @@ -1193,6 +1205,9 @@ acceptedBreaks: old: "method org.apache.iceberg.TableMetadata.Builder 
org.apache.iceberg.TableMetadata.Builder::addSchema(org.apache.iceberg.Schema,\ \ int)" justification: "Removing deprecated code" + - code: "java.method.removed" + old: "method org.apache.iceberg.TableMetadata.Builder org.apache.iceberg.TableMetadata.Builder::enableRowLineage()" + justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method org.apache.iceberg.TableMetadata.Builder org.apache.iceberg.TableMetadata.Builder::setStatistics(long,\ \ org.apache.iceberg.StatisticsFile)" @@ -1239,6 +1254,54 @@ acceptedBreaks: old: "class org.apache.iceberg.data.parquet.BaseParquetWriter" new: "class org.apache.iceberg.data.parquet.BaseParquetWriter" justification: "Changing deprecated code" + - code: "java.method.numberOfParametersChanged" + old: "method org.apache.iceberg.parquet.ParquetValueWriter\ + \ org.apache.iceberg.data.parquet.InternalWriter::create(org.apache.parquet.schema.MessageType)" + new: "method org.apache.iceberg.parquet.ParquetValueWriter\ + \ org.apache.iceberg.data.parquet.InternalWriter::create(org.apache.iceberg.types.Types.StructType,\ + \ org.apache.parquet.schema.MessageType)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.numberOfParametersChanged" + old: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ + \ org.apache.iceberg.parquet.ParquetValueWriters::recordWriter(java.util.List>)" + new: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ + \ org.apache.iceberg.parquet.ParquetValueWriters::recordWriter(org.apache.iceberg.types.Types.StructType,\ + \ java.util.List>)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.numberOfParametersChanged" + old: "method org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.InternalReader::createStructReader(java.util.List,\ + \ java.util.List>, org.apache.iceberg.types.Types.StructType)" + new: "method org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.InternalReader::createStructReader(java.util.List>,\ + \ org.apache.iceberg.types.Types.StructType)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.numberOfParametersChanged" + old: "method org.apache.iceberg.parquet.ParquetValueReader\ + \ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List,\ + \ java.util.List>, org.apache.iceberg.types.Types.StructType)" + new: "method org.apache.iceberg.parquet.ParquetValueReader\ + \ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List>,\ + \ org.apache.iceberg.types.Types.StructType)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.numberOfParametersChanged" + old: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ + \ org.apache.iceberg.data.parquet.InternalWriter::createStructWriter(java.util.List>)" + new: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ + \ org.apache.iceberg.data.parquet.InternalWriter::createStructWriter(org.apache.iceberg.types.Types.StructType,\ + \ java.util.List>)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.numberOfParametersChanged" + old: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ + \ org.apache.iceberg.data.parquet.GenericParquetWriter::createStructWriter(java.util.List>)" + new: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ + \ 
org.apache.iceberg.data.parquet.GenericParquetWriter::createStructWriter(org.apache.iceberg.types.Types.StructType,\ + \ java.util.List>)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method org.apache.iceberg.parquet.ParquetValueWriter\ + \ org.apache.iceberg.data.parquet.GenericParquetWriter::buildWriter(org.apache.parquet.schema.MessageType)" + justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method void org.apache.iceberg.parquet.ParquetValueReader::setPageSource(org.apache.parquet.column.page.PageReadStore,\ \ long)" diff --git a/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java b/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java deleted file mode 100644 index fc9de32cc6fd..000000000000 --- a/aws/src/main/java/org/apache/iceberg/aws/RESTSigV4Signer.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.aws; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.hc.core5.http.EntityDetails; -import org.apache.hc.core5.http.Header; -import org.apache.hc.core5.http.HttpHeaders; -import org.apache.hc.core5.http.HttpRequest; -import org.apache.hc.core5.http.HttpRequestInterceptor; -import org.apache.hc.core5.http.io.entity.StringEntity; -import org.apache.hc.core5.http.protocol.HttpContext; -import org.apache.iceberg.exceptions.RESTException; -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.signer.Aws4Signer; -import software.amazon.awssdk.auth.signer.internal.SignerConstant; -import software.amazon.awssdk.auth.signer.params.Aws4SignerParams; -import software.amazon.awssdk.auth.signer.params.SignerChecksumParams; -import software.amazon.awssdk.core.checksums.Algorithm; -import software.amazon.awssdk.http.SdkHttpFullRequest; -import software.amazon.awssdk.http.SdkHttpMethod; -import software.amazon.awssdk.regions.Region; - -/** - * Provides a request interceptor for use with the HTTPClient that calculates the required signature - * for the SigV4 protocol and adds the necessary headers for all requests created by the client. - * - *

See Signing AWS - * API requests for details about the protocol. - * - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link RESTSigV4AuthManager} instead. - */ -@Deprecated -public class RESTSigV4Signer implements HttpRequestInterceptor { - static final String EMPTY_BODY_SHA256 = - "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"; - static final String RELOCATED_HEADER_PREFIX = "Original-"; - - private final Aws4Signer signer = Aws4Signer.create(); - private AwsCredentialsProvider credentialsProvider; - - private String signingName; - private Region signingRegion; - - public void initialize(Map properties) { - AwsProperties awsProperties = new AwsProperties(properties); - - this.signingRegion = awsProperties.restSigningRegion(); - this.signingName = awsProperties.restSigningName(); - this.credentialsProvider = awsProperties.restCredentialsProvider(); - } - - @Override - public void process(HttpRequest request, EntityDetails entity, HttpContext context) { - URI requestUri; - - try { - requestUri = request.getUri(); - } catch (URISyntaxException e) { - throw new RESTException(e, "Invalid uri for request: %s", request); - } - - Aws4SignerParams params = - Aws4SignerParams.builder() - .signingName(signingName) - .signingRegion(signingRegion) - .awsCredentials(credentialsProvider.resolveCredentials()) - .checksumParams( - SignerChecksumParams.builder() - .algorithm(Algorithm.SHA256) - .isStreamingRequest(false) - .checksumHeaderName(SignerConstant.X_AMZ_CONTENT_SHA256) - .build()) - .build(); - - SdkHttpFullRequest.Builder sdkRequestBuilder = SdkHttpFullRequest.builder(); - - sdkRequestBuilder - .method(SdkHttpMethod.fromValue(request.getMethod())) - .protocol(request.getScheme()) - .uri(requestUri) - .headers(convertHeaders(request.getHeaders())); - - if (entity == null) { - // This is a workaround for the signer implementation incorrectly producing - // an invalid content checksum for empty body requests. - sdkRequestBuilder.putHeader(SignerConstant.X_AMZ_CONTENT_SHA256, EMPTY_BODY_SHA256); - } else if (entity instanceof StringEntity) { - sdkRequestBuilder.contentStreamProvider( - () -> { - try { - return ((StringEntity) entity).getContent(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - } else { - throw new UnsupportedOperationException("Unsupported entity type: " + entity.getClass()); - } - - SdkHttpFullRequest signedSdkRequest = signer.sign(sdkRequestBuilder.build(), params); - updateRequestHeaders(request, signedSdkRequest.headers()); - } - - private Map> convertHeaders(Header[] headers) { - return Arrays.stream(headers) - .collect( - Collectors.groupingBy( - // Relocate Authorization header as SigV4 takes precedence - header -> - HttpHeaders.AUTHORIZATION.equals(header.getName()) - ? 
RELOCATED_HEADER_PREFIX + header.getName() - : header.getName(), - Collectors.mapping(Header::getValue, Collectors.toList()))); - } - - private void updateRequestHeaders(HttpRequest request, Map> headers) { - headers.forEach( - (name, values) -> { - if (request.containsHeader(name)) { - Header[] original = request.getHeaders(name); - request.removeHeaders(name); - Arrays.asList(original) - .forEach( - header -> { - // Relocate headers if there is a conflict with signed headers - if (!values.contains(header.getValue())) { - request.addHeader(RELOCATED_HEADER_PREFIX + name, header.getValue()); - } - }); - } - - values.forEach(value -> request.setHeader(name, value)); - }); - } -} diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 526cbf589730..6552b0cba5bc 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -559,17 +559,4 @@ public void applyTo(TableMetadata.Builder builder) { builder.removeEncryptionKey(keyId); } } - - /** - * Update to enable row lineage. - * - * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. - */ - @Deprecated - class EnableRowLineage implements MetadataUpdate { - @Override - public void applyTo(TableMetadata.Builder metadataBuilder) { - metadataBuilder.enableRowLineage(); - } - } } diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java index d40b9f3e7f7a..4f3870de7bbd 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStats.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -78,14 +78,6 @@ public int equalityDeleteFileCount() { return equalityDeleteFileCount; } - /** - * @deprecated since 1.9.0, will be removed in 1.10.0, use {@link #totalRecords()} instead. - */ - @Deprecated - public long totalRecordCount() { - return totalRecordCount == null ? 0L : totalRecordCount; - } - public Long totalRecords() { return totalRecordCount; } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 929c60b84ce6..118ae0b328a5 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -799,37 +799,4 @@ private static void updateTotal( } } } - - /** - * A wrapper to set the dataSequenceNumber of a DeleteFile. - * - * @deprecated will be removed in 1.10.0; use {@link Delegates#pendingDeleteFile(DeleteFile, - * Long)} instead. - */ - @Deprecated - protected static class PendingDeleteFile extends Delegates.PendingDeleteFile { - /** - * Wrap a delete file for commit with a given data sequence number. - * - * @param deleteFile delete file - * @param dataSequenceNumber data sequence number to apply - */ - PendingDeleteFile(DeleteFile deleteFile, long dataSequenceNumber) { - super(deleteFile, dataSequenceNumber); - } - - /** - * Wrap a delete file for commit with the latest sequence number. 
- * - * @param deleteFile delete file - */ - PendingDeleteFile(DeleteFile deleteFile) { - super(deleteFile, null); - } - - @Override - PendingDeleteFile wrap(DeleteFile file) { - return new PendingDeleteFile(file, dataSequenceNumber()); - } - } } diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index ab03d6d4f9b4..f932827b760c 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -576,16 +576,6 @@ public TableMetadata withUUID() { return new Builder(this).assignUUID().build(); } - /** - * Whether row lineage is enabled. - * - * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. - */ - @Deprecated - public boolean rowLineageEnabled() { - return formatVersion >= MIN_FORMAT_VERSION_ROW_LINEAGE; - } - public long nextRowId() { return nextRowId; } @@ -1017,22 +1007,6 @@ private Builder(TableMetadata base) { this.nextRowId = base.nextRowId; } - /** - * Enables row lineage in v3 tables. - * - * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. - */ - @Deprecated - public Builder enableRowLineage() { - if (formatVersion < MIN_FORMAT_VERSION_ROW_LINEAGE) { - throw new UnsupportedOperationException( - "Cannot enable row lineage for format-version=" + formatVersion); - } - - // otherwise this is a no-op - return this; - } - public Builder withMetadataLocation(String newMetadataLocation) { this.metadataLocation = newMetadataLocation; if (null != base) { diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 605e41fe90f1..577ce653e0ba 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -388,11 +388,4 @@ private TableProperties() {} public static final int ENCRYPTION_DEK_LENGTH_DEFAULT = 16; public static final int ENCRYPTION_AAD_LENGTH_DEFAULT = 16; - - /** - * Property to enable row lineage. - * - * @deprecated will be removed in 1.10.0; row lineage is required for all v3+ tables. - */ - @Deprecated public static final String ROW_LINEAGE = "row-lineage"; } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java index 182412cfb54c..4af7ee381f61 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java @@ -41,7 +41,6 @@ import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; -import org.apache.parquet.schema.Type; public class GenericParquetReaders extends BaseParquetReaders { @@ -59,18 +58,6 @@ public static ParquetValueReader buildReader( return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); } - /** - * Create a struct reader. - * - * @deprecated will be removed in 1.10.0; use {@link #createStructReader(List, StructType)} - * instead. 
- */ - @Deprecated - protected ParquetValueReader createStructReader( - List types, List> fieldReaders, StructType structType) { - return ParquetValueReaders.recordReader(fieldReaders, structType); - } - @Override protected ParquetValueReader createStructReader( List> fieldReaders, StructType structType) { diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java index 08ce14a23cc0..fb3c38e17b90 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java @@ -44,31 +44,10 @@ public class GenericParquetWriter extends BaseParquetWriter { private GenericParquetWriter() {} - /** - * Build a writer for a Parquet schema. - * - * @deprecated will be removed in 1.10.0; use {@link #create(Schema, MessageType)} instead. - */ - @Deprecated - public static ParquetValueWriter buildWriter(MessageType type) { - return INSTANCE.createWriter(type); - } - public static ParquetValueWriter create(Schema schema, MessageType type) { return INSTANCE.createWriter(schema.asStruct(), type); } - /** - * Create a struct writer from a list of writers. - * - * @deprecated will be removed in 1.10.0; use {@link #createWriter(Types.StructType, MessageType)} - * instead. - */ - @Deprecated - protected StructWriter createStructWriter(List> writers) { - return ParquetValueWriters.recordWriter(null, writers); - } - @Override protected StructWriter createStructWriter( Types.StructType struct, List> writers) { diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java index 692a9857cf77..03585c55c9b6 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java @@ -27,7 +27,6 @@ import org.apache.iceberg.types.Types.StructType; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; public class InternalReader extends BaseParquetReaders { @@ -47,19 +46,6 @@ public static ParquetValueReader create( return (ParquetValueReader) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant); } - /** - * Create a struct reader. - * - * @deprecated will be removed in 1.10.0; use {@link #createStructReader(List, StructType)} - * instead. - */ - @Deprecated - @SuppressWarnings("unchecked") - protected ParquetValueReader createStructReader( - List types, List> fieldReaders, StructType structType) { - return (ParquetValueReader) ParquetValueReaders.recordReader(fieldReaders, structType); - } - @Override @SuppressWarnings("unchecked") protected ParquetValueReader createStructReader( diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java index 0c0f7bce751c..08aaacebbb81 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java @@ -40,16 +40,6 @@ public class InternalWriter extends BaseParquetWriter { private InternalWriter() {} - /** - * Build a writer for a Parquet schema. - * - * @deprecated will be removed in 1.10.0; use {@link #createWriter(Schema, MessageType)} instead. 
- */ - @Deprecated - public static ParquetValueWriter create(MessageType type) { - return create((Types.StructType) null, type); - } - public static ParquetValueWriter createWriter( Schema schema, MessageType type) { return create(schema.asStruct(), type); @@ -61,17 +51,6 @@ public static ParquetValueWriter create( return (ParquetValueWriter) INSTANCE.createWriter(struct, type); } - /** - * Create a struct writer from a list of writers. - * - * @deprecated will be removed in 1.10.0; use {@link #createWriter(Types.StructType, MessageType)} - * instead. - */ - @Deprecated - protected StructWriter createStructWriter(List> writers) { - return ParquetValueWriters.recordWriter(null, writers); - } - @Override protected StructWriter createStructWriter( Types.StructType struct, List> writers) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java index e3cbd9718851..2c5b5f90f6f6 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java @@ -133,18 +133,6 @@ public static MapWriter maps( return new MapWriter<>(dl, rl, keyWriter, valueWriter); } - /** - * Create a struct writer that produces a StructLike record. - * - * @deprecated will be removed in 1.10.0; use {@link #recordWriter(Types.StructType, List)} - * instead. - */ - @Deprecated - public static StructWriter recordWriter( - List> writers) { - return recordWriter(null, writers); - } - public static StructWriter recordWriter( Types.StructType struct, List> writers) { return new RecordWriter<>(struct, writers); From f2173ffe476b81a42d2c115f7f4b903a2b338f5a Mon Sep 17 00:00:00 2001 From: manuzhang Date: Mon, 28 Apr 2025 23:35:37 +0800 Subject: [PATCH 3/7] Remove deprecated constructor of GenericManifestFile --- .palantir/revapi.yml | 6 +++ .../apache/iceberg/GenericManifestFile.java | 41 ------------------- .../flink/sink/TestIcebergCommitter.java | 28 ++++--------- .../flink/sink/TestIcebergFilesCommitter.java | 28 ++++--------- .../flink/sink/TestIcebergCommitter.java | 28 ++++--------- .../flink/sink/TestIcebergFilesCommitter.java | 28 ++++--------- .../flink/sink/TestIcebergCommitter.java | 28 ++++--------- .../flink/sink/TestIcebergFilesCommitter.java | 28 ++++--------- 8 files changed, 60 insertions(+), 155 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index bdd935657b38..485a6c7760ad 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1216,6 +1216,12 @@ acceptedBreaks: old: "method org.apache.iceberg.UpdateStatistics org.apache.iceberg.SetStatistics::setStatistics(long,\ \ org.apache.iceberg.StatisticsFile)" justification: "Removing deprecated code" + - code: "java.method.removed" + old: "method void org.apache.iceberg.GenericManifestFile::(java.lang.String,\ + \ long, int, org.apache.iceberg.ManifestContent, long, long, java.lang.Long,\ + \ int, long, int, long, int, long, java.util.List,\ + \ java.nio.ByteBuffer)" + justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method void org.apache.iceberg.MetadataUpdate.AddSchema::(org.apache.iceberg.Schema,\ \ int)" diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index 0ac0fddf5516..ac93222d01b5 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ 
b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -134,47 +134,6 @@ public GenericManifestFile(Schema avroSchema) { this.firstRowId = firstRowId; } - /** - * GenericManifestFile constructor. - * - * @deprecated will be removed in 1.10.0; use {@link ManifestWriter#toManifestFile()} instead. - */ - @Deprecated - public GenericManifestFile( - String path, - long length, - int specId, - ManifestContent content, - long sequenceNumber, - long minSequenceNumber, - Long snapshotId, - int addedFilesCount, - long addedRowsCount, - int existingFilesCount, - long existingRowsCount, - int deletedFilesCount, - long deletedRowsCount, - List partitions, - ByteBuffer keyMetadata) { - super(ManifestFile.schema().columns().size()); - this.avroSchema = AVRO_SCHEMA; - this.manifestPath = path; - this.length = length; - this.specId = specId; - this.content = content; - this.sequenceNumber = sequenceNumber; - this.minSequenceNumber = minSequenceNumber; - this.snapshotId = snapshotId; - this.addedFilesCount = addedFilesCount; - this.addedRowsCount = addedRowsCount; - this.existingFilesCount = existingFilesCount; - this.existingRowsCount = existingRowsCount; - this.deletedFilesCount = deletedFilesCount; - this.deletedRowsCount = deletedRowsCount; - this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]); - this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); - } - /** * Copy constructor. * diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java index 4d012f9f9733..9882c43f1fc5 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java @@ -66,9 +66,9 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.GenericManifestFile; -import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.Metrics; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; @@ -1165,23 +1165,13 @@ public void testDeleteFiles() throws Exception { } } - private ManifestFile createTestingManifestFile(Path manifestPath) { - return new GenericManifestFile( - manifestPath.toAbsolutePath().toString(), - manifestPath.toFile().length(), - 0, - ManifestContent.DATA, - 0, - 0, - 0L, - 0, - 0, - 0, - 0, - 0, - 0, - null, - null); + private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { + try (ManifestWriter writer = + ManifestFiles.write( + PartitionSpec.unpartitioned(), + org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { + return writer.toManifestFile(); + } } private IcebergWriteAggregator buildIcebergWriteAggregator(String myJobId, String operatorId) { diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 47494cb9bbae..c4a298c64a00 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -55,9 +55,9 @@ import 
org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.GenericManifestFile; -import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; @@ -1119,23 +1119,13 @@ private FileAppenderFactory createDeletableAppenderFactory() { null); } - private ManifestFile createTestingManifestFile(Path manifestPath) { - return new GenericManifestFile( - manifestPath.toAbsolutePath().toString(), - manifestPath.toFile().length(), - 0, - ManifestContent.DATA, - 0, - 0, - 0L, - 0, - 0, - 0, - 0, - 0, - 0, - null, - null); + private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { + try (ManifestWriter writer = + ManifestFiles.write( + PartitionSpec.unpartitioned(), + org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { + return writer.toManifestFile(); + } } private List assertFlinkManifests(int expectedCount) throws IOException { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java index 04db2781a67c..f6716d638e6a 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java @@ -66,9 +66,9 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.GenericManifestFile; -import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.Metrics; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; @@ -1164,23 +1164,13 @@ public void testDeleteFiles() throws Exception { } } - private ManifestFile createTestingManifestFile(Path manifestPath) { - return new GenericManifestFile( - manifestPath.toAbsolutePath().toString(), - manifestPath.toFile().length(), - 0, - ManifestContent.DATA, - 0, - 0, - 0L, - 0, - 0, - 0, - 0, - 0, - 0, - null, - null); + private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { + try (ManifestWriter writer = + ManifestFiles.write( + PartitionSpec.unpartitioned(), + org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { + return writer.toManifestFile(); + } } private IcebergWriteAggregator buildIcebergWriteAggregator(String myJobId, String operatorId) { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 47494cb9bbae..c4a298c64a00 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -55,9 +55,9 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.GenericManifestFile; -import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import 
org.apache.iceberg.ManifestWriter; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; @@ -1119,23 +1119,13 @@ private FileAppenderFactory createDeletableAppenderFactory() { null); } - private ManifestFile createTestingManifestFile(Path manifestPath) { - return new GenericManifestFile( - manifestPath.toAbsolutePath().toString(), - manifestPath.toFile().length(), - 0, - ManifestContent.DATA, - 0, - 0, - 0L, - 0, - 0, - 0, - 0, - 0, - 0, - null, - null); + private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { + try (ManifestWriter writer = + ManifestFiles.write( + PartitionSpec.unpartitioned(), + org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { + return writer.toManifestFile(); + } } private List assertFlinkManifests(int expectedCount) throws IOException { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java index ef3e0e8e50e0..60a5c9297341 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java @@ -66,9 +66,9 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.GenericManifestFile; -import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.Metrics; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; @@ -1154,23 +1154,13 @@ public void testDeleteFiles() throws Exception { } } - private ManifestFile createTestingManifestFile(Path manifestPath) { - return new GenericManifestFile( - manifestPath.toAbsolutePath().toString(), - manifestPath.toFile().length(), - 0, - ManifestContent.DATA, - 0, - 0, - 0L, - 0, - 0, - 0, - 0, - 0, - 0, - null, - null); + private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { + try (ManifestWriter writer = + ManifestFiles.write( + PartitionSpec.unpartitioned(), + org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { + return writer.toManifestFile(); + } } private IcebergWriteAggregator buildIcebergWriteAggregator(String myJobId, String operatorId) { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 0d09af73b977..8d7ea6532e10 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -55,9 +55,9 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.GenericManifestFile; -import org.apache.iceberg.ManifestContent; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestWriter; import org.apache.iceberg.Parameter; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; @@ -1119,23 +1119,13 @@ private FileAppenderFactory createDeletableAppenderFactory() { null); } - private ManifestFile 
createTestingManifestFile(Path manifestPath) { - return new GenericManifestFile( - manifestPath.toAbsolutePath().toString(), - manifestPath.toFile().length(), - 0, - ManifestContent.DATA, - 0, - 0, - 0L, - 0, - 0, - 0, - 0, - 0, - 0, - null, - null); + private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { + try (ManifestWriter writer = + ManifestFiles.write( + PartitionSpec.unpartitioned(), + org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { + return writer.toManifestFile(); + } } private List assertFlinkManifests(int expectedCount) throws IOException { From 46b8791c44a01dc71ff59df56909e93105448cda Mon Sep 17 00:00:00 2001 From: manuzhang Date: Wed, 30 Apr 2025 17:19:12 +0800 Subject: [PATCH 4/7] Address comments --- .palantir/revapi.yml | 157 ++++++++------- .../org/apache/iceberg/MetadataUpdate.java | 2 +- .../iceberg/TestMetadataUpdateParser.java | 9 +- .../iceberg/spark/data/SparkAvroReader.java | 180 ------------------ 4 files changed, 90 insertions(+), 258 deletions(-) delete mode 100644 spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 485a6c7760ad..a68635d117b1 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1178,25 +1178,10 @@ acceptedBreaks: new: "class org.apache.iceberg.Metrics" justification: "Java serialization across versions is not guaranteed" org.apache.iceberg:iceberg-core: - - code: "java.class.removed" - old: "class org.apache.iceberg.MetadataUpdate.EnableRowLineage" - justification: "Removing deprecations for 1.10.0" - - code: "java.class.removed" - old: "class org.apache.iceberg.MetadataUpdate.RemoveSnapshot" - justification: "Removing deprecations for 1.10.0" - - code: "java.field.removedWithConstant" - old: "field org.apache.iceberg.TableProperties.ROW_LINEAGE" - justification: "Removing deprecations for 1.10.0" - - code: "java.method.removed" - old: "method boolean org.apache.iceberg.TableMetadata::rowLineageEnabled()" - justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\ \ org.apache.hadoop.conf.Configuration)" justification: "Removing deprecated code" - - code: "java.method.removed" - old: "method long org.apache.iceberg.PartitionStats::totalRecordCount()" - justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method org.apache.iceberg.TableMetadata org.apache.iceberg.TableMetadata::updateSchema(org.apache.iceberg.Schema,\ \ int)" @@ -1205,9 +1190,6 @@ acceptedBreaks: old: "method org.apache.iceberg.TableMetadata.Builder org.apache.iceberg.TableMetadata.Builder::addSchema(org.apache.iceberg.Schema,\ \ int)" justification: "Removing deprecated code" - - code: "java.method.removed" - old: "method org.apache.iceberg.TableMetadata.Builder org.apache.iceberg.TableMetadata.Builder::enableRowLineage()" - justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method org.apache.iceberg.TableMetadata.Builder org.apache.iceberg.TableMetadata.Builder::setStatistics(long,\ \ org.apache.iceberg.StatisticsFile)" @@ -1216,12 +1198,6 @@ acceptedBreaks: old: "method org.apache.iceberg.UpdateStatistics org.apache.iceberg.SetStatistics::setStatistics(long,\ \ org.apache.iceberg.StatisticsFile)" justification: "Removing deprecated code" - - code: "java.method.removed" - old: "method void 
org.apache.iceberg.GenericManifestFile::(java.lang.String,\ - \ long, int, org.apache.iceberg.ManifestContent, long, long, java.lang.Long,\ - \ int, long, int, long, int, long, java.util.List,\ - \ java.nio.ByteBuffer)" - justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method void org.apache.iceberg.MetadataUpdate.AddSchema::(org.apache.iceberg.Schema,\ \ int)" @@ -1260,71 +1236,114 @@ acceptedBreaks: old: "class org.apache.iceberg.data.parquet.BaseParquetWriter" new: "class org.apache.iceberg.data.parquet.BaseParquetWriter" justification: "Changing deprecated code" - - code: "java.method.numberOfParametersChanged" + - code: "java.method.removed" + old: "method void org.apache.iceberg.parquet.ParquetValueReader::setPageSource(org.apache.parquet.column.page.PageReadStore,\ + \ long)" + justification: "Removing deprecated code" + - code: "java.method.removed" + old: "method void org.apache.iceberg.parquet.ParquetValueReaders.StructReader::(java.util.List, java.util.List>)" + justification: "Removing deprecated code" + - code: "java.method.removed" + old: "method void org.apache.iceberg.parquet.VectorizedReader::setRowGroupInfo(org.apache.parquet.column.page.PageReadStore,\ + \ java.util.Map,\ + \ long)" + justification: "Removing deprecated code" + - code: "java.method.visibilityReduced" + old: "method void org.apache.iceberg.data.parquet.BaseParquetWriter::()" + new: "method void org.apache.iceberg.data.parquet.BaseParquetWriter::()" + justification: "Changing deprecated code" + "1.9.0": + org.apache.iceberg:iceberg-core: + - code: "java.class.removed" + old: "class org.apache.iceberg.MetadataUpdate.EnableRowLineage" + justification: "Removing deprecations for 1.10.0" + - code: "java.class.removed" + old: "class org.apache.iceberg.MetadataUpdate.RemoveSnapshot" + justification: "Removing deprecations for 1.10.0" + - code: "java.field.removedWithConstant" + old: "field org.apache.iceberg.TableProperties.ROW_LINEAGE" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.parameterTypeChanged" + old: "parameter org.apache.iceberg.io.ImmutableStorageCredential org.apache.iceberg.io.ImmutableStorageCredential::withConfig(===java.util.Map===)" + new: "parameter org.apache.iceberg.io.ImmutableStorageCredential org.apache.iceberg.io.ImmutableStorageCredential::withConfig(===org.apache.iceberg.util.SerializableMap===)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.parameterTypeChanged" + old: "parameter org.apache.iceberg.io.ImmutableStorageCredential.Builder org.apache.iceberg.io.ImmutableStorageCredential.Builder::config(===java.util.Map===)" + new: "parameter org.apache.iceberg.io.ImmutableStorageCredential.Builder org.apache.iceberg.io.ImmutableStorageCredential.Builder::config(===org.apache.iceberg.util.SerializableMap===)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method boolean org.apache.iceberg.TableMetadata::rowLineageEnabled()" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method long org.apache.iceberg.PartitionStats::totalRecordCount()" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method org.apache.iceberg.TableMetadata.Builder org.apache.iceberg.TableMetadata.Builder::enableRowLineage()" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method org.apache.iceberg.io.ImmutableStorageCredential.Builder 
org.apache.iceberg.io.ImmutableStorageCredential.Builder::putAllConfig(java.util.Map)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method org.apache.iceberg.io.ImmutableStorageCredential.Builder org.apache.iceberg.io.ImmutableStorageCredential.Builder::putConfig(java.lang.String,\ + \ java.lang.String)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method org.apache.iceberg.io.ImmutableStorageCredential.Builder org.apache.iceberg.io.ImmutableStorageCredential.Builder::putConfig(java.util.Map.Entry)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method void org.apache.iceberg.GenericManifestFile::(java.lang.String,\ + \ long, int, org.apache.iceberg.ManifestContent, long, long, java.lang.Long,\ + \ int, long, int, long, int, long, java.util.List,\ + \ java.nio.ByteBuffer)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.returnTypeChangedCovariantly" + old: "method java.util.Map org.apache.iceberg.io.ImmutableStorageCredential::config()" + new: "method org.apache.iceberg.util.SerializableMap\ + \ org.apache.iceberg.io.ImmutableStorageCredential::config()" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.returnTypeChangedCovariantly" + old: "method java.util.Map org.apache.iceberg.io.StorageCredential::config()" + new: "method org.apache.iceberg.util.SerializableMap\ + \ org.apache.iceberg.io.StorageCredential::config()" + justification: "Removing deprecations for 1.10.0" + org.apache.iceberg:iceberg-parquet: + - code: "java.method.removed" old: "method org.apache.iceberg.parquet.ParquetValueWriter\ \ org.apache.iceberg.data.parquet.InternalWriter::create(org.apache.parquet.schema.MessageType)" - new: "method org.apache.iceberg.parquet.ParquetValueWriter\ - \ org.apache.iceberg.data.parquet.InternalWriter::create(org.apache.iceberg.types.Types.StructType,\ - \ org.apache.parquet.schema.MessageType)" justification: "Removing deprecations for 1.10.0" - - code: "java.method.numberOfParametersChanged" + - code: "java.method.removed" old: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ \ org.apache.iceberg.parquet.ParquetValueWriters::recordWriter(java.util.List>)" - new: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ - \ org.apache.iceberg.parquet.ParquetValueWriters::recordWriter(org.apache.iceberg.types.Types.StructType,\ - \ java.util.List>)" justification: "Removing deprecations for 1.10.0" - - code: "java.method.numberOfParametersChanged" + - code: "java.method.removed" old: "method org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.InternalReader::createStructReader(java.util.List,\ \ java.util.List>, org.apache.iceberg.types.Types.StructType)" - new: "method org.apache.iceberg.parquet.ParquetValueReader org.apache.iceberg.data.parquet.InternalReader::createStructReader(java.util.List>,\ - \ org.apache.iceberg.types.Types.StructType)" justification: "Removing deprecations for 1.10.0" - - code: "java.method.numberOfParametersChanged" + - code: "java.method.removed" old: "method org.apache.iceberg.parquet.ParquetValueReader\ \ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List,\ \ java.util.List>, org.apache.iceberg.types.Types.StructType)" - new: "method org.apache.iceberg.parquet.ParquetValueReader\ - \ org.apache.iceberg.data.parquet.GenericParquetReaders::createStructReader(java.util.List>,\ - 
\ org.apache.iceberg.types.Types.StructType)" - justification: "Removing deprecations for 1.10.0" - - code: "java.method.numberOfParametersChanged" - old: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ - \ org.apache.iceberg.data.parquet.InternalWriter::createStructWriter(java.util.List>)" - new: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ - \ org.apache.iceberg.data.parquet.InternalWriter::createStructWriter(org.apache.iceberg.types.Types.StructType,\ - \ java.util.List>)" - justification: "Removing deprecations for 1.10.0" - - code: "java.method.numberOfParametersChanged" - old: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ - \ org.apache.iceberg.data.parquet.GenericParquetWriter::createStructWriter(java.util.List>)" - new: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ - \ org.apache.iceberg.data.parquet.GenericParquetWriter::createStructWriter(org.apache.iceberg.types.Types.StructType,\ - \ java.util.List>)" justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method org.apache.iceberg.parquet.ParquetValueWriter\ \ org.apache.iceberg.data.parquet.GenericParquetWriter::buildWriter(org.apache.parquet.schema.MessageType)" justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" - old: "method void org.apache.iceberg.parquet.ParquetValueReader::setPageSource(org.apache.parquet.column.page.PageReadStore,\ - \ long)" - justification: "Removing deprecated code" - - code: "java.method.removed" - old: "method void org.apache.iceberg.parquet.ParquetValueReaders.StructReader::(java.util.List, java.util.List>)" - justification: "Removing deprecated code" + old: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ + \ org.apache.iceberg.data.parquet.InternalWriter::createStructWriter(java.util.List>)" + justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" - old: "method void org.apache.iceberg.parquet.VectorizedReader::setRowGroupInfo(org.apache.parquet.column.page.PageReadStore,\ - \ java.util.Map,\ - \ long)" - justification: "Removing deprecated code" - - code: "java.method.visibilityReduced" - old: "method void org.apache.iceberg.data.parquet.BaseParquetWriter::()" - new: "method void org.apache.iceberg.data.parquet.BaseParquetWriter::()" - justification: "Changing deprecated code" + old: "method org.apache.iceberg.parquet.ParquetValueWriters.StructWriter\ + \ org.apache.iceberg.data.parquet.GenericParquetWriter::createStructWriter(java.util.List>)" + justification: "Removing deprecations for 1.10.0" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java index 6552b0cba5bc..ced373ef85aa 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataUpdate.java +++ b/core/src/main/java/org/apache/iceberg/MetadataUpdate.java @@ -332,7 +332,7 @@ public void applyTo(TableMetadata.Builder metadataBuilder) { class RemoveSnapshots implements MetadataUpdate { private final Set snapshotIds; - public RemoveSnapshots(Long snapshotId) { + public RemoveSnapshots(long snapshotId) { this.snapshotIds = ImmutableSet.of(snapshotId); } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java index 48241d8e5aa1..696e68992b9b 100644 --- 
a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java @@ -1065,7 +1065,7 @@ public void assertEquals( (MetadataUpdate.AddSnapshot) expectedUpdate, (MetadataUpdate.AddSnapshot) actualUpdate); break; case MetadataUpdateParser.REMOVE_SNAPSHOTS: - assertEqualsRemoveSnapshot( + assertEqualsRemoveSnapshots( (MetadataUpdate.RemoveSnapshots) expectedUpdate, (MetadataUpdate.RemoveSnapshots) actualUpdate); break; @@ -1278,13 +1278,6 @@ private static void assertEqualsAddSnapshot( assertThat(actual.snapshot().schemaId()).isEqualTo(expected.snapshot().schemaId()); } - private static void assertEqualsRemoveSnapshot( - MetadataUpdate.RemoveSnapshots expected, MetadataUpdate.RemoveSnapshots actual) { - assertThat(actual.snapshotIds()) - .as("Snapshot to remove should be the same") - .isEqualTo(expected.snapshotIds()); - } - private static void assertEqualsRemoveSnapshots( MetadataUpdate.RemoveSnapshots expected, MetadataUpdate.RemoveSnapshots actual) { assertThat(actual.snapshotIds()) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java deleted file mode 100644 index 7d92d963a9f4..000000000000 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark.data; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.function.Supplier; -import org.apache.avro.LogicalType; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.Decoder; -import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor; -import org.apache.iceberg.avro.SupportsRowPosition; -import org.apache.iceberg.avro.ValueReader; -import org.apache.iceberg.avro.ValueReaders; -import org.apache.iceberg.data.avro.DecoderResolver; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; -import org.apache.spark.sql.catalyst.InternalRow; - -/** - * @deprecated will be removed in 1.8.0; use SparkPlannedAvroReader instead. - */ -@Deprecated -public class SparkAvroReader implements DatumReader, SupportsRowPosition { - - private final Schema readSchema; - private final ValueReader reader; - private Schema fileSchema = null; - - /** - * @deprecated will be removed in 1.8.0; use SparkPlannedAvroReader instead. 
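// Illustrative sketch, not part of this patch: with the dedicated single-snapshot
// update class removed, the new RemoveSnapshots(long) overload covers the one-id
// case and the parser round-trips both forms the same way. ImmutableSet is the
// relocated Guava class used throughout iceberg-core; the JSON shape in the
// comment is taken from the REST spec, not captured from this build.
static void removeSnapshotsRoundTrip() {
  MetadataUpdate single = new MetadataUpdate.RemoveSnapshots(42L);
  MetadataUpdate many = new MetadataUpdate.RemoveSnapshots(ImmutableSet.of(1L, 2L));
  String json = MetadataUpdateParser.toJson(many);
  // expected shape: {"action":"remove-snapshots","snapshot-ids":[1,2]}
  MetadataUpdate parsed = MetadataUpdateParser.fromJson(json);
}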
- */ - @Deprecated - public SparkAvroReader(org.apache.iceberg.Schema expectedSchema, Schema readSchema) { - this(expectedSchema, readSchema, ImmutableMap.of()); - } - - /** - * @deprecated will be removed in 1.8.0; use SparkPlannedAvroReader instead. - */ - @Deprecated - @SuppressWarnings("unchecked") - public SparkAvroReader( - org.apache.iceberg.Schema expectedSchema, Schema readSchema, Map constants) { - this.readSchema = readSchema; - this.reader = - (ValueReader) - AvroSchemaWithTypeVisitor.visit(expectedSchema, readSchema, new ReadBuilder(constants)); - } - - @Override - public void setSchema(Schema newFileSchema) { - this.fileSchema = Schema.applyAliases(newFileSchema, readSchema); - } - - @Override - public InternalRow read(InternalRow reuse, Decoder decoder) throws IOException { - return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse); - } - - @Override - public void setRowPositionSupplier(Supplier posSupplier) { - if (reader instanceof SupportsRowPosition) { - ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier); - } - } - - private static class ReadBuilder extends AvroSchemaWithTypeVisitor> { - private final Map idToConstant; - - private ReadBuilder(Map idToConstant) { - this.idToConstant = idToConstant; - } - - @Override - public ValueReader record( - Types.StructType expected, Schema record, List names, List> fields) { - return SparkValueReaders.struct(fields, expected, idToConstant); - } - - @Override - public ValueReader union(Type expected, Schema union, List> options) { - return ValueReaders.union(options); - } - - @Override - public ValueReader array( - Types.ListType expected, Schema array, ValueReader elementReader) { - return SparkValueReaders.array(elementReader); - } - - @Override - public ValueReader map( - Types.MapType expected, Schema map, ValueReader keyReader, ValueReader valueReader) { - return SparkValueReaders.arrayMap(keyReader, valueReader); - } - - @Override - public ValueReader map(Types.MapType expected, Schema map, ValueReader valueReader) { - return SparkValueReaders.map(SparkValueReaders.strings(), valueReader); - } - - @Override - public ValueReader primitive(Type.PrimitiveType expected, Schema primitive) { - LogicalType logicalType = primitive.getLogicalType(); - if (logicalType != null) { - switch (logicalType.getName()) { - case "date": - // Spark uses the same representation - return ValueReaders.ints(); - - case "timestamp-millis": - // adjust to microseconds - ValueReader longs = ValueReaders.longs(); - return (ValueReader) (decoder, ignored) -> longs.read(decoder, null) * 1000L; - - case "timestamp-micros": - // Spark uses the same representation - return ValueReaders.longs(); - - case "decimal": - return SparkValueReaders.decimal( - ValueReaders.decimalBytesReader(primitive), - ((LogicalTypes.Decimal) logicalType).getScale()); - - case "uuid": - return SparkValueReaders.uuids(); - - default: - throw new IllegalArgumentException("Unknown logical type: " + logicalType); - } - } - - switch (primitive.getType()) { - case NULL: - return ValueReaders.nulls(); - case BOOLEAN: - return ValueReaders.booleans(); - case INT: - return ValueReaders.ints(); - case LONG: - return ValueReaders.longs(); - case FLOAT: - return ValueReaders.floats(); - case DOUBLE: - return ValueReaders.doubles(); - case STRING: - return SparkValueReaders.strings(); - case FIXED: - return ValueReaders.fixed(primitive.getFixedSize()); - case BYTES: - return ValueReaders.bytes(); - case ENUM: - return 
SparkValueReaders.enums(primitive.getEnumSymbols()); - default: - throw new IllegalArgumentException("Unsupported type: " + primitive); - } - } - } -} From 5b190967734b86b498d8f7a230877be22c0be134 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Wed, 30 Apr 2025 18:30:13 +0800 Subject: [PATCH 5/7] Fix Flink UT --- .../flink/sink/TestIcebergCommitter.java | 16 ++++++++++------ .../flink/sink/TestIcebergFilesCommitter.java | 18 +++++++++++------- .../flink/sink/TestIcebergCommitter.java | 16 ++++++++++------ .../flink/sink/TestIcebergFilesCommitter.java | 18 +++++++++++------- .../flink/sink/TestIcebergCommitter.java | 16 ++++++++++------ .../flink/sink/TestIcebergFilesCommitter.java | 18 +++++++++++------- 6 files changed, 63 insertions(+), 39 deletions(-) diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java index 9882c43f1fc5..cd61ef009bf0 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java @@ -1029,7 +1029,7 @@ public void testFlinkManifests() throws Exception { // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( - createTestingManifestFile(manifestPath), table.io(), table.specs()); + createTestingManifestFile(manifestPath, dataFile1), table.io(), table.specs()); assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -1165,13 +1165,17 @@ public void testDeleteFiles() throws Exception { } } - private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { - try (ManifestWriter writer = + private ManifestFile createTestingManifestFile(Path manifestPath, DataFile dataFile) + throws IOException { + ManifestWriter writer = ManifestFiles.write( + formatVersion, PartitionSpec.unpartitioned(), - org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { - return writer.toManifestFile(); - } + table.io().newOutputFile(manifestPath.toString()), + 0L); + writer.add(dataFile); + writer.close(); + return writer.toManifestFile(); } private IcebergWriteAggregator buildIcebergWriteAggregator(String myJobId, String operatorId) { diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index c4a298c64a00..bc6e00b03d9b 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -763,7 +763,7 @@ public void testFlinkManifests() throws Exception { // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( - createTestingManifestFile(manifestPath), table.io(), table.specs()); + createTestingManifestFile(manifestPath, dataFile1), table.io(), table.specs()); assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -813,7 +813,7 @@ public void testDeleteFiles() throws Exception { // 2. Read the data files from manifests and assert. 
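// For reference, the rewritten helper in condensed form. The table and
// formatVersion fields and the DataFile argument come from the surrounding test
// fixture; writeSingleFileManifest is a hypothetical name for this sketch. The
// writer is closed before toManifestFile() so the manifest metadata (entry counts
// and lengths) is finalized, and the added entry is what readDataFiles() recovers.
private ManifestFile writeSingleFileManifest(Path manifestPath, DataFile dataFile)
    throws IOException {
  ManifestWriter<DataFile> writer =
      ManifestFiles.write(
          formatVersion,                                     // table format version under test
          PartitionSpec.unpartitioned(),
          table.io().newOutputFile(manifestPath.toString()), // manifest location
          0L);                                               // snapshot id
  writer.add(dataFile); // record the data file entry
  writer.close();
  return writer.toManifestFile();
}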
List dataFiles = FlinkManifestUtil.readDataFiles( - createTestingManifestFile(manifestPath), table.io(), table.specs()); + createTestingManifestFile(manifestPath, dataFile1), table.io(), table.specs()); assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -1119,13 +1119,17 @@ private FileAppenderFactory createDeletableAppenderFactory() { null); } - private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { - try (ManifestWriter writer = + private ManifestFile createTestingManifestFile(Path manifestPath, DataFile dataFile) + throws IOException { + ManifestWriter writer = ManifestFiles.write( + formatVersion, PartitionSpec.unpartitioned(), - org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { - return writer.toManifestFile(); - } + table.io().newOutputFile(manifestPath.toString()), + 0L); + writer.add(dataFile); + writer.close(); + return writer.toManifestFile(); } private List assertFlinkManifests(int expectedCount) throws IOException { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java index f6716d638e6a..a9aef169671b 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java @@ -1028,7 +1028,7 @@ public void testFlinkManifests() throws Exception { // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( - createTestingManifestFile(manifestPath), table.io(), table.specs()); + createTestingManifestFile(manifestPath, dataFile1), table.io(), table.specs()); assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -1164,13 +1164,17 @@ public void testDeleteFiles() throws Exception { } } - private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { - try (ManifestWriter writer = + private ManifestFile createTestingManifestFile(Path manifestPath, DataFile dataFile) + throws IOException { + ManifestWriter writer = ManifestFiles.write( + formatVersion, PartitionSpec.unpartitioned(), - org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { - return writer.toManifestFile(); - } + table.io().newOutputFile(manifestPath.toString()), + 0L); + writer.add(dataFile); + writer.close(); + return writer.toManifestFile(); } private IcebergWriteAggregator buildIcebergWriteAggregator(String myJobId, String operatorId) { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index c4a298c64a00..bc6e00b03d9b 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -763,7 +763,7 @@ public void testFlinkManifests() throws Exception { // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( - createTestingManifestFile(manifestPath), table.io(), table.specs()); + createTestingManifestFile(manifestPath, dataFile1), table.io(), table.specs()); assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -813,7 +813,7 @@ public void testDeleteFiles() throws Exception { // 2. 
Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( - createTestingManifestFile(manifestPath), table.io(), table.specs()); + createTestingManifestFile(manifestPath, dataFile1), table.io(), table.specs()); assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -1119,13 +1119,17 @@ private FileAppenderFactory createDeletableAppenderFactory() { null); } - private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { - try (ManifestWriter writer = + private ManifestFile createTestingManifestFile(Path manifestPath, DataFile dataFile) + throws IOException { + ManifestWriter writer = ManifestFiles.write( + formatVersion, PartitionSpec.unpartitioned(), - org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { - return writer.toManifestFile(); - } + table.io().newOutputFile(manifestPath.toString()), + 0L); + writer.add(dataFile); + writer.close(); + return writer.toManifestFile(); } private List assertFlinkManifests(int expectedCount) throws IOException { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java index 60a5c9297341..c68c7dfbeac7 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergCommitter.java @@ -1022,7 +1022,7 @@ public void testFlinkManifests() throws Exception { // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( - createTestingManifestFile(manifestPath), table.io(), table.specs()); + createTestingManifestFile(manifestPath, dataFile1), table.io(), table.specs()); assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -1154,13 +1154,17 @@ public void testDeleteFiles() throws Exception { } } - private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { - try (ManifestWriter writer = + private ManifestFile createTestingManifestFile(Path manifestPath, DataFile dataFile) + throws IOException { + ManifestWriter writer = ManifestFiles.write( + formatVersion, PartitionSpec.unpartitioned(), - org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { - return writer.toManifestFile(); - } + table.io().newOutputFile(manifestPath.toString()), + 0L); + writer.add(dataFile); + writer.close(); + return writer.toManifestFile(); } private IcebergWriteAggregator buildIcebergWriteAggregator(String myJobId, String operatorId) { diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java index 8d7ea6532e10..65fb9b8f69b4 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java @@ -763,7 +763,7 @@ public void testFlinkManifests() throws Exception { // 2. Read the data files from manifests and assert. 
List dataFiles = FlinkManifestUtil.readDataFiles( - createTestingManifestFile(manifestPath), table.io(), table.specs()); + createTestingManifestFile(manifestPath, dataFile1), table.io(), table.specs()); assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -813,7 +813,7 @@ public void testDeleteFiles() throws Exception { // 2. Read the data files from manifests and assert. List dataFiles = FlinkManifestUtil.readDataFiles( - createTestingManifestFile(manifestPath), table.io(), table.specs()); + createTestingManifestFile(manifestPath, dataFile1), table.io(), table.specs()); assertThat(dataFiles).hasSize(1); TestHelpers.assertEquals(dataFile1, dataFiles.get(0)); @@ -1119,13 +1119,17 @@ private FileAppenderFactory createDeletableAppenderFactory() { null); } - private ManifestFile createTestingManifestFile(Path manifestPath) throws IOException { - try (ManifestWriter writer = + private ManifestFile createTestingManifestFile(Path manifestPath, DataFile dataFile) + throws IOException { + ManifestWriter writer = ManifestFiles.write( + formatVersion, PartitionSpec.unpartitioned(), - org.apache.iceberg.Files.localOutput(manifestPath.toFile()))) { - return writer.toManifestFile(); - } + table.io().newOutputFile(manifestPath.toString()), + 0L); + writer.add(dataFile); + writer.close(); + return writer.toManifestFile(); } private List assertFlinkManifests(int expectedCount) throws IOException { From 1795f43bc98bff6cfd458739aee5ab7d974c5589 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Thu, 8 May 2025 00:05:42 +0800 Subject: [PATCH 6/7] Remove FileRewriter --- .palantir/revapi.yml | 40 ++ .../apache/iceberg/actions/FileRewriter.java | 80 ---- .../iceberg/actions/RewriteFileGroup.java | 32 -- .../actions/RewritePositionDeletesGroup.java | 32 -- .../actions/SizeBasedDataRewriter.java | 173 --------- .../actions/SizeBasedFileRewriter.java | 348 ------------------ .../SizeBasedPositionDeletesRewriter.java | 63 ---- .../actions/TestSizeBasedRewriter.java | 95 ----- 8 files changed, 40 insertions(+), 823 deletions(-) delete mode 100644 core/src/main/java/org/apache/iceberg/actions/FileRewriter.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java delete mode 100644 core/src/main/java/org/apache/iceberg/actions/SizeBasedPositionDeletesRewriter.java delete mode 100644 core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index a68635d117b1..b00c04264617 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1261,6 +1261,20 @@ acceptedBreaks: - code: "java.class.removed" old: "class org.apache.iceberg.MetadataUpdate.RemoveSnapshot" justification: "Removing deprecations for 1.10.0" + - code: "java.class.removed" + old: "class org.apache.iceberg.actions.SizeBasedDataRewriter" + justification: "Removing deprecations for 1.10.0" + - code: "java.class.removed" + old: "class org.apache.iceberg.actions.SizeBasedFileRewriter>, F>" + justification: "Removing deprecations for 1.10.0" + - code: "java.class.removed" + old: "class org.apache.iceberg.actions.SizeBasedPositionDeletesRewriter" + justification: "Removing deprecations for 1.10.0" + - code: "java.class.removed" + old: "interface org.apache.iceberg.actions.FileRewriter>, F>" + justification: "Removing deprecations for 1.10.0" - code: "java.field.removedWithConstant" old: "field 
org.apache.iceberg.TableProperties.ROW_LINEAGE" justification: "Removing deprecations for 1.10.0" @@ -1279,9 +1293,27 @@ acceptedBreaks: - code: "java.method.removed" old: "method boolean org.apache.iceberg.TableMetadata::rowLineageEnabled()" justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method int org.apache.iceberg.actions.RewriteFileGroup::numFiles()" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method int org.apache.iceberg.actions.RewritePositionDeletesGroup::numRewrittenDeleteFiles()" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method java.util.List org.apache.iceberg.actions.RewriteFileGroup::fileScans()" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method java.util.List org.apache.iceberg.actions.RewritePositionDeletesGroup::tasks()" + justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method long org.apache.iceberg.PartitionStats::totalRecordCount()" justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method long org.apache.iceberg.actions.RewriteFileGroup::sizeInBytes()" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method long org.apache.iceberg.actions.RewritePositionDeletesGroup::rewrittenBytes()" + justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method org.apache.iceberg.TableMetadata.Builder org.apache.iceberg.TableMetadata.Builder::enableRowLineage()" justification: "Removing deprecations for 1.10.0" @@ -1303,6 +1335,14 @@ acceptedBreaks: \ int, long, int, long, int, long, java.util.List,\ \ java.nio.ByteBuffer)" justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method void org.apache.iceberg.actions.RewriteFileGroup::(org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo,\ + \ java.util.List)" + justification: "Removing deprecations for 1.10.0" + - code: "java.method.removed" + old: "method void org.apache.iceberg.actions.RewritePositionDeletesGroup::(org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupInfo,\ + \ java.util.List)" + justification: "Removing deprecations for 1.10.0" - code: "java.method.returnTypeChangedCovariantly" old: "method java.util.Map org.apache.iceberg.io.ImmutableStorageCredential::config()" new: "method org.apache.iceberg.util.SerializableMap\ diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java deleted file mode 100644 index 3aabb63c155c..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.ContentFile; -import org.apache.iceberg.ContentScanTask; - -/** - * A class for rewriting content files. - * - *
The entire rewrite operation is broken down into pieces based on partitioning, and size-based - * groups within a partition. These subunits of the rewrite are referred to as file groups. A file - * group will be processed by a single framework "action". For example, in Spark this means that - * each group would be rewritten in its own Spark job. - * - * @param the Java type of tasks to read content files - * @param the Java type of content files - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link FileRewritePlanner} and {@link - * FileRewriteRunner} - */ -@Deprecated -public interface FileRewriter, F extends ContentFile> { - - /** Returns a description for this rewriter. */ - default String description() { - return getClass().getName(); - } - - /** - * Returns a set of supported options for this rewriter. Only options specified in this list will - * be accepted at runtime. Any other options will be rejected. - */ - Set validOptions(); - - /** - * Initializes this rewriter using provided options. - * - * @param options options to initialize this rewriter - */ - void init(Map options); - - /** - * Selects files which this rewriter believes are valid targets to be rewritten based on their - * scan tasks and groups those scan tasks into file groups. The file groups are then rewritten in - * a single executable unit, such as a Spark job. - * - * @param tasks an iterable of scan task for files in a partition - * @return groups of scan tasks for files to be rewritten in a single executable unit - */ - Iterable> planFileGroups(Iterable tasks); - - /** - * Rewrite a group of files represented by the given list of scan tasks. - * - *
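// Sketch of the driver loop this interface implied (not part of the patch): the
// rewriter instance, task source, and commit step are assumed; FileScanTask and
// DataFile are the usual type arguments for data-file compaction.
static void runRewrite(
    FileRewriter<FileScanTask, DataFile> rewriter,
    Iterable<FileScanTask> tasks,
    Map<String, String> options) {
  rewriter.init(options); // validate and apply rewriter options
  for (List<FileScanTask> group : rewriter.planFileGroups(tasks)) {
    Set<DataFile> newFiles = rewriter.rewrite(group); // one engine job per group
    // engine-specific follow-up: commit newFiles in place of the group's inputs
  }
}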
The implementation is supposed to be engine-specific (e.g. Spark, Flink, Trino). - * - * @param group a group of scan tasks for files to be rewritten together - * @return a set of newly written files - */ - Set rewrite(List group); -} diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java index e5982bb44732..ee9d5c54cb27 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java @@ -38,14 +38,6 @@ public class RewriteFileGroup extends RewriteGroupBase fileScanTasks) { - this(info, fileScanTasks, 0, 0L, 0L, 0); - } - public RewriteFileGroup( FileGroupInfo info, List fileScanTasks, @@ -57,14 +49,6 @@ public RewriteFileGroup( this.outputSpecId = outputSpecId; } - /** - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #fileScanTasks()} - */ - @Deprecated - public List fileScans() { - return fileScanTasks(); - } - public void setOutputFiles(Set files) { addedFiles = DataFileSet.of(files); } @@ -105,22 +89,6 @@ public String toString() { .toString(); } - /** - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #inputFilesSizeInBytes()} - */ - @Deprecated - public long sizeInBytes() { - return inputFilesSizeInBytes(); - } - - /** - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #inputFileNum()} - */ - @Deprecated - public int numFiles() { - return inputFileNum(); - } - public int outputSpecId() { return outputSpecId; } diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java index f3b3a6a81ffa..d5ffe7f1e01b 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java @@ -42,14 +42,6 @@ public class RewritePositionDeletesGroup private DeleteFileSet addedDeleteFiles = DeleteFileSet.create(); - /** - * @deprecated since 1.9.0, will be removed in 1.10.0 - */ - @Deprecated - public RewritePositionDeletesGroup(FileGroupInfo info, List tasks) { - this(info, tasks, 0L, 0L, 0); - } - public RewritePositionDeletesGroup( FileGroupInfo info, List tasks, @@ -62,14 +54,6 @@ public RewritePositionDeletesGroup( tasks.stream().mapToLong(t -> t.file().dataSequenceNumber()).max().getAsLong(); } - /** - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #fileScanTasks()} - */ - @Deprecated - public List tasks() { - return fileScanTasks(); - } - public void setOutputFiles(Set files) { addedDeleteFiles = DeleteFileSet.of(files); } @@ -119,26 +103,10 @@ public String toString() { .toString(); } - /** - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #inputFilesSizeInBytes()} - */ - @Deprecated - public long rewrittenBytes() { - return inputFilesSizeInBytes(); - } - public long addedBytes() { return addedDeleteFiles.stream().mapToLong(ScanTaskUtil::contentSizeInBytes).sum(); } - /** - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link #inputFileNum()} - */ - @Deprecated - public int numRewrittenDeleteFiles() { - return inputFileNum(); - } - public static Comparator comparator(RewriteJobOrder order) { switch (order) { case BYTES_ASC: diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java deleted file mode 
100644 index a4304ee22c4d..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.ContentFile; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.util.ContentFileUtil; -import org.apache.iceberg.util.PropertyUtil; - -/** - * Deprecated {@link SizeBasedDataRewriter} abstract class. - * - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link BinPackRewriteFilePlanner} and - * {@link FileRewriteRunner} - */ -@Deprecated -public abstract class SizeBasedDataRewriter extends SizeBasedFileRewriter { - - /** - * The minimum number of deletes that needs to be associated with a data file for it to be - * considered for rewriting. If a data file has this number of deletes or more, it will be - * rewritten regardless of its file size determined by {@link #MIN_FILE_SIZE_BYTES} and {@link - * #MAX_FILE_SIZE_BYTES}. If a file group contains a file that satisfies this condition, the file - * group will be rewritten regardless of the number of files in the file group determined by - * {@link #MIN_INPUT_FILES}. - * - *
Defaults to Integer.MAX_VALUE, which means this feature is not enabled by default. - */ - public static final String DELETE_FILE_THRESHOLD = "delete-file-threshold"; - - public static final int DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE; - - /** - * The ratio of the deleted rows in a data file for it to be considered for rewriting. If the - * deletion ratio of a data file is greater than or equal to this value, it will be rewritten - * regardless of its file size determined by {@link #MIN_FILE_SIZE_BYTES} and {@link - * #MAX_FILE_SIZE_BYTES}. If a file group contains a file that satisfies this condition, the file - * group will be rewritten regardless of the number of files in the file group determined by - * {@link #MIN_INPUT_FILES}. - * - *
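// Configuration sketch (assumed rewriter instance; keys are the constants above):
// both thresholds were plain string options validated in init().
static void configureDeleteThresholds(SizeBasedDataRewriter rewriter) {
  rewriter.init(
      ImmutableMap.of(
          SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "2",      // rewrite at >= 2 delete files
          SizeBasedDataRewriter.DELETE_RATIO_THRESHOLD, "0.4")); // or >= 40% deleted rows
}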
Defaults to 0.3, which means that if the number of deleted records in a file reaches or - * exceeds 30%, it will trigger the rewriting operation. - */ - public static final String DELETE_RATIO_THRESHOLD = "delete-ratio-threshold"; - - public static final double DELETE_RATIO_THRESHOLD_DEFAULT = 0.3; - - private int deleteFileThreshold; - - private double deleteRatioThreshold; - - protected SizeBasedDataRewriter(Table table) { - super(table); - } - - @Override - public Set validOptions() { - return ImmutableSet.builder() - .addAll(super.validOptions()) - .add(DELETE_FILE_THRESHOLD) - .add(DELETE_RATIO_THRESHOLD) - .build(); - } - - @Override - public void init(Map options) { - super.init(options); - this.deleteFileThreshold = deleteFileThreshold(options); - this.deleteRatioThreshold = deleteRatioThreshold(options); - } - - private double deleteRatioThreshold(Map options) { - double value = - PropertyUtil.propertyAsDouble( - options, DELETE_RATIO_THRESHOLD, DELETE_RATIO_THRESHOLD_DEFAULT); - Preconditions.checkArgument( - value > 0, "'%s' is set to %s but must be > 0", DELETE_RATIO_THRESHOLD, value); - Preconditions.checkArgument( - value <= 1, "'%s' is set to %s but must be <= 1", DELETE_RATIO_THRESHOLD, value); - return value; - } - - @Override - protected Iterable filterFiles(Iterable tasks) { - return Iterables.filter(tasks, this::shouldRewrite); - } - - private boolean shouldRewrite(FileScanTask task) { - return wronglySized(task) || tooManyDeletes(task) || tooHighDeleteRatio(task); - } - - private boolean tooManyDeletes(FileScanTask task) { - return task.deletes() != null && task.deletes().size() >= deleteFileThreshold; - } - - @Override - protected Iterable> filterFileGroups(List> groups) { - return Iterables.filter(groups, this::shouldRewrite); - } - - private boolean shouldRewrite(List group) { - return enoughInputFiles(group) - || enoughContent(group) - || tooMuchContent(group) - || anyTaskHasTooManyDeletes(group) - || anyTaskHasTooHighDeleteRatio(group); - } - - private boolean anyTaskHasTooManyDeletes(List group) { - return group.stream().anyMatch(this::tooManyDeletes); - } - - private boolean anyTaskHasTooHighDeleteRatio(List group) { - return group.stream().anyMatch(this::tooHighDeleteRatio); - } - - private boolean tooHighDeleteRatio(FileScanTask task) { - if (task.deletes() == null || task.deletes().isEmpty()) { - return false; - } - - long knownDeletedRecordCount = - task.deletes().stream() - .filter(ContentFileUtil::isFileScoped) - .mapToLong(ContentFile::recordCount) - .sum(); - - double deletedRecords = (double) Math.min(knownDeletedRecordCount, task.file().recordCount()); - double deleteRatio = deletedRecords / task.file().recordCount(); - return deleteRatio >= deleteRatioThreshold; - } - - @Override - protected long defaultTargetFileSize() { - return PropertyUtil.propertyAsLong( - table().properties(), - TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, - TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT); - } - - private int deleteFileThreshold(Map options) { - int value = - PropertyUtil.propertyAsInt(options, DELETE_FILE_THRESHOLD, DELETE_FILE_THRESHOLD_DEFAULT); - Preconditions.checkArgument( - value >= 0, "'%s' is set to %s but must be >= 0", DELETE_FILE_THRESHOLD, value); - return value; - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java deleted file mode 100644 index 785360c75f9e..000000000000 --- 
a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java +++ /dev/null @@ -1,348 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -import java.math.RoundingMode; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.ContentFile; -import org.apache.iceberg.ContentScanTask; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Table; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.relocated.com.google.common.math.LongMath; -import org.apache.iceberg.util.BinPacking; -import org.apache.iceberg.util.PropertyUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A file rewriter that determines which files to rewrite based on their size. - * - *
If files are smaller than the {@link #MIN_FILE_SIZE_BYTES} threshold or larger than the {@link - * #MAX_FILE_SIZE_BYTES} threshold, they are considered targets for being rewritten. - * - *
Once selected, files are grouped based on the {@link BinPacking bin-packing algorithm} into - * groups of no more than {@link #MAX_FILE_GROUP_SIZE_BYTES}. Groups will actually be rewritten if - * they contain more than {@link #MIN_INPUT_FILES} or if they would produce at least one file of - * {@link #TARGET_FILE_SIZE_BYTES}. - * - *
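// Worked example of the selection bounds (pure arithmetic, using the default
// ratios documented below): with a 512 MB target, files under 384 MB or over
// roughly 921.6 MB become rewrite candidates.
static boolean wronglySizedExample(long fileLength) {
  long target = 512L * 1024 * 1024;
  long min = (long) (target * 0.75); // MIN_FILE_SIZE_DEFAULT_RATIO -> 384 MB
  long max = (long) (target * 1.80); // MAX_FILE_SIZE_DEFAULT_RATIO -> ~921.6 MB
  return fileLength < min || fileLength > max;
}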
Note that implementations may add extra conditions for selecting files or filtering groups. - * - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link SizeBasedFileRewritePlanner} and - * {@link FileRewriteRunner} - */ -@Deprecated -public abstract class SizeBasedFileRewriter, F extends ContentFile> - implements FileRewriter { - - private static final Logger LOG = LoggerFactory.getLogger(SizeBasedFileRewriter.class); - - /** The target output file size that this file rewriter will attempt to generate. */ - public static final String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes"; - - /** - * Controls which files will be considered for rewriting. Files with sizes under this threshold - * will be considered for rewriting regardless of any other criteria. - * - *
Defaults to 75% of the target file size. - */ - public static final String MIN_FILE_SIZE_BYTES = "min-file-size-bytes"; - - public static final double MIN_FILE_SIZE_DEFAULT_RATIO = 0.75; - - /** - * Controls which files will be considered for rewriting. Files with sizes above this threshold - * will be considered for rewriting regardless of any other criteria. - * - *
Defaults to 180% of the target file size. - */ - public static final String MAX_FILE_SIZE_BYTES = "max-file-size-bytes"; - - public static final double MAX_FILE_SIZE_DEFAULT_RATIO = 1.80; - - /** - * Any file group exceeding this number of files will be rewritten regardless of other criteria. - * This config ensures file groups that contain many files are compacted even if the total size of - * that group is less than the target file size. This can also be thought of as the maximum number - * of wrongly sized files that could remain in a partition after rewriting. - */ - public static final String MIN_INPUT_FILES = "min-input-files"; - - public static final int MIN_INPUT_FILES_DEFAULT = 5; - - /** Overrides other options and forces rewriting of all provided files. */ - public static final String REWRITE_ALL = "rewrite-all"; - - public static final boolean REWRITE_ALL_DEFAULT = false; - - /** - * This option controls the largest amount of data that should be rewritten in a single file - * group. It helps with breaking down the rewriting of very large partitions which may not be - * rewritable otherwise due to the resource constraints of the cluster. For example, a sort-based - * rewrite may not scale to TB-sized partitions, and those partitions need to be worked on in - * small subsections to avoid exhaustion of resources. - */ - public static final String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes"; - - public static final long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 100L * 1024 * 1024 * 1024; // 100 GB - - private static final long SPLIT_OVERHEAD = 5 * 1024; - - private final Table table; - private long targetFileSize; - private long minFileSize; - private long maxFileSize; - private int minInputFiles; - private boolean rewriteAll; - private long maxGroupSize; - - private int outputSpecId; - - protected SizeBasedFileRewriter(Table table) { - this.table = table; - } - - protected abstract long defaultTargetFileSize(); - - protected abstract Iterable filterFiles(Iterable tasks); - - protected abstract Iterable> filterFileGroups(List> groups); - - protected Table table() { - return table; - } - - @Override - public Set validOptions() { - return ImmutableSet.of( - TARGET_FILE_SIZE_BYTES, - MIN_FILE_SIZE_BYTES, - MAX_FILE_SIZE_BYTES, - MIN_INPUT_FILES, - REWRITE_ALL, - MAX_FILE_GROUP_SIZE_BYTES); - } - - @Override - public void init(Map options) { - Map sizeThresholds = sizeThresholds(options); - this.targetFileSize = sizeThresholds.get(TARGET_FILE_SIZE_BYTES); - this.minFileSize = sizeThresholds.get(MIN_FILE_SIZE_BYTES); - this.maxFileSize = sizeThresholds.get(MAX_FILE_SIZE_BYTES); - - this.minInputFiles = minInputFiles(options); - this.rewriteAll = rewriteAll(options); - this.maxGroupSize = maxGroupSize(options); - this.outputSpecId = outputSpecId(options); - - if (rewriteAll) { - LOG.info("Configured to rewrite all provided files in table {}", table.name()); - } - } - - protected boolean wronglySized(T task) { - return task.length() < minFileSize || task.length() > maxFileSize; - } - - @Override - public Iterable> planFileGroups(Iterable tasks) { - Iterable filteredTasks = rewriteAll ? tasks : filterFiles(tasks); - BinPacking.ListPacker packer = new BinPacking.ListPacker<>(maxGroupSize, 1, false); - List> groups = packer.pack(filteredTasks, ContentScanTask::length); - return rewriteAll ? 
groups : filterFileGroups(groups); - } - - protected boolean enoughInputFiles(List group) { - return group.size() > 1 && group.size() >= minInputFiles; - } - - protected boolean enoughContent(List group) { - return group.size() > 1 && inputSize(group) > targetFileSize; - } - - protected boolean tooMuchContent(List group) { - return inputSize(group) > maxFileSize; - } - - protected long inputSize(List group) { - return group.stream().mapToLong(ContentScanTask::length).sum(); - } - - /** - * Calculates the split size to use in bin-packing rewrites. - * - *
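// The grouping in planFileGroups above delegates to the core bin-packing utility;
// in isolation the call looks like this (sketch: the task source is assumed, with
// a 1 GB group cap and the same lookback of 1 / no largest-bin-first ordering).
static List<List<FileScanTask>> groupBySize(Iterable<FileScanTask> tasks) {
  long maxGroupSize = 1024L * 1024 * 1024;
  BinPacking.ListPacker<FileScanTask> packer =
      new BinPacking.ListPacker<>(maxGroupSize, 1, false);
  return packer.pack(tasks, ContentScanTask::length); // groups weighted by file length
}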
This method determines the target split size as the input size divided by the desired number - * of output files. The final split size is adjusted to be at least as big as the target file size - * but less than the max write file size. - */ - protected long splitSize(long inputSize) { - long estimatedSplitSize = (inputSize / numOutputFiles(inputSize)) + SPLIT_OVERHEAD; - if (estimatedSplitSize < targetFileSize) { - return targetFileSize; - } else if (estimatedSplitSize > writeMaxFileSize()) { - return writeMaxFileSize(); - } else { - return estimatedSplitSize; - } - } - - /** - * Determines the preferable number of output files when rewriting a particular file group. - * - *
If the rewriter is handling 10.1 GB of data with a target file size of 1 GB, it could - * produce 11 files, one of which would only have 0.1 GB. This would most likely be less - * preferable to 10 files with 1.01 GB each. So this method decides whether to round up or round - * down based on what the estimated average file size will be if the remainder (0.1 GB) is - * distributed amongst other files. If the new average file size is no more than 10% greater than - * the target file size, then this method will round down when determining the number of output - * files. Otherwise, the remainder will be written into a separate file. - * - * @param inputSize a total input size for a file group - * @return the number of files this rewriter should create - */ - protected long numOutputFiles(long inputSize) { - if (inputSize < targetFileSize) { - return 1; - } - - long numFilesWithRemainder = LongMath.divide(inputSize, targetFileSize, RoundingMode.CEILING); - long numFilesWithoutRemainder = LongMath.divide(inputSize, targetFileSize, RoundingMode.FLOOR); - long avgFileSizeWithoutRemainder = inputSize / numFilesWithoutRemainder; - - if (LongMath.mod(inputSize, targetFileSize) > minFileSize) { - // the remainder file is of a valid size for this rewrite so keep it - return numFilesWithRemainder; - - } else if (avgFileSizeWithoutRemainder - < Math.min(1.1 * targetFileSize, (double) writeMaxFileSize())) { - // if the remainder is distributed amongst other files, - // the average file size will be no more than 10% bigger than the target file size - // so round down and distribute remainder amongst other files - return numFilesWithoutRemainder; - - } else { - // keep the remainder file as it is not OK to distribute it amongst other files - return numFilesWithRemainder; - } - } - - /** - * Estimates a larger max target file size than the target size used in task creation to avoid - * creating tiny remainder files. - * - *
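// Checking the 10.1 GB example by hand (pure arithmetic; the write-max clamp is
// left out for brevity): the 0.1 GB remainder is below the default minimum file
// size, and redistributing it yields about 1.01 GB per file, within 110% of the
// target, so the method rounds down to 10 output files.
static long numOutputFilesExample() {
  long gb = 1024L * 1024 * 1024;
  long inputSize = (long) (10.1 * gb);
  long target = gb;
  long ceil = (inputSize + target - 1) / target;    // 11 files, keeping the remainder
  long floor = inputSize / target;                  // 10 files, remainder redistributed
  double avgWithout = (double) inputSize / floor;   // ~1.01 GB per file
  return avgWithout <= 1.1 * target ? floor : ceil; // -> 10
}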
While we create tasks that should all be smaller than our target size, there is a chance - * that the actual data will end up being larger than our target size due to factors - * such as compression and serialization, which are outside our control. If this occurs, instead of making a - * single file that is close in size to our target, we would end up producing one file of the - * target size, and then a small extra file with the remaining data. - * - *
For example, if our target is 512 MB, we may generate a rewrite task that should be 500 MB. - * When we write the data we may find we actually have to write out 530 MB. If we use the target - * size while writing, we would produce a 512 MB file and an 18 MB file. If instead we use a - * larger size estimated by this method, then we end up writing a single file. - * - * @return the target size plus one half of the distance between max and target - */ - protected long writeMaxFileSize() { - return (long) (targetFileSize + ((maxFileSize - targetFileSize) * 0.5)); - } - - protected PartitionSpec outputSpec() { - return table.specs().get(outputSpecId); - } - - protected int outputSpecId() { - return outputSpecId; - } - - private int outputSpecId(Map options) { - int specId = - PropertyUtil.propertyAsInt(options, RewriteDataFiles.OUTPUT_SPEC_ID, table.spec().specId()); - Preconditions.checkArgument( - table.specs().containsKey(specId), - "Cannot use output spec id %s because the table does not contain a reference to this spec-id.", - specId); - return specId; - } - - private Map sizeThresholds(Map options) { - long target = - PropertyUtil.propertyAsLong(options, TARGET_FILE_SIZE_BYTES, defaultTargetFileSize()); - - long defaultMin = (long) (target * MIN_FILE_SIZE_DEFAULT_RATIO); - long min = PropertyUtil.propertyAsLong(options, MIN_FILE_SIZE_BYTES, defaultMin); - - long defaultMax = (long) (target * MAX_FILE_SIZE_DEFAULT_RATIO); - long max = PropertyUtil.propertyAsLong(options, MAX_FILE_SIZE_BYTES, defaultMax); - - Preconditions.checkArgument( - target > 0, "'%s' is set to %s but must be > 0", TARGET_FILE_SIZE_BYTES, target); - - Preconditions.checkArgument( - min >= 0, "'%s' is set to %s but must be >= 0", MIN_FILE_SIZE_BYTES, min); - - Preconditions.checkArgument( - target > min, - "'%s' (%s) must be > '%s' (%s), all new files will be smaller than the min threshold", - TARGET_FILE_SIZE_BYTES, - target, - MIN_FILE_SIZE_BYTES, - min); - - Preconditions.checkArgument( - target < max, - "'%s' (%s) must be < '%s' (%s), all new files will be larger than the max threshold", - TARGET_FILE_SIZE_BYTES, - target, - MAX_FILE_SIZE_BYTES, - max); - - Map values = Maps.newHashMap(); - - values.put(TARGET_FILE_SIZE_BYTES, target); - values.put(MIN_FILE_SIZE_BYTES, min); - values.put(MAX_FILE_SIZE_BYTES, max); - - return values; - } - - private int minInputFiles(Map options) { - int value = PropertyUtil.propertyAsInt(options, MIN_INPUT_FILES, MIN_INPUT_FILES_DEFAULT); - Preconditions.checkArgument( - value > 0, "'%s' is set to %s but must be > 0", MIN_INPUT_FILES, value); - return value; - } - - private long maxGroupSize(Map options) { - long value = - PropertyUtil.propertyAsLong( - options, MAX_FILE_GROUP_SIZE_BYTES, MAX_FILE_GROUP_SIZE_BYTES_DEFAULT); - Preconditions.checkArgument( - value > 0, "'%s' is set to %s but must be > 0", MAX_FILE_GROUP_SIZE_BYTES, value); - return value; - } - - private boolean rewriteAll(Map options) { - return PropertyUtil.propertyAsBoolean(options, REWRITE_ALL, REWRITE_ALL_DEFAULT); - } -} diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedPositionDeletesRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedPositionDeletesRewriter.java deleted file mode 100644 index 5c2f5c3bfb0e..000000000000 --- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedPositionDeletesRewriter.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
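// Worked numbers for writeMaxFileSize(), using the same sizes as the test further
// below (min 256 MB, target 512 MB, max 768 MB): the adjusted write size is
// 512 + 0.5 * (768 - 512) = 640 MB, so splitSize() stays within [512 MB, 640 MB].
static long writeMaxExample() {
  long mb = 1024L * 1024;
  long target = 512 * mb;
  long max = 768 * mb;
  return (long) (target + (max - target) * 0.5); // 640 MB
}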
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.actions; - -import java.util.List; -import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.PositionDeletesScanTask; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.util.PropertyUtil; - -/** - * @deprecated since 1.9.0, will be removed in 1.10.0; use {@link - * BinPackRewritePositionDeletePlanner} and {@link FileRewriteRunner} - */ -@Deprecated -public abstract class SizeBasedPositionDeletesRewriter - extends SizeBasedFileRewriter { - - protected SizeBasedPositionDeletesRewriter(Table table) { - super(table); - } - - @Override - protected Iterable filterFiles(Iterable tasks) { - return Iterables.filter(tasks, this::wronglySized); - } - - @Override - protected Iterable> filterFileGroups( - List> groups) { - return Iterables.filter(groups, this::shouldRewrite); - } - - private boolean shouldRewrite(List group) { - return enoughInputFiles(group) || enoughContent(group) || tooMuchContent(group); - } - - @Override - protected long defaultTargetFileSize() { - return PropertyUtil.propertyAsLong( - table().properties(), - TableProperties.DELETE_TARGET_FILE_SIZE_BYTES, - TableProperties.DELETE_TARGET_FILE_SIZE_BYTES_DEFAULT); - } -} diff --git a/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java b/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java deleted file mode 100644 index 5cf78a973fa0..000000000000 --- a/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.actions; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.MockFileScanTask; -import org.apache.iceberg.ParameterizedTestExtension; -import org.apache.iceberg.Table; -import org.apache.iceberg.TestBase; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -/** - * @deprecated The tested class is deprecated since 1.9.0, will be removed in 1.10.0 - */ -@ExtendWith(ParameterizedTestExtension.class) -@Deprecated -public class TestSizeBasedRewriter extends TestBase { - - @TestTemplate - public void testSplitSizeLowerBound() { - SizeBasedDataFileRewriterImpl rewriter = new SizeBasedDataFileRewriterImpl(table); - - FileScanTask task1 = new MockFileScanTask(145L * 1024 * 1024); - FileScanTask task2 = new MockFileScanTask(145L * 1024 * 1024); - FileScanTask task3 = new MockFileScanTask(145L * 1024 * 1024); - FileScanTask task4 = new MockFileScanTask(145L * 1024 * 1024); - List tasks = ImmutableList.of(task1, task2, task3, task4); - - long minFileSize = 256L * 1024 * 1024; - long targetFileSize = 512L * 1024 * 1024; - long maxFileSize = 768L * 1024 * 1024; - - Map options = - ImmutableMap.of( - SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, String.valueOf(minFileSize), - SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, String.valueOf(targetFileSize), - SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, String.valueOf(maxFileSize)); - rewriter.init(options); - - // the total task size is 580 MB and the target file size is 512 MB - // the remainder must be written into a separate file as it exceeds 10% - long numOutputFiles = rewriter.computeNumOutputFiles(tasks); - assertThat(numOutputFiles).isEqualTo(2); - - // the split size must be >= targetFileSize and < maxFileSize - long splitSize = rewriter.computeSplitSize(tasks); - assertThat(splitSize).isGreaterThanOrEqualTo(targetFileSize); - assertThat(splitSize).isLessThan(maxFileSize); - } - - private static class SizeBasedDataFileRewriterImpl extends SizeBasedDataRewriter { - - SizeBasedDataFileRewriterImpl(Table table) { - super(table); - } - - @Override - public Set rewrite(List group) { - throw new UnsupportedOperationException("Not implemented"); - } - - public long computeSplitSize(List group) { - return splitSize(inputSize(group)); - } - - public long computeNumOutputFiles(List group) { - return numOutputFiles(inputSize(group)); - } - } -} From 058d997661fd89e8f77d289cc3d85db90ed2668b Mon Sep 17 00:00:00 2001 From: manuzhang Date: Thu, 8 May 2025 00:23:09 +0800 Subject: [PATCH 7/7] Redo revapi --- .palantir/revapi.yml | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index b00c04264617..8929c86d55ae 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1278,18 +1278,6 @@ acceptedBreaks: - code: "java.field.removedWithConstant" old: "field org.apache.iceberg.TableProperties.ROW_LINEAGE" justification: "Removing deprecations for 1.10.0" - - code: "java.method.parameterTypeChanged" - old: "parameter org.apache.iceberg.io.ImmutableStorageCredential org.apache.iceberg.io.ImmutableStorageCredential::withConfig(===java.util.Map===)" - new: "parameter 
org.apache.iceberg.io.ImmutableStorageCredential org.apache.iceberg.io.ImmutableStorageCredential::withConfig(===org.apache.iceberg.util.SerializableMap===)" - justification: "Removing deprecations for 1.10.0" - - code: "java.method.parameterTypeChanged" - old: "parameter org.apache.iceberg.io.ImmutableStorageCredential.Builder org.apache.iceberg.io.ImmutableStorageCredential.Builder::config(===java.util.Map===)" - new: "parameter org.apache.iceberg.io.ImmutableStorageCredential.Builder org.apache.iceberg.io.ImmutableStorageCredential.Builder::config(===org.apache.iceberg.util.SerializableMap===)" - justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method boolean org.apache.iceberg.TableMetadata::rowLineageEnabled()" justification: "Removing deprecations for 1.10.0" @@ -1317,18 +1305,6 @@ acceptedBreaks: - code: "java.method.removed" old: "method org.apache.iceberg.TableMetadata.Builder org.apache.iceberg.TableMetadata.Builder::enableRowLineage()" justification: "Removing deprecations for 1.10.0" - - code: "java.method.removed" - old: "method org.apache.iceberg.io.ImmutableStorageCredential.Builder org.apache.iceberg.io.ImmutableStorageCredential.Builder::putAllConfig(java.util.Map)" - justification: "Removing deprecations for 1.10.0" - - code: "java.method.removed" - old: "method org.apache.iceberg.io.ImmutableStorageCredential.Builder org.apache.iceberg.io.ImmutableStorageCredential.Builder::putConfig(java.lang.String,\ - \ java.lang.String)" - justification: "Removing deprecations for 1.10.0" - - code: "java.method.removed" - old: "method org.apache.iceberg.io.ImmutableStorageCredential.Builder org.apache.iceberg.io.ImmutableStorageCredential.Builder::putConfig(java.util.Map.Entry)" - justification: "Removing deprecations for 1.10.0" - code: "java.method.removed" old: "method void org.apache.iceberg.GenericManifestFile::(java.lang.String,\ \ long, int, org.apache.iceberg.ManifestContent, long, long, java.lang.Long,\ @@ -1343,16 +1319,6 @@ acceptedBreaks: old: "method void org.apache.iceberg.actions.RewritePositionDeletesGroup::(org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupInfo,\ \ java.util.List)" justification: "Removing deprecations for 1.10.0" - - code: "java.method.returnTypeChangedCovariantly" - old: "method java.util.Map org.apache.iceberg.io.ImmutableStorageCredential::config()" - new: "method org.apache.iceberg.util.SerializableMap\ - \ org.apache.iceberg.io.ImmutableStorageCredential::config()" - justification: "Removing deprecations for 1.10.0" - - code: "java.method.returnTypeChangedCovariantly" - old: "method java.util.Map org.apache.iceberg.io.StorageCredential::config()" - new: "method org.apache.iceberg.util.SerializableMap\ - \ org.apache.iceberg.io.StorageCredential::config()" - justification: "Removing deprecations for 1.10.0" org.apache.iceberg:iceberg-parquet: - code: "java.method.removed" old: "method org.apache.iceberg.parquet.ParquetValueWriter\