-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Support beam:logical_type:micros_instant:v1 in SpannerIo. #36840
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
1803689 to
1844618
Compare
|
Assigning reviewers: R: @shunping for label python. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request adds support for microsecond-precision timestamps (MicrosInstant logical type) in SpannerIO. The changes correctly handle the conversion between java.time.Instant and Spanner's Timestamp for both writing and reading data. The integration tests have also been updated to cover this new functionality.
My review focuses on improving code maintainability by reducing duplication and enhancing the completeness of the integration tests. I've identified a few areas where helper methods could be introduced to centralize conversion logic, and a couple of test cases where assertions could be made more robust.
| long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L; | ||
| keyBuilder.append(Timestamp.ofTimeMicroseconds(micros)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic to convert a java.time.Instant to a com.google.cloud.Timestamp with microsecond precision is duplicated in setBeamValueToKey, setBeamValueToMutation (lines 253-254), and addIterableToMutationBuilder (lines 384-387). To improve code clarity and maintainability, consider extracting this conversion into a private static helper method.
For example:
private static Timestamp toSpannerTimestamp(Instant instant) {
long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
return Timestamp.ofTimeMicroseconds(micros);
}This new method could then be used in all three locations, simplifying the code.
| long micros = | ||
| spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L; | ||
| return java.time.Instant.ofEpochSecond( | ||
| micros / 1_000_000L, (micros % 1_000_000L) * 1_000L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic to convert a com.google.cloud.Timestamp to a java.time.Instant is duplicated here and in getStructArrayValue (lines 437-440). To avoid duplication and improve maintainability, this logic should be extracted into a private static helper method.
For example, you could add:
private static java.time.Instant fromSpannerTimestamp(Timestamp spannerTimestamp) {
long micros = spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L;
return java.time.Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
}And then call it here:
return fromSpannerTimestamp(spannerTimestamp);| results = self.spanner_helper.read_data(self.database_id, prefix='replace') | ||
| self.assertEqual(len(results), 2) | ||
| # In REPLACE, boolean should be NULL but timestamp should be updated | ||
| self.assertEqual(results[0][0], 'replace0') | ||
| self.assertEqual(results[0][1], 10) | ||
| self.assertIsNone(results[0][2]) # boolean replaced with NULL | ||
| self.assertIsNotNone(results[0][3]) # timestamp updated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assertions in this test case only validate the first row of the results. Since two rows are written to Spanner, it would be more robust to assert the contents of both rows to ensure the SpannerReplace transform works as expected for all inputs.
results = self.spanner_helper.read_data(self.database_id, prefix='replace')
self.assertEqual(len(results), 2)
# In REPLACE, boolean should be NULL but timestamp should be updated
self.assertEqual(results[0][0], 'replace0')
self.assertEqual(results[0][1], 10)
self.assertIsNone(results[0][2]) # boolean replaced with NULL
self.assertIsNotNone(results[0][3]) # timestamp updated
self.assertEqual(results[1][0], 'replace1')
self.assertEqual(results[1][1], 11)
self.assertIsNone(results[1][2]) # boolean replaced with NULL
self.assertIsNotNone(results[1][3]) # timestamp updated| results = self.spanner_helper.read_data(self.database_id, 'update') | ||
| self.assertEqual(len(results), 2) | ||
| # In UPDATE, boolean preserved but timestamp updated | ||
| self.assertEqual(results[0][1], 10) | ||
| self.assertEqual(results[0][2], False) # boolean preserved | ||
| self.assertIsNotNone(results[0][3]) # timestamp updated |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The assertions for this test are incomplete. They only check some values of the first result row and do not check the second row at all. To make the test more robust, the assertions should be expanded to cover all relevant fields for both rows.
results = self.spanner_helper.read_data(self.database_id, 'update')
self.assertEqual(len(results), 2)
# In UPDATE, boolean preserved but timestamp updated
self.assertEqual(results[0][0], 'update0')
self.assertEqual(results[0][1], 10)
self.assertEqual(results[0][2], False) # boolean preserved
self.assertIsNotNone(results[0][3]) # timestamp updated
self.assertEqual(results[1][0], 'update1')
self.assertEqual(results[1][1], 11)
self.assertEqual(results[1][2], False) # boolean preserved
self.assertIsNotNone(results[1][3]) # timestamp updated
Abacn
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM after dedup conversion logics suggested by AI code review
nielm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also implement the gemini code assist changes...
| Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant()))) | ||
| .build(); | ||
|
|
||
| Timestamp ts = Timestamp.ofTimeMicroseconds(1234567890123456L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use a constant for this and in the expectedInstant calculations below
| self.spanner_helper.insert_values( | ||
| self.database_id, [('or_update0', 5, False), ('or_update1', 9, False)]) | ||
| self.database_id, | ||
| [('or_update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use a constant for this and the other locations it is used.
This adds support for the micros instant logical type in spanner IO reads and writes.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.