Skip to content

Commit 87d204f

Browse files
authored
Fix inconsistent data type in GenericRecord and AvroSchema for AvroWriter (#36839)
* Fix inconsistent data type in GenericRecord and AvroSchema for AvroWriter * clean code
1 parent b369dfa commit 87d204f

File tree

3 files changed

+36
-5
lines changed

3 files changed

+36
-5
lines changed

sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Objects;
3737
import java.util.UUID;
3838
import java.util.concurrent.TimeUnit;
39+
import java.util.function.Function;
3940
import java.util.stream.Collectors;
4041
import javax.annotation.Nonnull;
4142
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
@@ -97,6 +98,7 @@
9798
import org.apache.beam.sdk.values.Row;
9899
import org.apache.beam.sdk.values.TypeDescriptor;
99100
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat;
101+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
100102
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
101103
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
102104
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
@@ -1214,6 +1216,15 @@ private static org.apache.avro.Schema getFieldSchema(
12141216
return fieldType.getNullable() ? ReflectData.makeNullable(baseType) : baseType;
12151217
}
12161218

1219+
private static final Map<org.apache.avro.Schema, Function<Number, ? extends Number>>
1220+
NUMERIC_CONVERTERS =
1221+
ImmutableMap.of(
1222+
org.apache.avro.Schema.create(Type.INT), Number::intValue,
1223+
org.apache.avro.Schema.create(Type.LONG), Number::longValue,
1224+
org.apache.avro.Schema.create(Type.FLOAT), Number::floatValue,
1225+
org.apache.avro.Schema.create(Type.DOUBLE), Number::doubleValue);
1226+
1227+
/** Convert a value from Beam Row to a value used for Avro GenericRecord. */
12171228
private static @Nullable Object genericFromBeamField(
12181229
FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable Object value) {
12191230
TypeWithNullability typeWithNullability = new TypeWithNullability(avroSchema);
@@ -1230,6 +1241,11 @@ private static org.apache.avro.Schema getFieldSchema(
12301241
return value;
12311242
}
12321243

1244+
if (NUMERIC_CONVERTERS.containsKey(typeWithNullability.type)) {
1245+
return NUMERIC_CONVERTERS.get(typeWithNullability.type).apply((Number) value);
1246+
}
1247+
1248+
// TODO: should we use Avro Schema as the source-of-truth in general?
12331249
switch (fieldType.getTypeName()) {
12341250
case BYTE:
12351251
case INT16:

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public SerializableFunction<AvroWriteRequest<Row>, GenericRecord> getAvroFilterF
122122
row = checkStateNotNull(row.getRow(RECORD));
123123
}
124124
Row filtered = rowFilter.filter(row);
125-
return AvroUtils.toGenericRecord(filtered);
125+
return AvroUtils.toGenericRecord(filtered, request.getSchema());
126126
};
127127
}
128128
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,28 @@ public class BigQueryFileLoadsSchemaTransformProviderTest {
7070
new TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
7171

7272
private static final Schema SCHEMA =
73-
Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64));
73+
Schema.of(
74+
Field.of("name", FieldType.STRING),
75+
Field.of("number", FieldType.INT64),
76+
Field.of("age", FieldType.INT32).withNullable(true));
7477

7578
private static final List<Row> ROWS =
7679
Arrays.asList(
77-
Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(),
78-
Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 2L).build(),
79-
Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 3L).build());
80+
Row.withSchema(SCHEMA)
81+
.withFieldValue("name", "a")
82+
.withFieldValue("number", 1L)
83+
.withFieldValue("age", 10)
84+
.build(),
85+
Row.withSchema(SCHEMA)
86+
.withFieldValue("name", "b")
87+
.withFieldValue("number", 2L)
88+
.withFieldValue("age", 20)
89+
.build(),
90+
Row.withSchema(SCHEMA)
91+
.withFieldValue("name", "c")
92+
.withFieldValue("number", 3L)
93+
.withFieldValue("age", null)
94+
.build());
8095

8196
private static final BigQueryOptions OPTIONS =
8297
TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);

0 commit comments

Comments
 (0)