From 5ef4b1baf640b0f6a7b468475e22bab5333d24c5 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sat, 21 Mar 2026 11:22:59 +0100 Subject: [PATCH 1/9] feat: subchunk write order --- Cargo.toml | 2 +- README.md | 5 ++++- python/zarrs/_internal.pyi | 1 + python/zarrs/pipeline.py | 3 +++ src/lib.rs | 22 ++++++++++++++----- src/utils.rs | 44 ++++++++++++++++++++++++++++++++++++-- 6 files changed, 68 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6c4cd9e1..9098f75e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] pyo3 = { version = "0.27.1", features = ["abi3-py311"] } -zarrs = { version = "0.23.6", features = ["async", "zlib", "pcodec", "bz2"] } +zarrs = { version = "0.23.7", features = ["async", "zlib", "pcodec", "bz2"] } rayon_iter_concurrent_limit = "0.2.0" rayon = "1.10.0" # fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path diff --git a/README.md b/README.md index f06add74..708c5a92 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ The `ZarrsCodecPipeline` specific options are: - Defaults to `False`. - `codec_pipeline.strict`: raise exceptions for unsupported operations instead of falling back to the default codec pipeline of `zarr-python`. - Defaults to `False`. +- `codec_pipeline.subchunk_write_order`: Tells `zarrs` in what order to write subchunks within a shard. One of "C" or "random." + - Defaults to `random`. For example: ```python @@ -63,7 +65,8 @@ zarr.config.set({ "chunk_concurrent_maximum": None, "chunk_concurrent_minimum": 4, "direct_io": False, - "strict": False + "strict": False, + "subchunk_write_order": "C" } }) ``` diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 4af635a7..687d6095 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -30,6 +30,7 @@ class CodecPipelineImpl: chunk_concurrent_maximum: builtins.int | None = None, num_threads: builtins.int | None = None, direct_io: builtins.bool = False, + subchunk_write_order: builtins.str = "random", ) -> CodecPipelineImpl: ... def retrieve_chunks_and_apply_index( self, diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index d033820a..9359074d 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -62,6 +62,9 @@ def get_codec_pipeline_impl( ), num_threads=config.get("threading.max_workers", None), direct_io=config.get("codec_pipeline.direct_io", False), + subchunk_write_order=config.get( + "codec_pipeline.subchunk_write_order", "random" + ), ) except TypeError as e: if strict: diff --git a/src/lib.rs b/src/lib.rs index 7cc1a0f7..9b9310a1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,10 +18,12 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; use unsafe_cell_slice::UnsafeCellSlice; use utils::is_whole_chunk; +use zarrs::array::codec::{ShardingCodecOptions, SubchunkWriteOrder}; use zarrs::array::{ ArrayBytes, ArrayBytesDecodeIntoTarget, ArrayBytesFixedDisjointView, ArrayMetadata, - ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecChain, CodecOptions, DataType, - FillValue, StoragePartialDecoder, copy_fill_value_into, update_array_bytes, + ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecChain, CodecOptions, + CodecSpecificOptions, DataType, FillValue, StoragePartialDecoder, copy_fill_value_into, + update_array_bytes, }; use zarrs::config::global_config; use zarrs::convert::array_metadata_v2_to_v3; @@ -38,7 +40,7 @@ mod utils; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; use crate::store::StoreConfig; -use crate::utils::{PyCodecErrExt, PyErrExt as _}; +use crate::utils::{PyCodecErrExt, PyErrExt as _, SubchunkWriteOrderWrapper}; // TODO: Use a OnceLock for store with get_or_try_init when stabilised? #[gen_stub_pyclass] @@ -218,6 +220,7 @@ impl CodecPipelineImpl { chunk_concurrent_maximum=None, num_threads=None, direct_io=false, + subchunk_write_order=SubchunkWriteOrderWrapper(SubchunkWriteOrder::Random), ))] #[new] fn new( @@ -228,6 +231,7 @@ impl CodecPipelineImpl { chunk_concurrent_maximum: Option, num_threads: Option, direct_io: bool, + subchunk_write_order: SubchunkWriteOrderWrapper, ) -> PyResult { store_config.direct_io(direct_io); let metadata = serde_json::from_str(array_metadata).map_py_err::()?; @@ -237,8 +241,16 @@ impl CodecPipelineImpl { } ArrayMetadata::V3(v3) => Cow::Borrowed(v3), }; - let codec_chain = - Arc::new(CodecChain::from_metadata(&metadata_v3.codecs).map_py_err::()?); + let codec_chain = Arc::new( + CodecChain::from_metadata(&metadata_v3.codecs) + .map_py_err::()? + .with_codec_specific_options( + &CodecSpecificOptions::default().with_option( + ShardingCodecOptions::default() + .with_subchunk_write_order(subchunk_write_order.0), + ), + ), + ); let codec_options = CodecOptions::default().with_validate_checksums(validate_checksums); let chunk_concurrent_minimum = diff --git a/src/utils.rs b/src/utils.rs index 3ededf06..214ee62c 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,10 @@ use std::fmt::Display; -use pyo3::{PyErr, PyResult, PyTypeInfo}; -use zarrs::array::CodecError; +use pyo3::{ + Borrowed, Bound, FromPyObject, IntoPyObject, PyAny, PyErr, PyResult, PyTypeInfo, Python, + exceptions::PyValueError, types::PyString, +}; +use zarrs::array::{CodecError, codec::SubchunkWriteOrder}; use crate::ChunkItem; @@ -41,3 +44,40 @@ pub fn is_whole_chunk(item: &ChunkItem) -> bool { item.chunk_subset.start().iter().all(|&o| o == 0) && item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(&item.shape) } + +#[derive(Debug, Clone)] +pub struct SubchunkWriteOrderWrapper(pub SubchunkWriteOrder); + +impl<'py> IntoPyObject<'py> for SubchunkWriteOrderWrapper { + type Target = PyString; + type Output = Bound<'py, PyString>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + match self.0 { + SubchunkWriteOrder::C => Ok("C".into_pyobject(py)?), + SubchunkWriteOrder::Random => Ok("random".into_pyobject(py)?), + _ => unreachable!("Unrecognized subchunk write order, only `C` and `random` allowed."), + } + } +} + +impl<'py> FromPyObject<'_, 'py> for SubchunkWriteOrderWrapper { + type Error = PyErr; + + fn extract(option: Borrowed<'_, 'py, PyAny>) -> PyResult { + match option.extract::<&str>()? { + "C" => Ok(SubchunkWriteOrderWrapper(SubchunkWriteOrder::C)), + "random" => Ok(SubchunkWriteOrderWrapper(SubchunkWriteOrder::Random)), + _ => Err(PyValueError::new_err( + "Unrecognized subchunk write order, only `C` and `random` allowed.", + )), + } + } +} + +impl pyo3_stub_gen::PyStubType for SubchunkWriteOrderWrapper { + fn type_output() -> pyo3_stub_gen::TypeInfo { + pyo3_stub_gen::TypeInfo::builtin("str") + } +} From 746f39ca58b1ffe3cb7a49f98ad3952329b6f88d Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sun, 22 Mar 2026 15:15:10 +0100 Subject: [PATCH 2/9] chore: test --- tests/test_sharding.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/tests/test_sharding.py b/tests/test_sharding.py index 599b5e92..5dfd87bc 100644 --- a/tests/test_sharding.py +++ b/tests/test_sharding.py @@ -1,8 +1,9 @@ -from typing import Any +from typing import Any, Literal import numpy as np import numpy.typing as npt import pytest +import zarr from zarr import Array, AsyncArray from zarr.abc.store import Store from zarr.codecs import ( @@ -367,3 +368,36 @@ async def test_sharding_with_empty_inner_chunk( print("read data") data_read = await a.getitem(...) assert np.array_equal(data_read, data) + + +@pytest.mark.parametrize( + "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end] +) +@pytest.mark.parametrize("subchunk_write_order", ["C", "random"]) +async def test_sharding_subchunk_write_order( + store: Store, + index_location: ShardingCodecIndexLocation, + subchunk_write_order: Literal["C", "random"], +) -> None: + with zarr.config.set({"codec_pipeline.subchunk_write_order": subchunk_write_order}): + path = f"sharding_with_empty_inner_chunk_{index_location}" + spath = StorePath(store, path) + codec = ShardingCodec(chunk_shape=(2, 2), index_location=index_location) + a = await AsyncArray.create( + spath, + shape=(16, 16), + chunk_shape=(16, 16), + dtype="uint32", + fill_value=0, + codecs=[codec], + ) + await a.setitem(..., np.arange(16 * 16).reshape((16, 16))) + index = await codec._load_shard_index(a.store_path / "/c/0/0", (8, 8)) + index_offsets = index.offsets_and_lengths[index.get_full_chunk_map()].ravel()[ + ::2 + ] + assert len(index_offsets) == 64 # 8 * 8 + if subchunk_write_order == "C": + np.testing.assert_equal(np.sort(index_offsets), index_offsets) + else: + assert not np.array_equal(np.sort(index_offsets), index_offsets) From 2a59fc5788eee58b04c2a793ffdf1ab356df496b Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sun, 22 Mar 2026 16:08:33 +0100 Subject: [PATCH 3/9] chore: deprecated calls --- src/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/tests.rs b/src/tests.rs index 9f1f8aa7..8afc932d 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -10,8 +10,8 @@ use crate::CodecPipelineImpl; #[test] fn test_nparray_to_unsafe_cell_slice_empty() -> PyResult<()> { - pyo3::prepare_freethreaded_python(); - Python::with_gil(|py| { + Python::initialize(); + Python::attach(|py| { let arr: Bound<'_, PyUntypedArray> = PyModule::from_code( py, c_str!( From 28cce28ed1cfb519baf7f1f42f920384c53154c8 Mon Sep 17 00:00:00 2001 From: ilan-gold Date: Sun, 22 Mar 2026 16:30:55 +0100 Subject: [PATCH 4/9] fix: not unreachable --- src/utils.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/utils.rs b/src/utils.rs index 214ee62c..3891ea22 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -57,7 +57,9 @@ impl<'py> IntoPyObject<'py> for SubchunkWriteOrderWrapper { match self.0 { SubchunkWriteOrder::C => Ok("C".into_pyobject(py)?), SubchunkWriteOrder::Random => Ok("random".into_pyobject(py)?), - _ => unreachable!("Unrecognized subchunk write order, only `C` and `random` allowed."), + _ => Err(PyValueError::new_err( + "Unrecognized subchunk write order for converting to python object, only `C` and `random` allowed.", + )), } } } @@ -70,7 +72,7 @@ impl<'py> FromPyObject<'_, 'py> for SubchunkWriteOrderWrapper { "C" => Ok(SubchunkWriteOrderWrapper(SubchunkWriteOrder::C)), "random" => Ok(SubchunkWriteOrderWrapper(SubchunkWriteOrder::Random)), _ => Err(PyValueError::new_err( - "Unrecognized subchunk write order, only `C` and `random` allowed.", + "Unrecognized subchunk write order while extracting to rust, only `C` and `random` allowed.", )), } } From 037db6e7e1b7f58378776bcce89f17660db6c625 Mon Sep 17 00:00:00 2001 From: Ilan Gold Date: Mon, 23 Mar 2026 12:00:41 +0100 Subject: [PATCH 5/9] Update README.md Co-authored-by: Philipp A. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 708c5a92..9d96a661 100644 --- a/README.md +++ b/README.md @@ -66,8 +66,8 @@ zarr.config.set({ "chunk_concurrent_minimum": 4, "direct_io": False, "strict": False, - "subchunk_write_order": "C" - } + "subchunk_write_order": "C", +}, }) ``` From 1338e496d4a0b74bf6572f0753347e54185c9ddd Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Mon, 23 Mar 2026 12:04:11 +0100 Subject: [PATCH 6/9] fmt --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9d96a661..102debf8 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ zarr.config.set({ "direct_io": False, "strict": False, "subchunk_write_order": "C", -}, + }, }) ``` From 46506a598850c79ff4f942cda98b40a9b045f446 Mon Sep 17 00:00:00 2001 From: Ilan Gold Date: Mon, 23 Mar 2026 12:07:21 +0100 Subject: [PATCH 7/9] chore: explanation in readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 102debf8..d5bf99e6 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ The `ZarrsCodecPipeline` specific options are: - Defaults to `False`. - `codec_pipeline.strict`: raise exceptions for unsupported operations instead of falling back to the default codec pipeline of `zarr-python`. - Defaults to `False`. -- `codec_pipeline.subchunk_write_order`: Tells `zarrs` in what order to write subchunks within a shard. One of "C" or "random." +- `codec_pipeline.subchunk_write_order`: Tells `zarrs` in what order to write subchunks within a shard. One of "C" or "random." "C" ordering is `numpy`-speak for [row-major](https://en.wikipedia.org/wiki/Row-_and_column-major_order). "Random" is a bit of a misnomer and implies no ordering in reality, instead being unordered as a result of `rayon`'s lack of ordering guarantee. - Defaults to `random`. For example: From 0ff786781d8b264dea4eb85bac61b8a4a078c14f Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Mon, 23 Mar 2026 12:09:19 +0100 Subject: [PATCH 8/9] clippy --- src/lib.rs | 21 ++++++++++----------- src/store.rs | 2 +- src/store/filesystem.rs | 2 +- src/utils.rs | 2 +- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9b9310a1..09febfde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -211,6 +211,7 @@ impl CodecPipelineImpl { #[gen_stub_pymethods] #[pymethods] impl CodecPipelineImpl { + #[allow(clippy::too_many_arguments)] // python functions can have defaults #[pyo3(signature = ( array_metadata, store_config, @@ -266,17 +267,15 @@ impl CodecPipelineImpl { DataType::from_metadata(&metadata_v3.data_type).map_py_err::()?; let fill_value = data_type .fill_value(&metadata_v3.fill_value, ZarrVersion::V3) - .or_else(|_| { - Err(match &metadata { - ArrayMetadata::V2(metadata) => format!( - "incompatible fill value metadata: dtype={}, fill_value={}", - metadata.dtype, metadata.fill_value - ), - ArrayMetadata::V3(metadata) => format!( - "incompatible fill value metadata: data_type={}, fill_value={}", - metadata.data_type, metadata.fill_value - ), - }) + .map_err(|_| match &metadata { + ArrayMetadata::V2(metadata) => format!( + "incompatible fill value metadata: dtype={}, fill_value={}", + metadata.dtype, metadata.fill_value + ), + ArrayMetadata::V3(metadata) => format!( + "incompatible fill value metadata: data_type={}, fill_value={}", + metadata.data_type, metadata.fill_value + ), }) .map_py_err::()?; diff --git a/src/store.rs b/src/store.rs index 132b436b..86f53081 100644 --- a/src/store.rs +++ b/src/store.rs @@ -72,7 +72,7 @@ impl<'py> FromPyObject<'_, 'py> for StoreConfig { } impl StoreConfig { - pub fn direct_io(&mut self, flag: bool) -> () { + pub fn direct_io(&mut self, flag: bool) { match self { StoreConfig::Filesystem(config) => config.direct_io(flag), StoreConfig::Http(_config) => (), diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs index 39b53d71..6ff1415e 100644 --- a/src/store/filesystem.rs +++ b/src/store/filesystem.rs @@ -22,7 +22,7 @@ impl FilesystemStoreConfig { } } - pub fn direct_io(&mut self, flag: bool) -> () { + pub fn direct_io(&mut self, flag: bool) { self.opts.direct_io(flag); } } diff --git a/src/utils.rs b/src/utils.rs index 3891ea22..56fa45c0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -45,7 +45,7 @@ pub fn is_whole_chunk(item: &ChunkItem) -> bool { && item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(&item.shape) } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct SubchunkWriteOrderWrapper(pub SubchunkWriteOrder); impl<'py> IntoPyObject<'py> for SubchunkWriteOrderWrapper { From 4dad7bb83e9d4140d0e4b0a6ac46806fde5332ef Mon Sep 17 00:00:00 2001 From: Phil Schaf Date: Mon, 23 Mar 2026 13:16:21 +0100 Subject: [PATCH 9/9] fix type --- python/zarrs/_internal.pyi | 2 +- src/utils.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 687d6095..1abbcd5e 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -30,7 +30,7 @@ class CodecPipelineImpl: chunk_concurrent_maximum: builtins.int | None = None, num_threads: builtins.int | None = None, direct_io: builtins.bool = False, - subchunk_write_order: builtins.str = "random", + subchunk_write_order: typing.Literal["C", "random"] = "random", ) -> CodecPipelineImpl: ... def retrieve_chunks_and_apply_index( self, diff --git a/src/utils.rs b/src/utils.rs index 56fa45c0..ed60eeb0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -80,6 +80,6 @@ impl<'py> FromPyObject<'_, 'py> for SubchunkWriteOrderWrapper { impl pyo3_stub_gen::PyStubType for SubchunkWriteOrderWrapper { fn type_output() -> pyo3_stub_gen::TypeInfo { - pyo3_stub_gen::TypeInfo::builtin("str") + pyo3_stub_gen::TypeInfo::with_module("typing.Literal['C', 'random']", "typing".into()) } }