3636import java .util .Objects ;
3737import java .util .UUID ;
3838import java .util .concurrent .TimeUnit ;
39+ import java .util .function .Function ;
3940import java .util .stream .Collectors ;
4041import javax .annotation .Nonnull ;
4142import net .bytebuddy .description .type .TypeDescription .ForLoadedType ;
9798import org .apache .beam .sdk .values .Row ;
9899import org .apache .beam .sdk .values .TypeDescriptor ;
99100import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .base .CaseFormat ;
101+ import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .ImmutableMap ;
100102import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Iterables ;
101103import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Lists ;
102104import org .apache .beam .vendor .guava .v32_1_2_jre .com .google .common .collect .Maps ;
@@ -1214,14 +1216,13 @@ private static org.apache.avro.Schema getFieldSchema(
12141216 return fieldType .getNullable () ? ReflectData .makeNullable (baseType ) : baseType ;
12151217 }
12161218
1217- private static final org .apache .avro .Schema INT_AVRO_TYPE =
1218- org .apache .avro .Schema .create (Type .INT );
1219- private static final org .apache .avro .Schema LONG_AVRO_TYPE =
1220- org .apache .avro .Schema .create (Type .LONG );
1221- private static final org .apache .avro .Schema FLOAT_AVRO_TYPE =
1222- org .apache .avro .Schema .create (Type .FLOAT );
1223- private static final org .apache .avro .Schema DOUBLE_AVRO_TYPE =
1224- org .apache .avro .Schema .create (Type .DOUBLE );
1219+ private static final Map <org .apache .avro .Schema , Function <Number , ? extends Number >>
1220+ NUMERIC_CONVERTERS =
1221+ ImmutableMap .of (
1222+ org .apache .avro .Schema .create (Type .INT ), Number ::intValue ,
1223+ org .apache .avro .Schema .create (Type .LONG ), Number ::longValue ,
1224+ org .apache .avro .Schema .create (Type .FLOAT ), Number ::floatValue ,
1225+ org .apache .avro .Schema .create (Type .DOUBLE ), Number ::doubleValue );
12251226
12261227 /** Convert a value from Beam Row to a vlue used for Avro GenericRecord. */
12271228 private static @ Nullable Object genericFromBeamField (
@@ -1240,15 +1241,8 @@ private static org.apache.avro.Schema getFieldSchema(
12401241 return value ;
12411242 }
12421243
1243- // Handle implicit up-cast: use avroSchema
1244- if (INT_AVRO_TYPE .equals (typeWithNullability .type )) {
1245- return ((Number ) value ).intValue ();
1246- } else if (LONG_AVRO_TYPE .equals (typeWithNullability .type )) {
1247- return ((Number ) value ).longValue ();
1248- } else if (FLOAT_AVRO_TYPE .equals (typeWithNullability .type )) {
1249- return ((Number ) value ).floatValue ();
1250- } else if (DOUBLE_AVRO_TYPE .equals (typeWithNullability .type )) {
1251- return ((Number ) value ).doubleValue ();
1244+ if (NUMERIC_CONVERTERS .containsKey (typeWithNullability .type )) {
1245+ return NUMERIC_CONVERTERS .get (typeWithNullability .type ).apply ((Number ) value );
12521246 }
12531247
12541248 // TODO: should we use Avro Schema as the source-of-truth in general?
0 commit comments