Skip to content

Commit 5f65095

Browse files
committed
Core, Orc, Data: Implementation of ORCFormatModel
1 parent 31fc168 commit 5f65095

5 files changed

Lines changed: 577 additions & 6 deletions

File tree

core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ private FormatModelRegistry() {}
5656

5757
private static final Logger LOG = LoggerFactory.getLogger(FormatModelRegistry.class);
5858
// The list of classes which are used for registering the reader and writer builders
59-
private static final List<String> CLASSES_TO_REGISTER = ImmutableList.of();
59+
private static final List<String> CLASSES_TO_REGISTER =
60+
ImmutableList.of("org.apache.iceberg.data.GenericFormatModels");
6061

6162
// Format models indexed by file format and object model class
6263
private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.data;
20+
21+
import org.apache.iceberg.Schema;
22+
import org.apache.iceberg.data.orc.GenericOrcReader;
23+
import org.apache.iceberg.data.orc.GenericOrcWriter;
24+
import org.apache.iceberg.formats.FormatModelRegistry;
25+
import org.apache.iceberg.orc.ORCFormatModel;
26+
27+
public class GenericFormatModels {
28+
public static void register() {
29+
FormatModelRegistry.register(
30+
ORCFormatModel.create(
31+
Record.class,
32+
Schema.class,
33+
(icebergSchema, fileSchema, engineSchema) ->
34+
GenericOrcWriter.buildWriter(icebergSchema, fileSchema),
35+
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
36+
GenericOrcReader.buildReader(icebergSchema, fileSchema, idToConstant)));
37+
38+
FormatModelRegistry.register(ORCFormatModel.forDelete());
39+
}
40+
41+
private GenericFormatModels() {}
42+
}
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.data;
20+
21+
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
22+
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
25+
import java.io.IOException;
26+
import java.nio.file.Path;
27+
import java.util.List;
28+
import org.apache.iceberg.DataFile;
29+
import org.apache.iceberg.DeleteFile;
30+
import org.apache.iceberg.FileFormat;
31+
import org.apache.iceberg.Parameter;
32+
import org.apache.iceberg.ParameterizedTestExtension;
33+
import org.apache.iceberg.Parameters;
34+
import org.apache.iceberg.PartitionSpec;
35+
import org.apache.iceberg.Schema;
36+
import org.apache.iceberg.TestBase;
37+
import org.apache.iceberg.deletes.EqualityDeleteWriter;
38+
import org.apache.iceberg.deletes.PositionDelete;
39+
import org.apache.iceberg.deletes.PositionDeleteWriter;
40+
import org.apache.iceberg.encryption.EncryptedFiles;
41+
import org.apache.iceberg.encryption.EncryptedOutputFile;
42+
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
43+
import org.apache.iceberg.formats.FileWriterBuilder;
44+
import org.apache.iceberg.formats.FormatModelRegistry;
45+
import org.apache.iceberg.inmemory.InMemoryFileIO;
46+
import org.apache.iceberg.io.CloseableIterable;
47+
import org.apache.iceberg.io.DataWriter;
48+
import org.apache.iceberg.io.InputFile;
49+
import org.apache.iceberg.io.OutputFile;
50+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
51+
import org.junit.jupiter.api.AfterEach;
52+
import org.junit.jupiter.api.BeforeEach;
53+
import org.junit.jupiter.api.TestTemplate;
54+
import org.junit.jupiter.api.extension.ExtendWith;
55+
import org.junit.jupiter.api.io.TempDir;
56+
57+
@ExtendWith(ParameterizedTestExtension.class)
58+
public class TestGenericFormatModels {
59+
@Parameters(name = "fileFormat = {0}")
60+
protected static List<Object> parameters() {
61+
return List.of(FileFormat.AVRO);
62+
}
63+
64+
private static final List<Record> TEST_RECORDS =
65+
ImmutableList.of(
66+
GenericRecord.create(TestBase.SCHEMA).copy("id", 1, "data", "hello"),
67+
GenericRecord.create(TestBase.SCHEMA).copy("id", 2, "data", "world"));
68+
69+
@Parameter(index = 0)
70+
private FileFormat fileFormat;
71+
72+
@TempDir protected Path temp;
73+
74+
private InMemoryFileIO fileIO;
75+
private EncryptedOutputFile encryptedFile;
76+
77+
@BeforeEach
78+
public void before() {
79+
this.fileIO = new InMemoryFileIO();
80+
OutputFile outputFile = fileIO.newOutputFile("test-file." + fileFormat.name().toLowerCase());
81+
this.encryptedFile = EncryptedFiles.encryptedOutput(outputFile, EncryptionKeyMetadata.EMPTY);
82+
}
83+
84+
@AfterEach
85+
public void after() throws IOException {
86+
fileIO.deleteFile(encryptedFile.encryptingOutputFile());
87+
this.encryptedFile = null;
88+
if (fileIO != null) {
89+
fileIO.close();
90+
}
91+
}
92+
93+
@TestTemplate
94+
public void testDataWriter() throws IOException {
95+
FileWriterBuilder<DataWriter<Record>, Schema> writerBuilder =
96+
FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
97+
98+
DataFile dataFile;
99+
DataWriter<Record> writer =
100+
writerBuilder.schema(TestBase.SCHEMA).spec(PartitionSpec.unpartitioned()).build();
101+
try (writer) {
102+
for (Record record : TEST_RECORDS) {
103+
writer.write(record);
104+
}
105+
}
106+
107+
dataFile = writer.toDataFile();
108+
109+
assertThat(dataFile).isNotNull();
110+
assertThat(dataFile.recordCount()).isEqualTo(2);
111+
assertThat(dataFile.format()).isEqualTo(fileFormat);
112+
113+
// Verify the file content by reading it back
114+
InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location());
115+
List<Record> readRecords;
116+
try (CloseableIterable<Record> reader =
117+
FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
118+
.project(TestBase.SCHEMA)
119+
.build()) {
120+
readRecords = ImmutableList.copyOf(reader);
121+
}
122+
123+
assertThat(readRecords).hasSize(2);
124+
DataTestHelpers.assertEquals(
125+
TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(0), readRecords.get(0));
126+
DataTestHelpers.assertEquals(
127+
TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(1), readRecords.get(1));
128+
}
129+
130+
@TestTemplate
131+
public void testEqualityDeleteWriter() throws IOException {
132+
FileWriterBuilder<EqualityDeleteWriter<Record>, Schema> writerBuilder =
133+
FormatModelRegistry.equalityDeleteWriteBuilder(fileFormat, Record.class, encryptedFile);
134+
135+
DeleteFile deleteFile;
136+
EqualityDeleteWriter<Record> writer =
137+
writerBuilder
138+
.schema(TestBase.SCHEMA)
139+
.spec(PartitionSpec.unpartitioned())
140+
.equalityFieldIds(3)
141+
.build();
142+
try (writer) {
143+
for (Record record : TEST_RECORDS) {
144+
writer.write(record);
145+
}
146+
}
147+
148+
deleteFile = writer.toDeleteFile();
149+
150+
assertThat(deleteFile).isNotNull();
151+
assertThat(deleteFile.recordCount()).isEqualTo(2);
152+
assertThat(deleteFile.format()).isEqualTo(fileFormat);
153+
assertThat(deleteFile.equalityFieldIds()).containsExactly(3);
154+
155+
// Verify the file content by reading it back
156+
InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location());
157+
List<Record> readRecords;
158+
try (CloseableIterable<Record> reader =
159+
FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
160+
.project(TestBase.SCHEMA)
161+
.build()) {
162+
readRecords = ImmutableList.copyOf(reader);
163+
}
164+
165+
assertThat(readRecords).hasSize(2);
166+
DataTestHelpers.assertEquals(
167+
TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(0), readRecords.get(0));
168+
DataTestHelpers.assertEquals(
169+
TestBase.SCHEMA.asStruct(), TEST_RECORDS.get(1), readRecords.get(1));
170+
}
171+
172+
@TestTemplate
173+
public void testPositionDeleteWriter() throws IOException {
174+
Schema positionDeleteSchema = new Schema(DELETE_FILE_PATH, DELETE_FILE_POS);
175+
176+
FileWriterBuilder<PositionDeleteWriter<Record>, ?> writerBuilder =
177+
FormatModelRegistry.positionDeleteWriteBuilder(fileFormat, encryptedFile);
178+
179+
PositionDelete<Record> delete1 = PositionDelete.create();
180+
delete1.set("data-file-1.parquet", 0L, null);
181+
182+
PositionDelete<Record> delete2 = PositionDelete.create();
183+
delete2.set("data-file-1.parquet", 1L, null);
184+
185+
List<PositionDelete<Record>> positionDeletes = ImmutableList.of(delete1, delete2);
186+
187+
DeleteFile deleteFile;
188+
PositionDeleteWriter<Record> writer = writerBuilder.spec(PartitionSpec.unpartitioned()).build();
189+
try (writer) {
190+
for (PositionDelete<Record> delete : positionDeletes) {
191+
writer.write(delete);
192+
}
193+
}
194+
195+
deleteFile = writer.toDeleteFile();
196+
197+
assertThat(deleteFile).isNotNull();
198+
assertThat(deleteFile.recordCount()).isEqualTo(2);
199+
assertThat(deleteFile.format()).isEqualTo(fileFormat);
200+
201+
// Verify the file content by reading it back
202+
InputFile inputFile = fileIO.newInputFile(encryptedFile.encryptingOutputFile().location());
203+
List<Record> readRecords;
204+
try (CloseableIterable<Record> reader =
205+
FormatModelRegistry.readBuilder(fileFormat, Record.class, inputFile)
206+
.project(positionDeleteSchema)
207+
.build()) {
208+
readRecords = ImmutableList.copyOf(reader);
209+
}
210+
211+
assertThat(readRecords).hasSize(2);
212+
assertThat(readRecords.get(0).getField("file_path")).isEqualTo("data-file-1.parquet");
213+
assertThat(readRecords.get(0).getField("pos")).isEqualTo(0L);
214+
assertThat(readRecords.get(1).getField("file_path")).isEqualTo("data-file-1.parquet");
215+
assertThat(readRecords.get(1).getField("pos")).isEqualTo(1L);
216+
}
217+
}

orc/src/main/java/org/apache/iceberg/orc/ORC.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,15 @@
4545
import java.util.Locale;
4646
import java.util.Map;
4747
import java.util.Objects;
48+
import java.util.Set;
4849
import java.util.function.BiFunction;
4950
import java.util.function.Function;
5051
import java.util.stream.Collectors;
5152
import java.util.stream.IntStream;
5253
import org.apache.hadoop.conf.Configuration;
5354
import org.apache.hadoop.fs.Path;
5455
import org.apache.iceberg.FileFormat;
56+
import org.apache.iceberg.MetadataColumns;
5557
import org.apache.iceberg.MetricsConfig;
5658
import org.apache.iceberg.PartitionSpec;
5759
import org.apache.iceberg.Schema;
@@ -79,7 +81,10 @@
7981
import org.apache.iceberg.io.OutputFile;
8082
import org.apache.iceberg.mapping.NameMapping;
8183
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
84+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
8285
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
86+
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
87+
import org.apache.iceberg.types.TypeUtil;
8388
import org.apache.iceberg.util.ArrayUtil;
8489
import org.apache.iceberg.util.PropertyUtil;
8590
import org.apache.orc.CompressionKind;
@@ -179,9 +184,8 @@ public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
179184
return this;
180185
}
181186

182-
// Supposed to always be a private method used strictly by data and delete write builders
183-
private WriteBuilder createContextFunc(
184-
Function<Map<String, String>, Context> newCreateContextFunc) {
187+
// supposed to always be a private method used strictly by data and delete write builders
188+
WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
185189
this.createContextFunc = newCreateContextFunc;
186190
return this;
187191
}
@@ -219,7 +223,7 @@ public <D> FileAppender<D> build() {
219223
metricsConfig);
220224
}
221225

222-
private static class Context {
226+
static class Context {
223227
private final long stripeSize;
224228
private final long blockSize;
225229
private final int vectorizedRowBatchSize;
@@ -699,6 +703,7 @@ public static class ReadBuilder {
699703
private Function<TypeDescription, OrcRowReader<?>> readerFunc;
700704
private Function<TypeDescription, OrcBatchReader<?>> batchedReaderFunc;
701705
private int recordsPerBatch = VectorizedRowBatch.DEFAULT_SIZE;
706+
private Set<Integer> constantFieldIds = ImmutableSet.of();
702707

703708
private ReadBuilder(InputFile file) {
704709
Preconditions.checkNotNull(file, "Input file cannot be null");
@@ -775,12 +780,20 @@ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
775780
return this;
776781
}
777782

783+
ReadBuilder constantFieldIds(Set<Integer> newConstantFieldIds) {
784+
this.constantFieldIds = newConstantFieldIds;
785+
return this;
786+
}
787+
778788
public <D> CloseableIterable<D> build() {
779789
Preconditions.checkNotNull(schema, "Schema is required");
780790
return new OrcIterable<>(
781791
file,
782792
conf,
783-
schema,
793+
// This is a behavioral change. Previously there was an error if metadata columns were
794+
// present in the schema; now they are removed and the correct reader is created
795+
TypeUtil.selectNot(
796+
schema, Sets.union(constantFieldIds, MetadataColumns.metadataFieldIds())),
784797
nameMapping,
785798
start,
786799
length,

0 commit comments

Comments
 (0)