Skip to content

Commit 296421d

Browse files
authored
Merge pull request #36780: Adds SchemaFieldNumber annotations to Iceberg classes that use the SchemaCoder
2 parents 82fe92a + 2cd7a75 commit 296421d

File tree

4 files changed: +78 -0 lines changed

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTask.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
import org.apache.beam.sdk.schemas.SchemaCoder;
2626
import org.apache.beam.sdk.schemas.SchemaRegistry;
2727
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
28+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
2829
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
2930
import org.apache.iceberg.CombinedScanTask;
3031
import org.apache.iceberg.FileScanTask;
@@ -53,6 +54,7 @@ static Builder builder() {
5354
return new AutoValue_ReadTask.Builder();
5455
}
5556

57+
@SchemaFieldNumber("0")
5658
abstract List<String> getFileScanTaskJsons();
5759

5860
@SchemaIgnore

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadTaskDescriptor.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.sdk.schemas.SchemaCoder;
2424
import org.apache.beam.sdk.schemas.SchemaRegistry;
2525
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
26+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
2627
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
2728

2829
/** Describes the table a {@link ReadTask} belongs to. */
@@ -46,6 +47,7 @@ static Builder builder() {
4647
return new AutoValue_ReadTaskDescriptor.Builder();
4748
}
4849

50+
@SchemaFieldNumber("0")
4951
abstract String getTableIdentifierString();
5052

5153
@AutoValue.Builder

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SnapshotInfo.java

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
import org.apache.beam.sdk.schemas.SchemaCoder;
2929
import org.apache.beam.sdk.schemas.SchemaRegistry;
3030
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
31+
import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber;
3132
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
3233
import org.apache.beam.sdk.values.Row;
3334
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -98,22 +99,31 @@ public TableIdentifier getTableIdentifier() {
9899
return cachedTableIdentifier;
99100
}
100101

102+
@SchemaFieldNumber("0")
101103
public abstract long getSequenceNumber();
102104

105+
@SchemaFieldNumber("1")
103106
public abstract long getSnapshotId();
104107

108+
@SchemaFieldNumber("2")
105109
public abstract @Nullable Long getParentId();
106110

111+
@SchemaFieldNumber("3")
107112
public abstract long getTimestampMillis();
108113

114+
@SchemaFieldNumber("4")
109115
public abstract @Nullable String getOperation();
110116

117+
@SchemaFieldNumber("5")
111118
public abstract @Nullable Map<String, String> getSummary();
112119

120+
@SchemaFieldNumber("6")
113121
public abstract @Nullable String getManifestListLocation();
114122

123+
@SchemaFieldNumber("7")
115124
public abstract @Nullable Integer getSchemaId();
116125

126+
@SchemaFieldNumber("8")
117127
public abstract @Nullable String getTableIdentifierString();
118128

119129
@AutoValue.Builder

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java

Lines changed: 64 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,14 +21,17 @@
2121
import static org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.OUTPUT_TAG;
2222
import static org.hamcrest.MatcherAssert.assertThat;
2323
import static org.hamcrest.Matchers.containsInAnyOrder;
24+
import static org.junit.Assert.assertEquals;
2425

2526
import java.util.HashMap;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.UUID;
2930
import java.util.stream.Collectors;
3031
import org.apache.beam.sdk.managed.Managed;
32+
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
3133
import org.apache.beam.sdk.schemas.Schema;
34+
import org.apache.beam.sdk.schemas.SchemaRegistry;
3235
import org.apache.beam.sdk.testing.PAssert;
3336
import org.apache.beam.sdk.testing.TestPipeline;
3437
import org.apache.beam.sdk.values.PCollection;
@@ -150,4 +153,65 @@ public void testReadUsingManagedTransform() throws Exception {
150153

151154
testPipeline.run();
152155
}
156+
157+
/**
 * Pins the layout of the schema that the default {@link SchemaRegistry} produces for {@link
 * SnapshotInfo}.
 *
 * <p>Asserts that the schema has exactly nine fields and that the field at each index 0 through 8
 * has the expected name, type and nullability, in this order: sequenceNumber, snapshotId,
 * parentId, timestampMillis, operation, summary, manifestListLocation, schemaId,
 * tableIdentifierString. The indices checked here mirror the values passed to the
 * {@code @SchemaFieldNumber} annotations on the corresponding {@code SnapshotInfo} getters.
 *
 * <p>Each expected field takes its description from the actual field at the same index;
 * presumably this keeps the description from influencing the equality check (NOTE(review):
 * confirm against {@code Schema.Field#equals}).
 *
 * @throws NoSuchSchemaException declared because {@code SchemaRegistry#getSchema} is invoked
 *     directly in the test body
 */
@Test
158+
public void testSnapshotInfoSchemaFieldNumbers() throws NoSuchSchemaException {
159+
Schema schema = SchemaRegistry.createDefault().getSchema(SnapshotInfo.class);
160+
assertEquals(9, schema.getFieldCount());
161+
162+
assertEquals(
163+
Schema.Field.of("sequenceNumber", Schema.FieldType.INT64)
164+
.withDescription(schema.getField(0).getDescription())
165+
.withNullable(false),
166+
schema.getField(0));
167+
168+
assertEquals(
169+
Schema.Field.of("snapshotId", Schema.FieldType.INT64)
170+
.withDescription(schema.getField(1).getDescription())
171+
.withNullable(false),
172+
schema.getField(1));
173+
174+
assertEquals(
175+
Schema.Field.of("parentId", Schema.FieldType.INT64)
176+
.withDescription(schema.getField(2).getDescription())
177+
.withNullable(true),
178+
schema.getField(2));
179+
180+
assertEquals(
181+
Schema.Field.of("timestampMillis", Schema.FieldType.INT64)
182+
.withDescription(schema.getField(3).getDescription())
183+
.withNullable(false),
184+
schema.getField(3));
185+
186+
assertEquals(
187+
Schema.Field.of("operation", Schema.FieldType.STRING)
188+
.withDescription(schema.getField(4).getDescription())
189+
.withNullable(true),
190+
schema.getField(4));
191+
192+
assertEquals(
193+
Schema.Field.of(
194+
"summary", Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.STRING))
195+
.withDescription(schema.getField(5).getDescription())
196+
.withNullable(true),
197+
schema.getField(5));
198+
199+
assertEquals(
200+
Schema.Field.of("manifestListLocation", Schema.FieldType.STRING)
201+
.withDescription(schema.getField(6).getDescription())
202+
.withNullable(true),
203+
schema.getField(6));
204+
205+
assertEquals(
206+
Schema.Field.of("schemaId", Schema.FieldType.INT32)
207+
.withDescription(schema.getField(7).getDescription())
208+
.withNullable(true),
209+
schema.getField(7));
210+
211+
assertEquals(
212+
Schema.Field.of("tableIdentifierString", Schema.FieldType.STRING)
213+
.withDescription(schema.getField(8).getDescription())
214+
.withNullable(true),
215+
schema.getField(8));
216+
}
153217
}

0 commit comments

Comments
 (0)