@@ -164,33 +164,76 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
164164 private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
165165 DateTimeFormat .forPattern ("yyyy-MM-dd HH:mm:ss" ).withZoneUTC ();
166166
167- @ VisibleForTesting
168- static String formatTimestamp (Long timestampMicro ) {
169- String dateTime = formatDatetime (timestampMicro );
170- return dateTime + " UTC" ;
167+ /**
168+ * Enum to define the precision of a timestamp since the epoch. It provides methods to normalize
169+ * any precision to seconds and nanoseconds.
170+ */
171+ enum TimestampPrecision {
172+ MILLISECONDS (1_000L , 1_000_000L ),
173+ MICROSECONDS (1_000_000L , 1_000L ),
174+ NANOSECONDS (1_000_000_000L , 1L );
175+
176+ private final long divisorForSeconds ;
177+ private final long nanoMultiplier ;
178+
179+ TimestampPrecision (long divisorForSeconds , long nanoMultiplier ) {
180+ this .divisorForSeconds = divisorForSeconds ;
181+ this .nanoMultiplier = nanoMultiplier ;
182+ }
183+
184+ public long getDivisorForSeconds () {
185+ return divisorForSeconds ;
186+ }
187+
188+ public long toNanos (long fractionalPart ) {
189+ return fractionalPart * this .nanoMultiplier ;
190+ }
191+
192+ public String formatFractional (long nanoOfSecond ) {
193+ if (nanoOfSecond % 1_000_000 == 0 ) {
194+ return String .format (".%03d" , nanoOfSecond / 1_000_000 );
195+ } else if (nanoOfSecond % 1000 == 0 ) {
196+ return String .format (".%06d" , nanoOfSecond / 1000 );
197+ } else {
198+ return String .format (".%09d" , nanoOfSecond );
199+ }
200+ }
171201 }
172202
203+ /**
204+ * Formats a timestamp value with specified precision.
205+ *
206+ * @param timestamp The timestamp value in units specified by precision (milliseconds,
207+ * microseconds, or nanoseconds since epoch)
208+ * @param precision The precision of the input timestamp
209+ * @return Formatted string in "yyyy-MM-dd HH:mm:ss[.fraction]" format
210+ */
173211 @ VisibleForTesting
174- static String formatDatetime (Long timestampMicro ) {
175- // timestampMicro is in "microseconds since epoch" format,
176- // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
177- // Separate into seconds and microseconds.
178- long timestampSec = timestampMicro / 1_000_000 ;
179- long micros = timestampMicro % 1_000_000 ;
180- if (micros < 0 ) {
181- micros += 1_000_000 ;
212+ static String formatDatetime (long timestamp , TimestampPrecision precision ) {
213+ long divisor = precision .getDivisorForSeconds ();
214+ long timestampSec = timestamp / divisor ;
215+ long fractionalPart = timestamp % divisor ;
216+
217+ if (fractionalPart < 0 ) {
218+ fractionalPart += divisor ;
182219 timestampSec -= 1 ;
183220 }
221+
184222 String dayAndTime = DATE_AND_SECONDS_FORMATTER .print (timestampSec * 1000 );
185- if (micros == 0 ) {
223+
224+ long nanoOfSecond = precision .toNanos (fractionalPart );
225+
226+ if (nanoOfSecond == 0 ) {
186227 return dayAndTime ;
187- } else if (micros % 1000 == 0 ) {
188- return String .format ("%s.%03d" , dayAndTime , micros / 1000 );
189228 } else {
190- return String . format ( "%s.%06d" , dayAndTime , micros );
229+ return dayAndTime + precision . formatFractional ( nanoOfSecond );
191230 }
192231 }
193232
233+ static String formatTimestamp (long timestamp , TimestampPrecision precision ) {
234+ return formatDatetime (timestamp , precision ) + " UTC" ;
235+ }
236+
194237 /**
195238 * This method formats a BigQuery DATE value into a String matching the format used by JSON
196239 * export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic
@@ -335,7 +378,6 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
335378 // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
336379 // INTEGER type maps to an Avro LONG type.
337380 checkNotNull (v , "REQUIRED field %s should not be null" , name );
338-
339381 Type type = schema .getType ();
340382 LogicalType logicalType = schema .getLogicalType ();
341383 switch (type ) {
@@ -364,21 +406,26 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
364406 } else if (logicalType instanceof LogicalTypes .TimestampMillis ) {
365407 // Write only: SQL type TIMESTAMP
366408 // ideally Instant but TableRowJsonCoder encodes as String
367- return formatTimestamp ((Long ) v * 1000L );
409+ return formatTimestamp ((Long ) v , TimestampPrecision . MILLISECONDS );
368410 } else if (logicalType instanceof LogicalTypes .TimestampMicros ) {
369411 // SQL type TIMESTAMP
370412 // ideally Instant but TableRowJsonCoder encodes as String
371- return formatTimestamp ((Long ) v );
413+ return formatTimestamp ((Long ) v , TimestampPrecision .MICROSECONDS );
414+ // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
415+ } else if ("timestamp-nanos" .equals (schema .getProp ("logicalType" ))) {
416+ // SQL type TIMESTAMP
417+ // ideally Instant but TableRowJsonCoder encodes as String
418+ return formatTimestamp ((Long ) v , TimestampPrecision .NANOSECONDS );
372419 } else if (!(VERSION_AVRO .startsWith ("1.8" ) || VERSION_AVRO .startsWith ("1.9" ))
373420 && logicalType instanceof LogicalTypes .LocalTimestampMillis ) {
374421 // Write only: SQL type DATETIME
375422 // ideally LocalDateTime but TableRowJsonCoder encodes as String
376- return formatDatetime (((Long ) v ) * 1000 );
423+ return formatDatetime (((Long ) v ), TimestampPrecision . MILLISECONDS );
377424 } else if (!(VERSION_AVRO .startsWith ("1.8" ) || VERSION_AVRO .startsWith ("1.9" ))
378425 && logicalType instanceof LogicalTypes .LocalTimestampMicros ) {
379426 // Write only: SQL type DATETIME
380427 // ideally LocalDateTime but TableRowJsonCoder encodes as String
381- return formatDatetime ((Long ) v );
428+ return formatDatetime ((Long ) v , TimestampPrecision . MICROSECONDS );
382429 } else {
383430 // SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
384431 // ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
@@ -602,6 +649,10 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv
602649 return fieldSchema .setType ("INTEGER" );
603650 }
604651 case LONG :
652+ // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
653+ if (useAvroLogicalTypes && ("timestamp-nanos" .equals (type .getProp ("logicalType" )))) {
654+ return fieldSchema .setType ("TIMESTAMP" );
655+ }
605656 if (logicalType instanceof LogicalTypes .TimeMicros ) {
606657 return fieldSchema .setType ("TIME" );
607658 } else if (!(VERSION_AVRO .startsWith ("1.8" ) || VERSION_AVRO .startsWith ("1.9" ))
0 commit comments