Skip to content

Commit 1803689

Browse files
committed
Support micros instant in spannerio.
1 parent 9516397 commit 1803689

File tree

5 files changed

+257
-37
lines changed

5 files changed

+257
-37
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@
2828
import com.google.cloud.spanner.Mutation;
2929
import com.google.cloud.spanner.Value;
3030
import java.math.BigDecimal;
31+
import java.time.Instant;
3132
import java.util.HashMap;
3233
import java.util.List;
3334
import java.util.Map;
3435
import java.util.stream.StreamSupport;
3536
import org.apache.beam.sdk.schemas.Schema;
37+
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
3638
import org.apache.beam.sdk.transforms.SerializableFunction;
3739
import org.apache.beam.sdk.values.Row;
3840
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -147,6 +149,22 @@ private static void setBeamValueToKey(
147149
keyBuilder.append(row.getDecimal(columnName));
148150
break;
149151
// TODO: Implement logical date and datetime
152+
case LOGICAL_TYPE:
153+
Schema.LogicalType<?, ?> logicalType = checkNotNull(field.getLogicalType());
154+
String identifier = logicalType.getIdentifier();
155+
if (identifier.equals(MicrosInstant.IDENTIFIER)) {
156+
Instant instant = row.getValue(columnName);
157+
if (instant == null) {
158+
keyBuilder.append((Timestamp) null);
159+
} else {
160+
long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
161+
keyBuilder.append(Timestamp.ofTimeMicroseconds(micros));
162+
}
163+
} else {
164+
throw new IllegalArgumentException(
165+
String.format("Unsupported logical type in key: %s", identifier));
166+
}
167+
break;
150168
case DATETIME:
151169
@Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
152170
if (dateTime == null) {
@@ -224,7 +242,22 @@ private static void setBeamValueToMutation(
224242
mutationBuilder.set(columnName).to(decimal);
225243
}
226244
break;
227-
// TODO: Implement logical date and datetime
245+
case LOGICAL_TYPE:
246+
Schema.LogicalType<?, ?> logicalType = checkNotNull(fieldType.getLogicalType());
247+
String identifier = logicalType.getIdentifier();
248+
if (identifier.equals(MicrosInstant.IDENTIFIER)) {
249+
@Nullable Instant instant = row.getValue(columnName);
250+
if (instant == null) {
251+
mutationBuilder.set(columnName).to((Timestamp) null);
252+
} else {
253+
long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
254+
mutationBuilder.set(columnName).to(Timestamp.ofTimeMicroseconds(micros));
255+
}
256+
} else {
257+
throw new IllegalArgumentException(
258+
String.format("Unsupported logical type: %s", identifier));
259+
}
260+
break;
228261
case DATETIME:
229262
@Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
230263
if (dateTime == null) {
@@ -335,6 +368,31 @@ private static void addIterableToMutationBuilder(
335368
case STRING:
336369
mutationBuilder.set(column).toStringArray((Iterable<String>) ((Object) iterable));
337370
break;
371+
case LOGICAL_TYPE:
372+
String identifier = checkNotNull(beamIterableType.getLogicalType()).getIdentifier();
373+
if (identifier.equals(MicrosInstant.IDENTIFIER)) {
374+
if (iterable == null) {
375+
mutationBuilder.set(column).toTimestampArray(null);
376+
} else {
377+
mutationBuilder
378+
.set(column)
379+
.toTimestampArray(
380+
StreamSupport.stream(iterable.spliterator(), false)
381+
.map(
382+
instant -> {
383+
Instant javaInstant = (java.time.Instant) instant;
384+
long micros =
385+
javaInstant.getEpochSecond() * 1_000_000L
386+
+ javaInstant.getNano() / 1_000L;
387+
return Timestamp.ofTimeMicroseconds(micros);
388+
})
389+
.collect(toList()));
390+
}
391+
} else {
392+
throw new IllegalArgumentException(
393+
String.format("Unsupported logical type in iterable: %s", identifier));
394+
}
395+
break;
338396
case DATETIME:
339397
if (iterable == null) {
340398
mutationBuilder.set(column).toDateArray(null);

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Map;
3232
import java.util.stream.StreamSupport;
3333
import org.apache.beam.sdk.schemas.Schema;
34+
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
3435
import org.apache.beam.sdk.values.Row;
3536
import org.checkerframework.checker.nullness.qual.Nullable;
3637
import org.joda.time.DateTime;
@@ -365,7 +366,23 @@ private static void addIterableToStructBuilder(
365366
return struct.getBytes(column).toByteArray();
366367
// TODO: implement logical datetime
367368
case TIMESTAMP:
368-
return Instant.ofEpochSecond(struct.getTimestamp(column).getSeconds()).toDateTime();
369+
Timestamp spannerTimestamp = struct.getTimestamp(column);
370+
371+
// Check if the Beam schema expects MicrosInstant logical type
372+
Schema.FieldType fieldType = field.getType();
373+
if (fieldType.getTypeName().isLogicalType()) {
374+
Schema.@Nullable LogicalType<?, ?> logicalType = fieldType.getLogicalType();
375+
if (logicalType != null && logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
376+
// Convert to java.time.Instant with microsecond precision
377+
long micros =
378+
spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L;
379+
return java.time.Instant.ofEpochSecond(
380+
micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
381+
}
382+
}
383+
// Default DATETIME behavior: convert to Joda DateTime
384+
return Instant.ofEpochSecond(spannerTimestamp.getSeconds()).toDateTime();
385+
369386
// TODO: implement logical date
370387
case DATE:
371388
return DateTime.parse(struct.getDate(column).toString());
@@ -407,11 +424,29 @@ private static void addIterableToStructBuilder(
407424
return struct.getBooleanList(column);
408425
case BYTES:
409426
return struct.getBytesList(column);
410-
// TODO: implement logical datetime
411427
case TIMESTAMP:
428+
// Check if the Beam schema expects MicrosInstant elements in the array
429+
Schema.@Nullable FieldType elementType = field.getType().getCollectionElementType();
430+
if (elementType != null && elementType.getTypeName().isLogicalType()) {
431+
Schema.@Nullable LogicalType<?, ?> logicalType = elementType.getLogicalType();
432+
if (logicalType != null && logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
433+
// Return List<java.time.Instant> for MicrosInstant arrays
434+
return struct.getTimestampList(column).stream()
435+
.map(
436+
timestamp -> {
437+
long micros =
438+
timestamp.getSeconds() * 1_000_000L + timestamp.getNanos() / 1_000L;
439+
return java.time.Instant.ofEpochSecond(
440+
micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
441+
})
442+
.collect(toList());
443+
}
444+
}
445+
// Default: return List<DateTime> for DATETIME type
412446
return struct.getTimestampList(column).stream()
413447
.map(timestamp -> Instant.ofEpochSecond(timestamp.getSeconds()).toDateTime())
414448
.collect(toList());
449+
415450
// TODO: implement logical date
416451
case DATE:
417452
return struct.getDateList(column).stream()

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtilsTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import com.google.cloud.spanner.Struct;
2929
import com.google.cloud.spanner.Type;
3030
import java.math.BigDecimal;
31+
import java.time.Instant;
3132
import java.util.List;
3233
import org.apache.beam.sdk.schemas.Schema;
34+
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
3335
import org.apache.beam.sdk.values.Row;
3436
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
3537
import org.joda.time.DateTime;
@@ -44,6 +46,7 @@ public class MutationUtilsTest {
4446
private static final Struct EMPTY_STRUCT = Struct.newBuilder().build();
4547
private static final Struct INT64_STRUCT = Struct.newBuilder().set("int64").to(3L).build();
4648
private static final String TABLE = "some_table";
49+
private static final Instant TEST_INSTANT = Instant.parse("2024-01-15T10:30:00.123456Z");
4750

4851
private static final Schema WRITE_ROW_SCHEMA =
4952
Schema.builder()
@@ -71,6 +74,10 @@ public class MutationUtilsTest {
7174
.addNullableField("f_decimal", Schema.FieldType.DECIMAL)
7275
.addNullableField("f_byte", Schema.FieldType.BYTE)
7376
.addNullableField("f_iterable", Schema.FieldType.iterable(Schema.FieldType.INT64))
77+
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
78+
.addNullableField(
79+
"f_micros_instant_array",
80+
Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
7481
.build();
7582

7683
private static final Row WRITE_ROW =
@@ -107,6 +114,8 @@ public class MutationUtilsTest {
107114
.withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
108115
.withFieldValue("f_byte", Byte.parseByte("127"))
109116
.withFieldValue("f_iterable", ImmutableList.of(2L, 3L))
117+
.withFieldValue("f_micros_instant", TEST_INSTANT)
118+
.withFieldValue("f_micros_instant_array", ImmutableList.of(TEST_INSTANT, TEST_INSTANT))
110119
.build();
111120

112121
private static final Schema WRITE_ROW_SCHEMA_NULLS =
@@ -123,6 +132,10 @@ public class MutationUtilsTest {
123132
.addNullableField("f_array", Schema.FieldType.array(Schema.FieldType.INT64))
124133
.addNullableField(
125134
"f_struct_array", Schema.FieldType.array(Schema.FieldType.row(INT64_SCHEMA)))
135+
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
136+
.addNullableField(
137+
"f_micros_instant_array",
138+
Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
126139
.build();
127140

128141
private static final Row WRITE_ROW_NULLS =
@@ -138,6 +151,8 @@ public class MutationUtilsTest {
138151
.addValue(null)
139152
.addValue(null)
140153
.addValue(null)
154+
.addValue(null)
155+
.addValue(null)
141156
.build();
142157

143158
private static final Schema KEY_SCHEMA =
@@ -153,6 +168,7 @@ public class MutationUtilsTest {
153168
.addNullableField("f_int32", Schema.FieldType.INT32)
154169
.addNullableField("f_decimal", Schema.FieldType.DECIMAL)
155170
.addNullableField("f_byte", Schema.FieldType.BYTE)
171+
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
156172
.build();
157173

158174
private static final Row KEY_ROW =
@@ -168,6 +184,7 @@ public class MutationUtilsTest {
168184
.withFieldValue("f_int32", 0x7fffffff)
169185
.withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
170186
.withFieldValue("f_byte", Byte.parseByte("127"))
187+
.withFieldValue("f_micros_instant", TEST_INSTANT)
171188
.build();
172189

173190
private static final Schema KEY_SCHEMA_NULLS =
@@ -178,6 +195,7 @@ public class MutationUtilsTest {
178195
.addNullableField("f_bytes", Schema.FieldType.BYTES)
179196
.addNullableField("f_date_time", Schema.FieldType.DATETIME)
180197
.addNullableField("f_bool", Schema.FieldType.BOOLEAN)
198+
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
181199
.build();
182200

183201
private static final Row KEY_ROW_NULLS =
@@ -188,6 +206,7 @@ public class MutationUtilsTest {
188206
.addValue(null)
189207
.addValue(null)
190208
.addValue(null)
209+
.addValue(null)
191210
.build();
192211

193212
@Test
@@ -264,6 +283,7 @@ public void testCreateDeleteMutationFromRowWithNulls() {
264283
}
265284

266285
private static Mutation createDeleteMutation() {
286+
long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L + TEST_INSTANT.getNano() / 1_000L;
267287
Key key =
268288
Key.newBuilder()
269289
.append(1L)
@@ -277,6 +297,7 @@ private static Mutation createDeleteMutation() {
277297
.append(0x7fffffff)
278298
.append(BigDecimal.valueOf(Long.MIN_VALUE))
279299
.append(Byte.parseByte("127"))
300+
.append(Timestamp.ofTimeMicroseconds(micros))
280301
.build();
281302
return Mutation.delete(TABLE, key);
282303
}
@@ -290,12 +311,14 @@ private static Mutation createDeleteMutationNulls() {
290311
.append((ByteArray) null)
291312
.append((Timestamp) null)
292313
.append((Boolean) null)
314+
.append((Timestamp) null)
293315
.build();
294316
return Mutation.delete(TABLE, key);
295317
}
296318

297319
private static Mutation createMutation(Mutation.Op operation) {
298320
Mutation.WriteBuilder builder = chooseBuilder(operation);
321+
long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L + TEST_INSTANT.getNano() / 1_000L;
299322
return builder
300323
.set("f_int64")
301324
.to(1L)
@@ -353,6 +376,12 @@ private static Mutation createMutation(Mutation.Op operation) {
353376
.to(Byte.parseByte("127"))
354377
.set("f_iterable")
355378
.toInt64Array(ImmutableList.of(2L, 3L))
379+
.set("f_micros_instant")
380+
.to(Timestamp.ofTimeMicroseconds(micros))
381+
.set("f_micros_instant_array")
382+
.toTimestampArray(
383+
ImmutableList.of(
384+
Timestamp.ofTimeMicroseconds(micros), Timestamp.ofTimeMicroseconds(micros)))
356385
.build();
357386
}
358387

@@ -381,6 +410,10 @@ private static Mutation createMutationNulls(Mutation.Op operation) {
381410
.toInt64Array((List<Long>) null)
382411
.set("f_struct_array")
383412
.toStructArray(Type.struct(Type.StructField.of("int64", Type.int64())), null)
413+
.set("f_micros_instant")
414+
.to((Timestamp) null)
415+
.set("f_micros_instant_array")
416+
.toTimestampArray(null)
384417
.build();
385418
}
386419

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
import com.google.spanner.v1.StructType;
3434
import com.google.spanner.v1.TypeCode;
3535
import java.math.BigDecimal;
36+
import java.time.Instant;
3637
import java.util.List;
3738
import org.apache.beam.sdk.schemas.Schema;
39+
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
3840
import org.apache.beam.sdk.values.Row;
3941
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
4042
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -286,6 +288,43 @@ public void testStructTypeToBeamRowSchemaFailsTypeNotSupported() {
286288
"Error processing struct to row: Unsupported type 'STRUCT'.", exception.getMessage());
287289
}
288290

291+
@Test
292+
public void testStructToBeamRowWithMicrosInstant() {
293+
Schema schema =
294+
Schema.builder()
295+
.addInt64Field("f_int64")
296+
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
297+
.addNullableField(
298+
"f_micros_instant_array",
299+
Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
300+
.build();
301+
302+
Timestamp ts = Timestamp.ofTimeMicroseconds(1234567890123456L);
303+
Struct struct =
304+
Struct.newBuilder()
305+
.set("f_int64")
306+
.to(42L)
307+
.set("f_micros_instant")
308+
.to(ts)
309+
.set("f_micros_instant_array")
310+
.toTimestampArray(ImmutableList.of(ts, ts))
311+
.build();
312+
313+
Row result = StructUtils.structToBeamRow(struct, schema);
314+
315+
assertEquals(42L, result.getInt64("f_int64").longValue());
316+
317+
Instant expectedInstant =
318+
Instant.ofEpochSecond(
319+
1234567890123456L / 1_000_000L, (1234567890123456L % 1_000_000L) * 1_000L);
320+
assertEquals(expectedInstant, result.getValue("f_micros_instant"));
321+
322+
@SuppressWarnings("unchecked")
323+
List<Instant> instants = (List<Instant>) result.getValue("f_micros_instant_array");
324+
assertEquals(2, instants.size());
325+
assertEquals(expectedInstant, instants.get(0));
326+
}
327+
289328
private StructType.Field getFieldForTypeCode(String name, TypeCode typeCode) {
290329
return StructType.Field.newBuilder()
291330
.setName(name)

0 commit comments

Comments
 (0)