feat!: store-defined concurrency limits #3547
d-v-b wants to merge 26 commits into zarr-developers:main from d-v-b:feat/global-concurrency-limit
Conversation
Nice, the other one that bugs me when profiling is `zarr-python/src/zarr/codecs/zstd.py`, line 79 in b3e9aed.
@d-v-b do you have thoughts on how downstream libraries would interact with Zarr's concurrency limit? Would those set their own concurrency approaches (e.g., global or per function), or could there be a way to share the limit with Zarr?
@maxrjones this PR is moving away from a global concurrency limit because it's actually very hard to implement top-down concurrency control in our codebase. The new direction I'm taking with this PR is for each store to set its own concurrency limit. For stateless stores like local and remote (basically anything except zip), concurrency-sensitive applications should be able to cheaply create store instances with concurrency limits tuned to the needs of the application.
this is ready for a look. The basic idea is that we make concurrency limits an implementation detail of a particular store and thereby remove all concurrency-limiting logic from array / group routines. Some stores will have no concurrency limits (e.g., memory and local storage), so their performance will go up. Stores that actually benefit from a concurrency limit (remote stuff) have a default limit of 10, but that's something we can play with. To cut down on boilerplate we (claude and I) define a `ConcurrencyLimiter` mixin.
maxrjones left a comment
Thanks @d-v-b! I like the per-store concurrency limits idea. My main concern is that external stores implemented under the global concurrency limit model could go from limited concurrency to unbounded concurrency. It seems like there are at least a few external store implementations on GitHub - https://github.com/search?q=zarr.abc.store&type=code. One option for mitigation is to make the Store ABC provide a default concurrency limit via the ConcurrencyLimiter, which store implementations like MemoryStore could opt out of rather than opt in to.
How would this look? Our store ABC doesn't define concrete implementations for stuff like …
looks like removing the unnecessary concurrency limits for local and memory storage gets us a very nice performance improvement, according to the automated benchmarks. I think that escalates the priority of this PR.
the best thing I can think of is a minor release that adds an annoying warning to …
- class ObjectStore(Store, Generic[T_Store]):
+ class ObjectStore(Store, ConcurrencyLimiter, Generic[T_Store]):
@kylebarron lmk if these changes make sense, or if there's a better way to do this.
I think this is a good example of what I think could be provided by an obspec adapter, through composition instead of inheritance.
Some pseudocode:

    class ObspecConcurrencyLimiter:
        client: GetRangeAsync & GetRangesAsync & ...

        async def get_range_async(self, ...):
            async with self.semaphore:
                return await self.client.get_range_async(...)

But I'm still not sure how that would work with different stores that provide different methods.
I think a common pattern is for concurrency-sensitive backends to impose their own concurrency limits already. For example, icechunk does this: https://icechunk.io/en/stable/performance/#concurrency. And local-file stores like tifffile don't need concurrency limits at all. So my current read is that this PR is actually very low risk and therefore safe to merge.
ping @zarr-developers/python-core-devs, it would be good to get feedback on this PR.
TomAugspurger left a comment
I didn't closely review the obstore changes, but overall I think this makes sense.
My only design question is whether the default of 50 should be in Python code, or looked up from the configuration.
-     self._decode_partial_single,
-     config.get("async.concurrency"),
- )
+ # Store handles concurrency limiting internally
nit: these comments are helpful for review now but will look odd in a while :)
      concurrency_limit: int | None = 50,
  ) -> None:
-     super().__init__(read_only=read_only)
+     Store.__init__(self, read_only=read_only)
This didn't need to change, did it? Python's MRO should ensure that `super().__init__` finds `Store` first, right?
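A minimal demonstration of the MRO point (simplified class bodies, not zarr-python's actual ones): with `class ObjectStore(Store, ConcurrencyLimiter)`, `super().__init__` in `ObjectStore` does resolve to `Store` first; the subtlety is that `ConcurrencyLimiter.__init__` only runs if `Store.__init__` cooperatively calls `super()` too.

```python
class ConcurrencyLimiter:
    def __init__(self, **kwargs) -> None:
        self.limited = True
        super().__init__(**kwargs)


class Store:
    def __init__(self, *, read_only: bool = False, **kwargs) -> None:
        self.read_only = read_only
        # Cooperative super() call: without it, ConcurrencyLimiter.__init__
        # would be skipped whenever Store precedes it in a subclass's MRO.
        super().__init__(**kwargs)


class ObjectStore(Store, ConcurrencyLimiter):
    def __init__(self, read_only: bool = False) -> None:
        # MRO is ObjectStore -> Store -> ConcurrencyLimiter -> object,
        # so super() here finds Store first.
        super().__init__(read_only=read_only)


mro_names = [c.__name__ for c in ObjectStore.__mro__]
store = ObjectStore()
print(mro_names)  # ['ObjectStore', 'Store', 'ConcurrencyLimiter', 'object']
print(store.read_only, store.limited)  # False True
```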
- class FsspecStore(Store):
+ class FsspecStore(Store, ConcurrencyLimiter):
Complicating things: fsspec backends often have a parameter to control the max concurrency. In that case, the best thing might be to configure that to use the native backend's concurrency-limiting mechanism and set zarr-python's concurrency_limit=None.
this would be my preference!
  read_only: bool = False,
  path: str = "/",
  allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
+ concurrency_limit: int | None = 50,
This seems like a default that would be useful to set through the config.
  @functools.wraps(func)
  async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T_co:
      self = args[0]
+     async with self._limit():  # type: ignore[attr-defined]
Could we use a typing.Protocol to indicate that the first argument needs a _limit() method with the right type?
I guess that might be overkill...
nah i'm down to add that
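A sketch of what that `typing.Protocol` could look like (hypothetical helper names; only the `_limit()` requirement comes from the discussion):

```python
import asyncio
import functools
from contextlib import AbstractAsyncContextManager
from typing import Any, Callable, Protocol


class HasLimit(Protocol):
    """The decorated method's first argument must expose _limit()."""

    def _limit(self) -> AbstractAsyncContextManager[Any]: ...


def with_limit(func: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(func)
    async def wrapper(self: HasLimit, *args: Any, **kwargs: Any) -> Any:
        # Typing `self` as HasLimit lets a type checker verify the
        # attribute instead of needing a type: ignore.
        async with self._limit():
            return await func(self, *args, **kwargs)

    return wrapper


class DemoStore:
    def __init__(self) -> None:
        self._sem = asyncio.Semaphore(2)

    def _limit(self) -> AbstractAsyncContextManager[Any]:
        return self._sem  # asyncio.Semaphore is an async context manager

    @with_limit
    async def get(self, key: str) -> str:
        return key.upper()


result = asyncio.run(DemoStore().get("chunk/0"))
print(result)  # CHUNK/0
```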
edit: this PR has now improved since its initial submission. Here are the key changes:

- `async.concurrency` is gone from the global config; using this setting triggers a warning.
- `concurrent_map` is gone. `asyncio.gather` is used instead, because concurrency limits are now the store's concern.
- A `ConcurrencyLimiter` mixin adds a semaphore-based concurrency limit via a decorator.
- `LocalStore` and `MemoryStore` have no concurrency limits. This results in a large performance improvement.
- `FsspecStore` and `ObjectStore` have a default concurrency of 50.