Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 14
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Value;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -102,6 +104,11 @@ public static Mutation createMutationFromBeamRows(
return mutationBuilder.build();
}

private static Timestamp toSpannerTimestamp(Instant instant) {
long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
return Timestamp.ofTimeMicroseconds(micros);
}

private static void setBeamValueToKey(
Key.Builder keyBuilder, Schema.FieldType field, String columnName, Row row) {
switch (field.getTypeName()) {
Expand Down Expand Up @@ -147,6 +154,21 @@ private static void setBeamValueToKey(
keyBuilder.append(row.getDecimal(columnName));
break;
// TODO: Implement logical date and datetime
case LOGICAL_TYPE:
Schema.LogicalType<?, ?> logicalType = checkNotNull(field.getLogicalType());
String identifier = logicalType.getIdentifier();
if (identifier.equals(MicrosInstant.IDENTIFIER)) {
Instant instant = row.getValue(columnName);
if (instant == null) {
keyBuilder.append((Timestamp) null);
} else {
keyBuilder.append(toSpannerTimestamp(instant));
}
} else {
throw new IllegalArgumentException(
String.format("Unsupported logical type in key: %s", identifier));
}
break;
case DATETIME:
@Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
if (dateTime == null) {
Expand Down Expand Up @@ -224,7 +246,21 @@ private static void setBeamValueToMutation(
mutationBuilder.set(columnName).to(decimal);
}
break;
// TODO: Implement logical date and datetime
case LOGICAL_TYPE:
Schema.LogicalType<?, ?> logicalType = checkNotNull(fieldType.getLogicalType());
String identifier = logicalType.getIdentifier();
if (identifier.equals(MicrosInstant.IDENTIFIER)) {
@Nullable Instant instant = row.getValue(columnName);
if (instant == null) {
mutationBuilder.set(columnName).to((Timestamp) null);
} else {
mutationBuilder.set(columnName).to(toSpannerTimestamp(instant));
}
} else {
throw new IllegalArgumentException(
String.format("Unsupported logical type: %s", identifier));
}
break;
case DATETIME:
@Nullable ReadableDateTime dateTime = row.getDateTime(columnName);
if (dateTime == null) {
Expand Down Expand Up @@ -335,6 +371,27 @@ private static void addIterableToMutationBuilder(
case STRING:
mutationBuilder.set(column).toStringArray((Iterable<String>) ((Object) iterable));
break;
case LOGICAL_TYPE:
String identifier = checkNotNull(beamIterableType.getLogicalType()).getIdentifier();
if (identifier.equals(MicrosInstant.IDENTIFIER)) {
if (iterable == null) {
mutationBuilder.set(column).toTimestampArray(null);
} else {
mutationBuilder
.set(column)
.toTimestampArray(
StreamSupport.stream(iterable.spliterator(), false)
.map(
instant -> {
return toSpannerTimestamp((java.time.Instant) instant);
})
.collect(toList()));
}
} else {
throw new IllegalArgumentException(
String.format("Unsupported logical type in iterable: %s", identifier));
}
break;
case DATETIME:
if (iterable == null) {
mutationBuilder.set(column).toDateArray(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Map;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -352,6 +353,11 @@ private static void addIterableToStructBuilder(
}
}

private static java.time.Instant fromSpannerTimestamp(Timestamp spannerTimestamp) {
long micros = spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L;
return java.time.Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
}

private static @Nullable Object getStructValue(Struct struct, Schema.Field field) {
String column = field.getName();
Type.Code typeCode = struct.getColumnType(column).getCode();
Expand All @@ -365,7 +371,19 @@ private static void addIterableToStructBuilder(
return struct.getBytes(column).toByteArray();
// TODO: implement logical datetime
case TIMESTAMP:
return Instant.ofEpochSecond(struct.getTimestamp(column).getSeconds()).toDateTime();
Timestamp spannerTimestamp = struct.getTimestamp(column);

// Check if the Beam schema expects MicrosInstant logical type
Schema.FieldType fieldType = field.getType();
if (fieldType.getTypeName().isLogicalType()) {
Schema.@Nullable LogicalType<?, ?> logicalType = fieldType.getLogicalType();
if (logicalType != null && logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
return fromSpannerTimestamp(spannerTimestamp);
}
}
// Default DATETIME behavior: convert to Joda DateTime
return Instant.ofEpochSecond(spannerTimestamp.getSeconds()).toDateTime();

// TODO: implement logical date
case DATE:
return DateTime.parse(struct.getDate(column).toString());
Expand Down Expand Up @@ -407,11 +425,26 @@ private static void addIterableToStructBuilder(
return struct.getBooleanList(column);
case BYTES:
return struct.getBytesList(column);
// TODO: implement logical datetime
case TIMESTAMP:
// Check if expects MicrosInstant in arrays
Schema.@Nullable FieldType elementType = field.getType().getCollectionElementType();
if (elementType != null && elementType.getTypeName().isLogicalType()) {
Schema.@Nullable LogicalType<?, ?> logicalType = elementType.getLogicalType();
if (logicalType != null && logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
// Return List<java.time.Instant> for MicrosInstant arrays
return struct.getTimestampList(column).stream()
.map(
timestamp -> {
return fromSpannerTimestamp(timestamp);
})
.collect(toList());
}
}
// Default: return List<DateTime> for DATETIME type
return struct.getTimestampList(column).stream()
.map(timestamp -> Instant.ofEpochSecond(timestamp.getSeconds()).toDateTime())
.collect(toList());

// TODO: implement logical date
case DATE:
return struct.getDateList(column).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.Type;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
Expand All @@ -44,6 +46,7 @@ public class MutationUtilsTest {
private static final Struct EMPTY_STRUCT = Struct.newBuilder().build();
private static final Struct INT64_STRUCT = Struct.newBuilder().set("int64").to(3L).build();
private static final String TABLE = "some_table";
private static final Instant TEST_INSTANT = Instant.parse("2024-01-15T10:30:00.123456Z");

private static final Schema WRITE_ROW_SCHEMA =
Schema.builder()
Expand Down Expand Up @@ -71,6 +74,10 @@ public class MutationUtilsTest {
.addNullableField("f_decimal", Schema.FieldType.DECIMAL)
.addNullableField("f_byte", Schema.FieldType.BYTE)
.addNullableField("f_iterable", Schema.FieldType.iterable(Schema.FieldType.INT64))
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
.addNullableField(
"f_micros_instant_array",
Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
.build();

private static final Row WRITE_ROW =
Expand Down Expand Up @@ -107,6 +114,8 @@ public class MutationUtilsTest {
.withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
.withFieldValue("f_byte", Byte.parseByte("127"))
.withFieldValue("f_iterable", ImmutableList.of(2L, 3L))
.withFieldValue("f_micros_instant", TEST_INSTANT)
.withFieldValue("f_micros_instant_array", ImmutableList.of(TEST_INSTANT, TEST_INSTANT))
.build();

private static final Schema WRITE_ROW_SCHEMA_NULLS =
Expand All @@ -123,6 +132,10 @@ public class MutationUtilsTest {
.addNullableField("f_array", Schema.FieldType.array(Schema.FieldType.INT64))
.addNullableField(
"f_struct_array", Schema.FieldType.array(Schema.FieldType.row(INT64_SCHEMA)))
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
.addNullableField(
"f_micros_instant_array",
Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
.build();

private static final Row WRITE_ROW_NULLS =
Expand All @@ -138,6 +151,8 @@ public class MutationUtilsTest {
.addValue(null)
.addValue(null)
.addValue(null)
.addValue(null)
.addValue(null)
.build();

private static final Schema KEY_SCHEMA =
Expand All @@ -153,6 +168,7 @@ public class MutationUtilsTest {
.addNullableField("f_int32", Schema.FieldType.INT32)
.addNullableField("f_decimal", Schema.FieldType.DECIMAL)
.addNullableField("f_byte", Schema.FieldType.BYTE)
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
.build();

private static final Row KEY_ROW =
Expand All @@ -168,6 +184,7 @@ public class MutationUtilsTest {
.withFieldValue("f_int32", 0x7fffffff)
.withFieldValue("f_decimal", BigDecimal.valueOf(Long.MIN_VALUE))
.withFieldValue("f_byte", Byte.parseByte("127"))
.withFieldValue("f_micros_instant", TEST_INSTANT)
.build();

private static final Schema KEY_SCHEMA_NULLS =
Expand All @@ -178,6 +195,7 @@ public class MutationUtilsTest {
.addNullableField("f_bytes", Schema.FieldType.BYTES)
.addNullableField("f_date_time", Schema.FieldType.DATETIME)
.addNullableField("f_bool", Schema.FieldType.BOOLEAN)
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
.build();

private static final Row KEY_ROW_NULLS =
Expand All @@ -188,6 +206,7 @@ public class MutationUtilsTest {
.addValue(null)
.addValue(null)
.addValue(null)
.addValue(null)
.build();

@Test
Expand Down Expand Up @@ -264,6 +283,7 @@ public void testCreateDeleteMutationFromRowWithNulls() {
}

private static Mutation createDeleteMutation() {
long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L + TEST_INSTANT.getNano() / 1_000L;
Key key =
Key.newBuilder()
.append(1L)
Expand All @@ -277,6 +297,7 @@ private static Mutation createDeleteMutation() {
.append(0x7fffffff)
.append(BigDecimal.valueOf(Long.MIN_VALUE))
.append(Byte.parseByte("127"))
.append(Timestamp.ofTimeMicroseconds(micros))
.build();
return Mutation.delete(TABLE, key);
}
Expand All @@ -290,12 +311,14 @@ private static Mutation createDeleteMutationNulls() {
.append((ByteArray) null)
.append((Timestamp) null)
.append((Boolean) null)
.append((Timestamp) null)
.build();
return Mutation.delete(TABLE, key);
}

private static Mutation createMutation(Mutation.Op operation) {
Mutation.WriteBuilder builder = chooseBuilder(operation);
long micros = TEST_INSTANT.getEpochSecond() * 1_000_000L + TEST_INSTANT.getNano() / 1_000L;
return builder
.set("f_int64")
.to(1L)
Expand Down Expand Up @@ -353,6 +376,12 @@ private static Mutation createMutation(Mutation.Op operation) {
.to(Byte.parseByte("127"))
.set("f_iterable")
.toInt64Array(ImmutableList.of(2L, 3L))
.set("f_micros_instant")
.to(Timestamp.ofTimeMicroseconds(micros))
.set("f_micros_instant_array")
.toTimestampArray(
ImmutableList.of(
Timestamp.ofTimeMicroseconds(micros), Timestamp.ofTimeMicroseconds(micros)))
.build();
}

Expand Down Expand Up @@ -381,6 +410,10 @@ private static Mutation createMutationNulls(Mutation.Op operation) {
.toInt64Array((List<Long>) null)
.set("f_struct_array")
.toStructArray(Type.struct(Type.StructField.of("int64", Type.int64())), null)
.set("f_micros_instant")
.to((Timestamp) null)
.set("f_micros_instant_array")
.toTimestampArray(null)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import com.google.spanner.v1.StructType;
import com.google.spanner.v1.TypeCode;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
Expand All @@ -45,6 +47,10 @@
public class StructUtilsTest {
private static final Schema EMPTY_SCHEMA = Schema.builder().build();
private static final Schema INT64_SCHEMA = Schema.builder().addInt64Field("int64").build();
private static final Timestamp TIMESTAMP = Timestamp.ofTimeMicroseconds(1234567890123456L);
private static final Instant INSTANT =
Instant.ofEpochSecond(
1234567890123456L / 1_000_000L, (1234567890123456L % 1_000_000L) * 1_000L);

@Test
public void testStructToBeamRow() {
Expand Down Expand Up @@ -286,6 +292,39 @@ public void testStructTypeToBeamRowSchemaFailsTypeNotSupported() {
"Error processing struct to row: Unsupported type 'STRUCT'.", exception.getMessage());
}

@Test
public void testStructToBeamRowWithMicrosInstant() {
Schema schema =
Schema.builder()
.addInt64Field("f_int64")
.addNullableField("f_micros_instant", Schema.FieldType.logicalType(new MicrosInstant()))
.addNullableField(
"f_micros_instant_array",
Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
.build();

Struct struct =
Struct.newBuilder()
.set("f_int64")
.to(42L)
.set("f_micros_instant")
.to(TIMESTAMP)
.set("f_micros_instant_array")
.toTimestampArray(ImmutableList.of(TIMESTAMP, TIMESTAMP))
.build();

Row result = StructUtils.structToBeamRow(struct, schema);

assertEquals(42L, result.getInt64("f_int64").longValue());

assertEquals(INSTANT, result.getValue("f_micros_instant"));

@SuppressWarnings("unchecked")
List<Instant> instants = (List<Instant>) result.getValue("f_micros_instant_array");
assertEquals(2, instants.size());
assertEquals(INSTANT, instants.get(0));
}

private StructType.Field getFieldForTypeCode(String name, TypeCode typeCode) {
return StructType.Field.newBuilder()
.setName(name)
Expand Down
Loading
Loading