@claudevdm claudevdm commented Nov 17, 2025

This adds support for the MicrosInstant logical type (beam:logical_type:micros_instant:v1) in SpannerIO reads and writes.



@claudevdm claudevdm force-pushed the spanner-microsinstant branch from 1803689 to 1844618 on November 18, 2025 02:17
@claudevdm claudevdm marked this pull request as ready for review November 18, 2025 14:54
@github-actions

Assigning reviewers:

R: @shunping for label python.
R: @Abacn for label java.
R: @nielm for label spanner.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@claudevdm

/gemini review


@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request adds support for microsecond-precision timestamps (MicrosInstant logical type) in SpannerIO. The changes correctly handle the conversion between java.time.Instant and Spanner's Timestamp for both writing and reading data. The integration tests have also been updated to cover this new functionality.

My review focuses on improving code maintainability by reducing duplication and enhancing the completeness of the integration tests. I've identified a few areas where helper methods could be introduced to centralize conversion logic, and a couple of test cases where assertions could be made more robust.

Comment on lines +160 to +161
long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
keyBuilder.append(Timestamp.ofTimeMicroseconds(micros));

medium

The logic to convert a java.time.Instant to a com.google.cloud.Timestamp with microsecond precision is duplicated in setBeamValueToKey, setBeamValueToMutation (lines 253-254), and addIterableToMutationBuilder (lines 384-387). To improve code clarity and maintainability, consider extracting this conversion into a private static helper method.

For example:

private static Timestamp toSpannerTimestamp(Instant instant) {
  long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
  return Timestamp.ofTimeMicroseconds(micros);
}

This new method could then be used in all three locations, simplifying the code.
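As a rough illustration of the arithmetic inside such a helper, here is a minimal sketch that uses only java.time so it runs without the Spanner client. The class name MicrosConversion and the method name toEpochMicros are hypothetical and not part of the PR, and the overflow checks via Math.multiplyExact and Math.addExact are an addition that the PR code does not have.

```java
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of the PR. */
final class MicrosConversion {
  private MicrosConversion() {}

  /** Truncates an Instant to whole microseconds since the Unix epoch. */
  static long toEpochMicros(Instant instant) {
    // getNano() is always in [0, 999_999_999], so dropping the sub-microsecond
    // digits rounds toward negative infinity, including for pre-epoch instants.
    long wholeSecondsAsMicros = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
    return Math.addExact(wholeSecondsAsMicros, instant.getNano() / 1_000L);
  }

  public static void main(String[] args) {
    // The trailing 789 nanoseconds are discarded by the conversion.
    Instant instant = Instant.parse("2009-02-13T23:31:30.123456789Z");
    System.out.println(toEpochMicros(instant)); // prints 1234567890123456
  }
}
```

The resulting long is the kind of value the suggested helper passes to Timestamp.ofTimeMicroseconds.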

Comment on lines +377 to +380
long micros =
spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L;
return java.time.Instant.ofEpochSecond(
micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);

medium

The logic to convert a com.google.cloud.Timestamp to a java.time.Instant is duplicated here and in getStructArrayValue (lines 437-440). To avoid duplication and improve maintainability, this logic should be extracted into a private static helper method.

For example, you could add:

private static java.time.Instant fromSpannerTimestamp(Timestamp spannerTimestamp) {
  long micros = spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L;
  return java.time.Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
}

And then call it here:

return fromSpannerTimestamp(spannerTimestamp);
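For the reverse direction, the following sketch shows the micros-to-Instant step in isolation, again with java.time only. The names MicrosToInstant and fromEpochMicros are hypothetical. It swaps the `/` and `%` of the snippet above for Math.floorDiv and Math.floorMod; both variants should yield the same Instant, because Instant.ofEpochSecond normalizes a negative nano adjustment, but the floor variants keep the nanosecond part non-negative, which may be easier to reason about for pre-epoch values.

```java
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of the PR. */
final class MicrosToInstant {
  private MicrosToInstant() {}

  /** Builds an Instant from microseconds since the Unix epoch. */
  static Instant fromEpochMicros(long micros) {
    // floorDiv/floorMod keep the sub-second part in [0, 999_999] even when
    // micros is negative, so seconds and nanos are already normalized.
    long seconds = Math.floorDiv(micros, 1_000_000L);
    long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
    return Instant.ofEpochSecond(seconds, nanos);
  }

  public static void main(String[] args) {
    // One microsecond before the epoch.
    System.out.println(fromEpochMicros(-1L));
    // The literal used in the Java test under review.
    System.out.println(fromEpochMicros(1234567890123456L));
  }
}
```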

Comment on lines +191 to +197
results = self.spanner_helper.read_data(self.database_id, prefix='replace')
self.assertEqual(len(results), 2)
# In REPLACE, boolean should be NULL but timestamp should be updated
self.assertEqual(results[0][0], 'replace0')
self.assertEqual(results[0][1], 10)
self.assertIsNone(results[0][2]) # boolean replaced with NULL
self.assertIsNotNone(results[0][3]) # timestamp updated

medium

The assertions in this test case only validate the first row of the results. Since two rows are written to Spanner, it would be more robust to assert the contents of both rows to ensure the SpannerReplace transform works as expected for all inputs.

    results = self.spanner_helper.read_data(self.database_id, prefix='replace')
    self.assertEqual(len(results), 2)
    # In REPLACE, boolean should be NULL but timestamp should be updated
    self.assertEqual(results[0][0], 'replace0')
    self.assertEqual(results[0][1], 10)
    self.assertIsNone(results[0][2])  # boolean replaced with NULL
    self.assertIsNotNone(results[0][3])  # timestamp updated

    self.assertEqual(results[1][0], 'replace1')
    self.assertEqual(results[1][1], 11)
    self.assertIsNone(results[1][2])  # boolean replaced with NULL
    self.assertIsNotNone(results[1][3])  # timestamp updated

Comment on lines +213 to +218
results = self.spanner_helper.read_data(self.database_id, 'update')
self.assertEqual(len(results), 2)
# In UPDATE, boolean preserved but timestamp updated
self.assertEqual(results[0][1], 10)
self.assertEqual(results[0][2], False) # boolean preserved
self.assertIsNotNone(results[0][3]) # timestamp updated

medium

The assertions for this test are incomplete. They only check some values of the first result row and do not check the second row at all. To make the test more robust, the assertions should be expanded to cover all relevant fields for both rows.

    results = self.spanner_helper.read_data(self.database_id, 'update')
    self.assertEqual(len(results), 2)
    # In UPDATE, boolean preserved but timestamp updated
    self.assertEqual(results[0][0], 'update0')
    self.assertEqual(results[0][1], 10)
    self.assertEqual(results[0][2], False)  # boolean preserved
    self.assertIsNotNone(results[0][3])  # timestamp updated

    self.assertEqual(results[1][0], 'update1')
    self.assertEqual(results[1][1], 11)
    self.assertEqual(results[1][2], False)  # boolean preserved
    self.assertIsNotNone(results[1][3])  # timestamp updated

@Abacn Abacn left a comment

LGTM once the conversion logic is deduplicated as suggested by the AI code review.

@claudevdm claudevdm changed the title from "Support micros isntant in spannerio." to "Support beam:logical_type:micros_instant:v1 in SpannerIo." on Nov 18, 2025

@nielm nielm left a comment

Please also implement the gemini code assist changes...

Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
.build();

Timestamp ts = Timestamp.ofTimeMicroseconds(1234567890123456L);

Please use a constant for this and in the expectedInstant calculations below
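One possible shape for this, as a sketch only: the class and constant names below (MicrosTestConstants, TEST_TIMESTAMP_MICROS, EXPECTED_INSTANT) are hypothetical, and only the literal value comes from the test under review. Deriving the expected Instant from the same constant keeps the two from drifting apart.

```java
import java.time.Instant;

/** Hypothetical test constants for illustration only; not part of the PR. */
final class MicrosTestConstants {
  private MicrosTestConstants() {}

  // Same value as the literal in the test, written once.
  static final long TEST_TIMESTAMP_MICROS = 1_234_567_890_123_456L;

  // Expected read-side value, derived from the constant rather than retyped.
  static final Instant EXPECTED_INSTANT =
      Instant.ofEpochSecond(
          TEST_TIMESTAMP_MICROS / 1_000_000L,
          (TEST_TIMESTAMP_MICROS % 1_000_000L) * 1_000L);

  public static void main(String[] args) {
    System.out.println(EXPECTED_INSTANT); // prints 2009-02-13T23:31:30.123456Z
  }
}
```

In the test itself, the write side would then become Timestamp.ofTimeMicroseconds(TEST_TIMESTAMP_MICROS).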

self.spanner_helper.insert_values(
self.database_id, [('or_update0', 5, False), ('or_update1', 9, False)])
self.database_id,
[('or_update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()),

Please use a constant for this and the other locations it is used.
