
feat!: store-defined concurrency limits#3547

Open
d-v-b wants to merge 26 commits into zarr-developers:main from d-v-b:feat/global-concurrency-limit

Conversation

@d-v-b
Contributor

@d-v-b d-v-b commented Oct 24, 2025

edit: this PR has now improved since its initial submission. Here are the key changes:

  • async.concurrency is gone from the global config, using this setting triggers a warning.
  • concurrency is managed by store instances
  • concurrent_map is gone. asyncio.gather is used instead, because concurrency limits are now the store's concern.
  • a ConcurrencyLimiter mixin adds a semaphore-based concurrency limit via a decorator
  • Non-remote stores like LocalStore and MemoryStore have no concurrency limits. This results in a large performance improvement.
  • FSSpecStore and ObjectStore have a default concurrency of 50.

@github-actions github-actions bot added the needs release notes Automatically applied to PRs which haven't added release notes label Oct 24, 2025
@github-actions github-actions bot removed the needs release notes Automatically applied to PRs which haven't added release notes label Oct 27, 2025
@dcherian
Contributor

Nice, the other one that bugs me when profiling is

return await asyncio.to_thread(

@maxrjones
Member

@d-v-b do you have thoughts on how downstream libraries would interact with Zarr's concurrency limit? Would they set their own concurrency approaches (e.g., global or per function), or could there be a way to share the limit with Zarr?

@d-v-b
Contributor Author

d-v-b commented Dec 22, 2025

@maxrjones this PR is moving away from a global concurrency limit because it's actually very hard to implement top-down concurrency control in our codebase. The new direction I'm taking with this PR is for each store to set its own concurrency limit. For stateless stores like local and remote (basically anything except zip), concurrency-sensitive applications should be able to cheaply create store instances with concurrency limits tuned to the needs of the application.

@d-v-b d-v-b changed the title feat/global concurrency limit feat!: store-defined concurrency limits Feb 20, 2026
@d-v-b
Contributor Author

d-v-b commented Feb 20, 2026

this is ready for a look. The basic idea is that we make concurrency limits an implementation detail of a particular store, and thereby remove all concurrency-limiting logic from array / group routines. Some stores will have no concurrency limits (e.g., memory and local storage), so their performance will go up. Stores that actually benefit from a concurrency limit (remote stuff) have a default limit of 10, but that's something we can play with.

To cut down on boilerplate, we (Claude and I) define a ConcurrencyLimiter base class that encapsulates the concurrency-limiting logic. This means taking a concurrency limit (`int | None`) and creating a semaphore (or a nullcontext if there's no limit). We also define a decorator that can be used to wrap a should-be-concurrency-limited method on a class that inherits from the ConcurrencyLimiter base class.

@d-v-b d-v-b requested a review from jhamman February 20, 2026 12:44
@d-v-b d-v-b requested review from dcherian and maxrjones February 20, 2026 12:45
Member

@maxrjones maxrjones left a comment


Thanks @d-v-b! I like the per-store concurrency limits idea. My main concern is that external stores implemented under the global concurrency limit model could go from limited concurrency to unbounded concurrency. It seems like there are at least a few external store implementations on GitHub - https://github.com/search?q=zarr.abc.store&type=code. One option for mitigation is to make the Store ABC provide a default concurrency limit via the ConcurrencyLimiter, which store implementations like MemoryStore could opt out of rather than opt in to.

@d-v-b
Contributor Author

d-v-b commented Mar 10, 2026

One option for mitigation is to make the Store ABC provide a default concurrency limit via the ConcurrencyLimiter, which store implementations like MemoryStore could opt out of rather than opt in to.

How would this look? Our store ABC doesn't define concrete implementations for stuff like get or set -- every store defines these anew. So I don't know how we could put a default concurrency limit in at the ABC level.

@d-v-b
Contributor Author

d-v-b commented Mar 10, 2026

also I think a large number of downstream packages import Store for type-checking, not because they implement their own stores. That being said, we would definitely want folks who have implemented their own stores (e.g., @cgohlke, @paraseba) to have a look at these changes and vet them.

@d-v-b d-v-b requested a review from a team March 13, 2026 08:38
@d-v-b d-v-b added the benchmark Code will be benchmarked in a CI job. label Mar 13, 2026
@codspeed-hq

codspeed-hq bot commented Mar 13, 2026

Merging this PR will improve performance by 29.08%

⚡ 48 improved benchmarks
✅ 18 untouched benchmarks
⏩ 6 skipped benchmarks1

Performance Changes

| Mode | Benchmark | BASE | HEAD | Efficiency |
| --- | --- | --- | --- | --- |
| WallTime | `test_write_array[memory-Layout(shape=(1000000,), chunks=(1000,), shards=None)-None]` | 556.8 ms | 463.9 ms | +20.02% |
| WallTime | `test_write_array[memory-Layout(shape=(1000000,), chunks=(1000,), shards=None)-gzip]` | 1,031.3 ms | 806.3 ms | +27.91% |
| WallTime | `test_write_array[memory-Layout(shape=(1000000,), chunks=(1000,), shards=(1000,))-None]` | 1.6 s | 1.4 s | +16.31% |
| WallTime | `test_write_array[memory-Layout(shape=(1000000,), chunks=(100,), shards=(1000000,))-None]` | 5.4 s | 4.4 s | +20.8% |
| WallTime | `test_write_array[local-Layout(shape=(1000000,), chunks=(1000,), shards=None)-None]` | 1,209 ms | 942.4 ms | +28.3% |
| WallTime | `test_slice_indexing[(50, 50, 50)-(slice(None, None, None), slice(None, None, None), slice(None, None, None))-memory]` | 419.6 ms | 331.3 ms | +26.65% |
| WallTime | `test_write_array[local-Layout(shape=(1000000,), chunks=(1000,), shards=(1000,))-None]` | 2.9 s | 2.3 s | +24.28% |
| WallTime | `test_write_array[memory-Layout(shape=(1000000,), chunks=(100,), shards=(1000000,))-gzip]` | 9.5 s | 8.2 s | +16.99% |
| WallTime | `test_slice_indexing[(50, 50, 50)-(slice(None, 10, None), slice(None, 10, None), slice(None, 10, None))-memory]` | 1.8 ms | 1.6 ms | +10.74% |
| WallTime | `test_write_array[local-Layout(shape=(1000000,), chunks=(1000,), shards=None)-gzip]` | 1.6 s | 1.3 s | +28.86% |
| WallTime | `test_slice_indexing[(50, 50, 50)-(slice(None, None, None), slice(None, None, None), slice(None, None, None))-memory_get_latency]` | 436.2 ms | 345.1 ms | +26.4% |
| WallTime | `test_write_array[local-Layout(shape=(1000000,), chunks=(1000,), shards=(1000,))-gzip]` | 3.3 s | 2.7 s | +22.71% |
| WallTime | `test_slice_indexing[(50, 50, 50)-(slice(0, None, 4), slice(0, None, 4), slice(0, None, 4))-memory]` | 414.6 ms | 327.7 ms | +26.53% |
| WallTime | `test_write_array[local-Layout(shape=(1000000,), chunks=(100,), shards=(1000000,))-gzip]` | 9.6 s | 8.1 s | +17.82% |
| WallTime | `test_slice_indexing[(50, 50, 50)-(slice(0, None, 4), slice(0, None, 4), slice(0, None, 4))-memory_get_latency]` | 431.3 ms | 341.6 ms | +26.24% |
| WallTime | `test_slice_indexing[(50, 50, 50)-(slice(None, None, None), slice(0, 3, 2), slice(0, 10, None))-memory]` | 6.4 ms | 5.5 ms | +16.05% |
| WallTime | `test_write_array[local-Layout(shape=(1000000,), chunks=(100,), shards=(1000000,))-None]` | 5.4 s | 4.5 s | +20.29% |
| WallTime | `test_sharded_morton_indexing[(16, 16, 16)-memory]` | 148.5 ms | 117.1 ms | +26.8% |
| WallTime | `test_write_array[memory-Layout(shape=(1000000,), chunks=(1000,), shards=(1000,))-gzip]` | 2.2 s | 1.7 s | +26.28% |
| WallTime | `test_read_array[memory-Layout(shape=(1000000,), chunks=(1000,), shards=None)-gzip]` | 585.4 ms | 492.4 ms | +18.89% |
| ... | ... | ... | ... | ... |

ℹ️ Only the first 20 benchmarks are displayed. Go to the app to view all benchmarks.


Comparing d-v-b:feat/global-concurrency-limit (60192ee) with main (a02d996)

Open in CodSpeed

Footnotes

  1. 6 benchmarks were skipped, so the baseline results were used instead. If they were deleted from the codebase, click here and archive them to remove them from the performance reports.

@d-v-b
Contributor Author

d-v-b commented Mar 13, 2026

looks like removing the unnecessary concurrency limits for local and memory storage gets us a very nice performance improvement, according to the automated benchmarks. I think that escalates the priority of this PR.

@d-v-b
Contributor Author

d-v-b commented Mar 13, 2026

My main concern is that external stores implemented under the global concurrency limit model could go from limited concurrency to unbounded concurrency.

the best thing I can think of is a minor release that adds an annoying warning to Store.__init_subclass__ like "In the next minor release of Zarr Python, concurrency limiting functionality must be defined on store classes. See <docs page> for information about migrating your Store subclass." I'm open to other ideas though!
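
A hedged sketch of what such a `__init_subclass__` warning could look like (class name, warning category, and message text here are all illustrative, not the actual zarr-python code):

```python
import warnings

class Store:
    # toy stand-in for zarr.abc.store.Store; warns whenever it is subclassed
    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        warnings.warn(
            "In the next minor release of Zarr Python, concurrency limiting "
            "functionality must be defined on store classes. See the migration "
            f"docs for how to update {cls.__name__}.",
            FutureWarning,
            stacklevel=2,
        )

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")

    class MyCustomStore(Store):
        pass

messages = [str(w.message) for w in caught]
```

The warning fires once at class-definition time for each external subclass, which is exactly when a downstream store author would see it.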

@d-v-b d-v-b removed the benchmark Code will be benchmarked in a CI job. label Mar 13, 2026


class ObjectStore(Store, Generic[T_Store]):
class ObjectStore(Store, ConcurrencyLimiter, Generic[T_Store]):
Contributor Author


@kylebarron lmk if these changes make sense, or if there's a better way to do this.

Contributor


I think this is a good example of something that could be provided by an obspec adapter, through composition instead of inheritance.

Some pseudocode

class ObspecConcurrencyLimiter:
    client: GetRangeAsync & GetRangesAsync & ...

    async def get_range_async(self, ...):
        async with self.semaphore:
            return await self.client.get_range_async(...)

But I'm still not sure how that would work with different stores that provide different methods

@d-v-b
Contributor Author

d-v-b commented Mar 13, 2026

I think a common pattern is for concurrency-sensitive backends to impose their own concurrency limits already. For example, icechunk does this: https://icechunk.io/en/stable/performance/#concurrency. And local-file stores like tifffile don't need concurrency limits at all.

So my current read is that this PR is actually very low risk and therefore safe to merge.

@d-v-b
Contributor Author

d-v-b commented Mar 25, 2026

ping @zarr-developers/python-core-devs, it would be good to get feedback on this PR.

Contributor

@TomAugspurger TomAugspurger left a comment


I didn't closely review the obstore changes, but overall I think this makes sense.

My only design question is whether the default of 50 should be in Python code, or looked up from the configuration.

self._decode_partial_single,
config.get("async.concurrency"),
)
# Store handles concurrency limiting internally
Contributor


nit: these comments are helpful for review now but will look odd in a while :)

concurrency_limit: int | None = 50,
) -> None:
super().__init__(read_only=read_only)
Store.__init__(self, read_only=read_only)
Contributor


This didn't need to change, did it? Python's MRO should ensure that super().__init__ finds Store first, right?



class FsspecStore(Store):
class FsspecStore(Store, ConcurrencyLimiter):
Contributor


Complicating things: fsspec backends often have a parameter to control the max concurrency. In which case, the best thing might be to configure the native backend's concurrency-limiting mechanism and set zarr-python's concurrency_limit=None.

Contributor Author


this would be my preference!

read_only: bool = False,
path: str = "/",
allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
concurrency_limit: int | None = 50,
Contributor


This seems like a default that would be useful to set through the config.

@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T_co:
self = args[0]
async with self._limit(): # type: ignore[attr-defined]
Contributor


Could we use a typing.Protocol to indicate that the first argument needs a _limit() method with the right type?

I guess that might be overkill...

Contributor Author


nah i'm down to add that

