-
Notifications
You must be signed in to change notification settings - Fork 153
💥 Payload limit configuration and validation #1288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
04a1c6d
9ca018d
fb7f857
98b1eb2
861269b
526f595
dc38edd
a9aa5ce
7f621ef
58081db
22de18b
dd7bcc2
76ff2a9
2efbb2c
3036a8a
4af084b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1207,6 +1207,27 @@ def __init__(self) -> None: | |
| super().__init__(encode_common_attributes=True) | ||
|
|
||
|
|
||
@dataclass(frozen=True)
class PayloadLimitsConfig:
    """Thresholds controlling when payload sizes trigger limit handling."""

    memo_size_warning: int = 2048
    """Memo size (in bytes) above which a size warning is logged."""

    payload_size_warning: int = 524288
    """Payload size (in bytes) above which a size warning is logged."""
|
|
||
|
|
||
class PayloadSizeWarning(RuntimeWarning):
    """Warning emitted when payload sizes exceed the configured warning limit."""
|
|
||
|
|
||
| @dataclass | ||
| class _PayloadErrorLimits: | ||
| memo_size_error: int | ||
| payload_size_error: int | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class DataConverter(WithSerializationContext): | ||
| """Data converter for converting and encoding payloads to/from Python values. | ||
|
|
@@ -1230,9 +1251,16 @@ class DataConverter(WithSerializationContext): | |
| failure_converter: FailureConverter = dataclasses.field(init=False) | ||
| """Failure converter created from the :py:attr:`failure_converter_class`.""" | ||
|
|
||
| payload_limits: PayloadLimitsConfig = PayloadLimitsConfig() | ||
| """Settings for payload size limits.""" | ||
|
|
||
| default: ClassVar[DataConverter] | ||
| """Singleton default data converter.""" | ||
|
|
||
| _memo_size_error_limit: int = 0 | ||
|
|
||
| _payload_size_error_limit: int = 0 | ||
|
|
||
| def __post_init__(self) -> None: # noqa: D105 | ||
| object.__setattr__(self, "payload_converter", self.payload_converter_class()) | ||
| object.__setattr__(self, "failure_converter", self.failure_converter_class()) | ||
|
|
@@ -1334,6 +1362,13 @@ def with_context(self, context: SerializationContext) -> Self: | |
| object.__setattr__(cloned, "failure_converter", failure_converter) | ||
| return cloned | ||
|
|
||
| def _with_payload_error_limits(self, options: _PayloadErrorLimits) -> DataConverter: | ||
| return dataclasses.replace( | ||
| self, | ||
| _memo_size_error_limit=options.memo_size_error, | ||
| _payload_size_error_limit=options.payload_size_error, | ||
| ) | ||
|
|
||
| async def _decode_memo( | ||
| self, | ||
| source: temporalio.api.common.v1.Memo, | ||
|
|
@@ -1367,35 +1402,43 @@ async def _encode_memo( | |
| async def _encode_memo_existing( | ||
| self, source: Mapping[str, Any], memo: temporalio.api.common.v1.Memo | ||
| ): | ||
| payloads = [] | ||
| for k, v in source.items(): | ||
| payload = v | ||
| if not isinstance(v, temporalio.api.common.v1.Payload): | ||
| payload = (await self.encode([v]))[0] | ||
| memo.fields[k].CopyFrom(payload) | ||
| payloads.append(payload) | ||
| # Memos have their field payloads validated all together in one unit | ||
| DataConverter._validate_limits( | ||
| payloads, | ||
| self._memo_size_error_limit, | ||
| "[TMPRL1103] Attempted to upload memo with size that exceeded the error limit.", | ||
| self.payload_limits.memo_size_warning, | ||
| "[TMPRL1103] Attempted to upload memo with size that exceeded the warning limit.", | ||
| ) | ||
|
|
||
| async def _encode_payload( | ||
| self, payload: temporalio.api.common.v1.Payload | ||
| ) -> temporalio.api.common.v1.Payload: | ||
| if self.payload_codec: | ||
| payload = (await self.payload_codec.encode([payload]))[0] | ||
| self._validate_payload_limits([payload]) | ||
| return payload | ||
|
|
||
| async def _encode_payloads(self, payloads: temporalio.api.common.v1.Payloads): | ||
| if self.payload_codec: | ||
| await self.payload_codec.encode_wrapper(payloads) | ||
| self._validate_payload_limits(payloads.payloads) | ||
|
|
||
| async def _encode_payload_sequence( | ||
| self, payloads: Sequence[temporalio.api.common.v1.Payload] | ||
| ) -> list[temporalio.api.common.v1.Payload]: | ||
| if not self.payload_codec: | ||
| return list(payloads) | ||
| return await self.payload_codec.encode(payloads) | ||
|
|
||
| # Temporary shortcircuit detection while the _encode_* methods may no-op if | ||
| # a payload codec is not configured. Remove once those paths have more to them. | ||
| @property | ||
| def _encode_payload_has_effect(self) -> bool: | ||
| return self.payload_codec is not None | ||
| encoded_payloads = list(payloads) | ||
| if self.payload_codec: | ||
| encoded_payloads = await self.payload_codec.encode(encoded_payloads) | ||
| self._validate_payload_limits(encoded_payloads) | ||
| return encoded_payloads | ||
|
|
||
| async def _decode_payload( | ||
| self, payload: temporalio.api.common.v1.Payload | ||
|
|
@@ -1452,6 +1495,40 @@ async def _apply_to_failure_payloads( | |
| if failure.HasField("cause"): | ||
| await DataConverter._apply_to_failure_payloads(failure.cause, cb) | ||
|
|
||
| def _validate_payload_limits( | ||
| self, | ||
| payloads: Sequence[temporalio.api.common.v1.Payload], | ||
| ): | ||
| DataConverter._validate_limits( | ||
| payloads, | ||
| self._payload_size_error_limit, | ||
| "[TMPRL1103] Attempted to upload payloads with size that exceeded the error limit.", | ||
| self.payload_limits.payload_size_warning, | ||
| "[TMPRL1103] Attempted to upload payloads with size that exceeded the warning limit.", | ||
| ) | ||
|
|
||
| @staticmethod | ||
| def _validate_limits( | ||
| payloads: Sequence[temporalio.api.common.v1.Payload], | ||
| error_limit: int, | ||
| error_message: str, | ||
| warning_limit: int, | ||
| warning_message: str, | ||
| ): | ||
| total_size = sum(payload.ByteSize() for payload in payloads) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically if the server was comparing sizes of the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably in the totality of the Payloads object, the size different is nearly negligible, but there are other fields on Payloads that would contribute to the overall size. If contributing non-zero, that means the SDK will pass on potentially too large payloads and the server would reject them, which is the behavior today. It may be possible to update the visitor in Python to consider allowing the visitor functions to optionally take a Payloads. The visitor functions could have a default implementation that breaks it up into a payload sequence like it does today, but then for this feature it actually provides the override behavior.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Are there? I am not aware of any, or do you mean in the future? The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was looking at https://github.com/temporalio/api-go/blob/c246540cf2eda8f4f8aa9e46ef33d87c48632a27/common/v1/message.pb.go#L81, which is what the server gets the size of e.g. https://github.com/temporalio/temporal/blob/8aeb4bda8dbae17648fb32205cf0b9af62fd07e5/service/frontend/workflow_handler.go#L1236. Maybe
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, that |
||
|
|
||
| if error_limit > 0 and total_size > error_limit: | ||
| raise temporalio.exceptions.PayloadSizeError( | ||
jmaeagle99 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| f"{error_message} Size: {total_size} bytes, Limit: {error_limit} bytes" | ||
| ) | ||
|
|
||
| if warning_limit > 0 and total_size > warning_limit: | ||
| # TODO: Use a context aware logger to log extra information about workflow/activity/etc | ||
jmaeagle99 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| warnings.warn( | ||
jmaeagle99 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| f"{warning_message} Size: {total_size} bytes, Limit: {warning_limit} bytes", | ||
| PayloadSizeWarning, | ||
| ) | ||
|
|
||
|
|
||
| DefaultPayloadConverter.default_encoding_payload_converters = ( | ||
| BinaryNullPayloadConverter(), | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.