Skip to content

Commit bcdd5ea

Browse files
committed
Add timestamp-nanos avro logical type support in bigquery utils.
1 parent e336419 commit bcdd5ea

File tree

4 files changed

+267
-37
lines changed

4 files changed

+267
-37
lines changed

sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
8282
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
8383
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
84+
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
8485
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
8586
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
8687
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -137,6 +138,7 @@
137138
* LogicalTypes.TimestampMillis <-----> DATETIME
138139
* LogicalTypes.TimestampMicros ------> Long
139140
* LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
141+
* LogicalTypes.TimestampNanos <------> LogicalType(TIMESTAMP(9))
140142
* LogicalTypes.Decimal <-----> DECIMAL
141143
* </pre>
142144
*
@@ -1027,6 +1029,10 @@ private static FieldType toFieldType(TypeWithNullability type) {
10271029
fieldType = FieldType.DATETIME;
10281030
}
10291031
}
1032+
// TODO: Remove once Avro 1.12+ has timestamp-nanos
1033+
if (fieldType == null && "timestamp-nanos".equals(avroSchema.getProp("logicalType"))) {
1034+
fieldType = FieldType.logicalType(Timestamp.NANOS);
1035+
}
10301036

10311037
if (fieldType == null) {
10321038
switch (type.type.getType()) {
@@ -1186,6 +1192,9 @@ private static org.apache.avro.Schema getFieldSchema(
11861192
} else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
11871193
baseType =
11881194
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
1195+
} else if (Timestamp.IDENTIFIER.equals(identifier)) {
1196+
String schemaJson = "{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}";
1197+
baseType = new org.apache.avro.Schema.Parser().parse(schemaJson);
11891198
} else {
11901199
throw new RuntimeException(
11911200
"Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1340,6 +1349,11 @@ private static org.apache.avro.Schema getFieldSchema(
13401349
java.time.Instant instant = (java.time.Instant) value;
13411350
return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
13421351
+ TimeUnit.NANOSECONDS.toMicros(instant.getNano());
1352+
} else if (Timestamp.IDENTIFIER.equals(identifier)) {
1353+
java.time.Instant instant = (java.time.Instant) value;
1354+
long epochSeconds = instant.getEpochSecond();
1355+
int nanoOfSecond = instant.getNano();
1356+
return (epochSeconds * 1_000_000_000L) + nanoOfSecond;
13431357
} else {
13441358
throw new RuntimeException("Unhandled logical type " + identifier);
13451359
}
@@ -1387,6 +1401,22 @@ private static Object convertLogicalType(
13871401
@Nonnull FieldType fieldType,
13881402
@Nonnull GenericData genericData) {
13891403
TypeWithNullability type = new TypeWithNullability(avroSchema);
1404+
1405+
// TODO: Remove this workaround once Avro is upgraded to 1.12+ where timestamp-nanos
1406+
if ("timestamp-nanos".equals(type.type.getProp("logicalType"))) {
1407+
if (type.type.getType() == Type.LONG) {
1408+
Long nanos = (Long) value;
1409+
// Check if Beam expects Timestamp logical type
1410+
if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE
1411+
&& org.apache.beam.sdk.schemas.logicaltypes.Timestamp.IDENTIFIER.equals(
1412+
fieldType.getLogicalType().getIdentifier())) {
1413+
return java.time.Instant.ofEpochSecond(0L, nanos);
1414+
} else {
1415+
return nanos;
1416+
}
1417+
}
1418+
}
1419+
13901420
LogicalType logicalType = LogicalTypes.fromSchema(type.type);
13911421
if (logicalType == null) {
13921422
return null;

sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
5555
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
5656
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
57+
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
5758
import org.apache.beam.sdk.testing.CoderProperties;
5859
import org.apache.beam.sdk.transforms.Create;
5960
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -549,6 +550,87 @@ public void testFromBeamSchema() {
549550
assertEquals(getAvroSchema(), avroSchema);
550551
}
551552

553+
@Test
554+
public void testBeamTimestampNanosLogicalTypeToAvroSchema() {
555+
Schema beamSchema =
556+
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
557+
558+
// Expected Avro schema with timestamp-nanos
559+
String expectedJson =
560+
"{\"type\": \"record\", \"name\": \"topLevelRecord\", "
561+
+ "\"fields\": [{\"name\": \"timestampNanos\", "
562+
+ "\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}]}";
563+
564+
org.apache.avro.Schema expectedAvroSchema =
565+
new org.apache.avro.Schema.Parser().parse(expectedJson);
566+
567+
assertEquals(expectedAvroSchema, AvroUtils.toAvroSchema(beamSchema));
568+
}
569+
570+
@Test
571+
public void testBeamTimestampNanosToGenericRecord() {
572+
Schema beamSchema =
573+
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
574+
575+
java.time.Instant instant = java.time.Instant.parse("2000-01-01T01:02:03.123456789Z");
576+
Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();
577+
578+
// Expected nanos since epoch
579+
long expectedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
580+
System.out.println(expectedNanos);
581+
582+
org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
583+
GenericRecord avroRecord = AvroUtils.toGenericRecord(beamRow, avroSchema);
584+
585+
assertEquals(expectedNanos, avroRecord.get("timestampNanos"));
586+
}
587+
588+
@Test
589+
public void testTimestampNanosRoundTrip() {
590+
Schema beamSchema =
591+
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
592+
593+
// Test various nanosecond precisions
594+
java.time.Instant[] testInstants = {
595+
java.time.Instant.parse("2000-01-01T00:00:00.000000001Z"), // 1 nano
596+
java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos
597+
java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos
598+
};
599+
600+
org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
601+
602+
for (java.time.Instant instant : testInstants) {
603+
Row originalRow = Row.withSchema(beamSchema).addValue(instant).build();
604+
GenericRecord avroRecord = AvroUtils.toGenericRecord(originalRow, avroSchema);
605+
Row roundTripRow = AvroUtils.toBeamRowStrict(avroRecord, beamSchema);
606+
607+
assertEquals(originalRow, roundTripRow);
608+
java.time.Instant roundTripInstant =
609+
(java.time.Instant) roundTripRow.getValue("timestampNanos");
610+
assertEquals(instant, roundTripInstant);
611+
}
612+
}
613+
614+
@Test
615+
public void testTimestampNanosAvroSchemaToBeamSchema() {
616+
List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
617+
fields.add(
618+
new org.apache.avro.Schema.Field(
619+
"timestampNanos",
620+
new org.apache.avro.Schema.Parser()
621+
.parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"),
622+
"",
623+
(Object) null));
624+
org.apache.avro.Schema avroSchema =
625+
org.apache.avro.Schema.createRecord("test", null, null, false, fields);
626+
627+
Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
628+
629+
Schema expected =
630+
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
631+
assertEquals(expected, beamSchema);
632+
}
633+
552634
@Test
553635
public void testAvroSchemaFromBeamSchemaCanBeParsed() {
554636
org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(getBeamSchema());

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -164,33 +164,76 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
164164
private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
165165
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
166166

167-
@VisibleForTesting
168-
static String formatTimestamp(Long timestampMicro) {
169-
String dateTime = formatDatetime(timestampMicro);
170-
return dateTime + " UTC";
167+
/**
168+
* Enum to define the precision of a timestamp since the epoch. It provides methods to normalize
169+
* any precision to seconds and nanoseconds.
170+
*/
171+
enum TimestampPrecision {
172+
MILLISECONDS(1_000L, 1_000_000L),
173+
MICROSECONDS(1_000_000L, 1_000L),
174+
NANOSECONDS(1_000_000_000L, 1L);
175+
176+
private final long divisorForSeconds;
177+
private final long nanoMultiplier;
178+
179+
TimestampPrecision(long divisorForSeconds, long nanoMultiplier) {
180+
this.divisorForSeconds = divisorForSeconds;
181+
this.nanoMultiplier = nanoMultiplier;
182+
}
183+
184+
public long getDivisorForSeconds() {
185+
return divisorForSeconds;
186+
}
187+
188+
public long toNanos(long fractionalPart) {
189+
return fractionalPart * this.nanoMultiplier;
190+
}
191+
192+
public String formatFractional(long nanoOfSecond) {
193+
if (nanoOfSecond % 1_000_000 == 0) {
194+
return String.format(".%03d", nanoOfSecond / 1_000_000);
195+
} else if (nanoOfSecond % 1000 == 0) {
196+
return String.format(".%06d", nanoOfSecond / 1000);
197+
} else {
198+
return String.format(".%09d", nanoOfSecond);
199+
}
200+
}
171201
}
172202

203+
/**
204+
* Formats a timestamp value with specified precision.
205+
*
206+
* @param timestamp The timestamp value in units specified by precision (milliseconds,
207+
* microseconds, or nanoseconds since epoch)
208+
* @param precision The precision of the input timestamp
209+
* @return Formatted string in "yyyy-MM-dd HH:mm:ss[.fraction]" format
210+
*/
173211
@VisibleForTesting
174-
static String formatDatetime(Long timestampMicro) {
175-
// timestampMicro is in "microseconds since epoch" format,
176-
// e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
177-
// Separate into seconds and microseconds.
178-
long timestampSec = timestampMicro / 1_000_000;
179-
long micros = timestampMicro % 1_000_000;
180-
if (micros < 0) {
181-
micros += 1_000_000;
212+
static String formatDatetime(long timestamp, TimestampPrecision precision) {
213+
long divisor = precision.getDivisorForSeconds();
214+
long timestampSec = timestamp / divisor;
215+
long fractionalPart = timestamp % divisor;
216+
217+
if (fractionalPart < 0) {
218+
fractionalPart += divisor;
182219
timestampSec -= 1;
183220
}
221+
184222
String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
185-
if (micros == 0) {
223+
224+
long nanoOfSecond = precision.toNanos(fractionalPart);
225+
226+
if (nanoOfSecond == 0) {
186227
return dayAndTime;
187-
} else if (micros % 1000 == 0) {
188-
return String.format("%s.%03d", dayAndTime, micros / 1000);
189228
} else {
190-
return String.format("%s.%06d", dayAndTime, micros);
229+
return dayAndTime + precision.formatFractional(nanoOfSecond);
191230
}
192231
}
193232

233+
static String formatTimestamp(long timestamp, TimestampPrecision precision) {
234+
return formatDatetime(timestamp, precision) + " UTC";
235+
}
236+
194237
/**
195238
* This method formats a BigQuery DATE value into a String matching the format used by JSON
196239
* export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic
@@ -335,7 +378,7 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
335378
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
336379
// INTEGER type maps to an Avro LONG type.
337380
checkNotNull(v, "REQUIRED field %s should not be null", name);
338-
381+
System.out.println(schema);
339382
Type type = schema.getType();
340383
LogicalType logicalType = schema.getLogicalType();
341384
switch (type) {
@@ -357,28 +400,34 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
357400
return ((Integer) v).toString();
358401
}
359402
case LONG:
403+
System.out.println(logicalType);
360404
if (logicalType instanceof LogicalTypes.TimeMicros) {
361405
// SQL type TIME
362406
// ideally LocalTime but TableRowJsonCoder encodes as String
363407
return formatTime((Long) v);
364408
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
365409
// Write only: SQL type TIMESTAMP
366410
// ideally Instant but TableRowJsonCoder encodes as String
367-
return formatTimestamp((Long) v * 1000L);
411+
return formatTimestamp((Long) v, TimestampPrecision.MILLISECONDS);
368412
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
369413
// SQL type TIMESTAMP
370414
// ideally Instant but TableRowJsonCoder encodes as String
371-
return formatTimestamp((Long) v);
415+
return formatTimestamp((Long) v, TimestampPrecision.MICROSECONDS);
416+
// TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
417+
} else if ("timestamp-nanos".equals(schema.getProp("logicalType"))) {
418+
// SQL type TIMESTAMP
419+
// ideally Instant but TableRowJsonCoder encodes as String
420+
return formatTimestamp((Long) v, TimestampPrecision.NANOSECONDS);
372421
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
373422
&& logicalType instanceof LogicalTypes.LocalTimestampMillis) {
374423
// Write only: SQL type DATETIME
375424
// ideally LocalDateTime but TableRowJsonCoder encodes as String
376-
return formatDatetime(((Long) v) * 1000);
425+
return formatDatetime(((Long) v), TimestampPrecision.MILLISECONDS);
377426
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
378427
&& logicalType instanceof LogicalTypes.LocalTimestampMicros) {
379428
// Write only: SQL type DATETIME
380429
// ideally LocalDateTime but TableRowJsonCoder encodes as String
381-
return formatDatetime((Long) v);
430+
return formatDatetime((Long) v, TimestampPrecision.MICROSECONDS);
382431
} else {
383432
// SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
384433
// ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
@@ -602,6 +651,10 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv
602651
return fieldSchema.setType("INTEGER");
603652
}
604653
case LONG:
654+
// TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
655+
if (useAvroLogicalTypes && ("timestamp-nanos".equals(type.getProp("logicalType")))) {
656+
return fieldSchema.setType("TIMESTAMP");
657+
}
605658
if (logicalType instanceof LogicalTypes.TimeMicros) {
606659
return fieldSchema.setType("TIME");
607660
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))

0 commit comments

Comments
 (0)