diff --git a/Cargo.toml b/Cargo.toml index 6c4cd9e1..9098f75e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ crate-type = ["cdylib", "rlib"] [dependencies] pyo3 = { version = "0.27.1", features = ["abi3-py311"] } -zarrs = { version = "0.23.6", features = ["async", "zlib", "pcodec", "bz2"] } +zarrs = { version = "0.23.7", features = ["async", "zlib", "pcodec", "bz2"] } rayon_iter_concurrent_limit = "0.2.0" rayon = "1.10.0" # fix for https://stackoverflow.com/questions/76593417/package-openssl-was-not-found-in-the-pkg-config-search-path diff --git a/README.md b/README.md index f06add74..d5bf99e6 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ The `ZarrsCodecPipeline` specific options are: - Defaults to `False`. - `codec_pipeline.strict`: raise exceptions for unsupported operations instead of falling back to the default codec pipeline of `zarr-python`. - Defaults to `False`. +- `codec_pipeline.subchunk_write_order`: Tells `zarrs` in what order to write subchunks within a shard. One of "C" or "random." "C" ordering is `numpy`-speak for [row-major](https://en.wikipedia.org/wiki/Row-_and_column-major_order). "Random" is a bit of a misnomer and implies no ordering in reality, instead being unordered as a result of `rayon`'s lack of ordering guarantee. + - Defaults to `random`. For example: ```python @@ -63,8 +65,9 @@ zarr.config.set({ "chunk_concurrent_maximum": None, "chunk_concurrent_minimum": 4, "direct_io": False, - "strict": False - } + "strict": False, + "subchunk_write_order": "C", + }, }) ``` diff --git a/python/zarrs/_internal.pyi b/python/zarrs/_internal.pyi index 4af635a7..1abbcd5e 100644 --- a/python/zarrs/_internal.pyi +++ b/python/zarrs/_internal.pyi @@ -30,6 +30,7 @@ class CodecPipelineImpl: chunk_concurrent_maximum: builtins.int | None = None, num_threads: builtins.int | None = None, direct_io: builtins.bool = False, + subchunk_write_order: typing.Literal["C", "random"] = "random", ) -> CodecPipelineImpl: ... 
def retrieve_chunks_and_apply_index( self, diff --git a/python/zarrs/pipeline.py b/python/zarrs/pipeline.py index d033820a..9359074d 100644 --- a/python/zarrs/pipeline.py +++ b/python/zarrs/pipeline.py @@ -62,6 +62,9 @@ def get_codec_pipeline_impl( ), num_threads=config.get("threading.max_workers", None), direct_io=config.get("codec_pipeline.direct_io", False), + subchunk_write_order=config.get( + "codec_pipeline.subchunk_write_order", "random" + ), ) except TypeError as e: if strict: diff --git a/src/lib.rs b/src/lib.rs index 7cc1a0f7..09febfde 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,10 +18,12 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon_iter_concurrent_limit::iter_concurrent_limit; use unsafe_cell_slice::UnsafeCellSlice; use utils::is_whole_chunk; +use zarrs::array::codec::{ShardingCodecOptions, SubchunkWriteOrder}; use zarrs::array::{ ArrayBytes, ArrayBytesDecodeIntoTarget, ArrayBytesFixedDisjointView, ArrayMetadata, - ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecChain, CodecOptions, DataType, - FillValue, StoragePartialDecoder, copy_fill_value_into, update_array_bytes, + ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecChain, CodecOptions, + CodecSpecificOptions, DataType, FillValue, StoragePartialDecoder, copy_fill_value_into, + update_array_bytes, }; use zarrs::config::global_config; use zarrs::convert::array_metadata_v2_to_v3; @@ -38,7 +40,7 @@ mod utils; use crate::concurrency::ChunkConcurrentLimitAndCodecOptions; use crate::store::StoreConfig; -use crate::utils::{PyCodecErrExt, PyErrExt as _}; +use crate::utils::{PyCodecErrExt, PyErrExt as _, SubchunkWriteOrderWrapper}; // TODO: Use a OnceLock for store with get_or_try_init when stabilised? 
#[gen_stub_pyclass] @@ -209,6 +211,7 @@ impl CodecPipelineImpl { #[gen_stub_pymethods] #[pymethods] impl CodecPipelineImpl { + #[allow(clippy::too_many_arguments)] // python functions can have defaults #[pyo3(signature = ( array_metadata, store_config, @@ -218,6 +221,7 @@ impl CodecPipelineImpl { chunk_concurrent_maximum=None, num_threads=None, direct_io=false, + subchunk_write_order=SubchunkWriteOrderWrapper(SubchunkWriteOrder::Random), ))] #[new] fn new( @@ -228,6 +232,7 @@ impl CodecPipelineImpl { chunk_concurrent_maximum: Option, num_threads: Option, direct_io: bool, + subchunk_write_order: SubchunkWriteOrderWrapper, ) -> PyResult { store_config.direct_io(direct_io); let metadata = serde_json::from_str(array_metadata).map_py_err::()?; @@ -237,8 +242,16 @@ impl CodecPipelineImpl { } ArrayMetadata::V3(v3) => Cow::Borrowed(v3), }; - let codec_chain = - Arc::new(CodecChain::from_metadata(&metadata_v3.codecs).map_py_err::()?); + let codec_chain = Arc::new( + CodecChain::from_metadata(&metadata_v3.codecs) + .map_py_err::()? 
+ .with_codec_specific_options( + &CodecSpecificOptions::default().with_option( + ShardingCodecOptions::default() + .with_subchunk_write_order(subchunk_write_order.0), + ), + ), + ); let codec_options = CodecOptions::default().with_validate_checksums(validate_checksums); let chunk_concurrent_minimum = @@ -254,17 +267,15 @@ impl CodecPipelineImpl { DataType::from_metadata(&metadata_v3.data_type).map_py_err::()?; let fill_value = data_type .fill_value(&metadata_v3.fill_value, ZarrVersion::V3) - .or_else(|_| { - Err(match &metadata { - ArrayMetadata::V2(metadata) => format!( - "incompatible fill value metadata: dtype={}, fill_value={}", - metadata.dtype, metadata.fill_value - ), - ArrayMetadata::V3(metadata) => format!( - "incompatible fill value metadata: data_type={}, fill_value={}", - metadata.data_type, metadata.fill_value - ), - }) + .map_err(|_| match &metadata { + ArrayMetadata::V2(metadata) => format!( + "incompatible fill value metadata: dtype={}, fill_value={}", + metadata.dtype, metadata.fill_value + ), + ArrayMetadata::V3(metadata) => format!( + "incompatible fill value metadata: data_type={}, fill_value={}", + metadata.data_type, metadata.fill_value + ), }) .map_py_err::()?; diff --git a/src/store.rs b/src/store.rs index 132b436b..86f53081 100644 --- a/src/store.rs +++ b/src/store.rs @@ -72,7 +72,7 @@ impl<'py> FromPyObject<'_, 'py> for StoreConfig { } impl StoreConfig { - pub fn direct_io(&mut self, flag: bool) -> () { + pub fn direct_io(&mut self, flag: bool) { match self { StoreConfig::Filesystem(config) => config.direct_io(flag), StoreConfig::Http(_config) => (), diff --git a/src/store/filesystem.rs b/src/store/filesystem.rs index 39b53d71..6ff1415e 100644 --- a/src/store/filesystem.rs +++ b/src/store/filesystem.rs @@ -22,7 +22,7 @@ impl FilesystemStoreConfig { } } - pub fn direct_io(&mut self, flag: bool) -> () { + pub fn direct_io(&mut self, flag: bool) { self.opts.direct_io(flag); } } diff --git a/src/tests.rs b/src/tests.rs index 
9f1f8aa7..8afc932d 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -10,8 +10,8 @@ use crate::CodecPipelineImpl; #[test] fn test_nparray_to_unsafe_cell_slice_empty() -> PyResult<()> { - pyo3::prepare_freethreaded_python(); - Python::with_gil(|py| { + Python::initialize(); + Python::attach(|py| { let arr: Bound<'_, PyUntypedArray> = PyModule::from_code( py, c_str!( diff --git a/src/utils.rs b/src/utils.rs index 3ededf06..ed60eeb0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,10 @@ use std::fmt::Display; -use pyo3::{PyErr, PyResult, PyTypeInfo}; -use zarrs::array::CodecError; +use pyo3::{ + Borrowed, Bound, FromPyObject, IntoPyObject, PyAny, PyErr, PyResult, PyTypeInfo, Python, + exceptions::PyValueError, types::PyString, +}; +use zarrs::array::{CodecError, codec::SubchunkWriteOrder}; use crate::ChunkItem; @@ -41,3 +44,42 @@ pub fn is_whole_chunk(item: &ChunkItem) -> bool { item.chunk_subset.start().iter().all(|&o| o == 0) && item.chunk_subset.shape() == bytemuck::must_cast_slice::<_, u64>(&item.shape) } + +#[derive(Debug, Clone, Copy)] +pub struct SubchunkWriteOrderWrapper(pub SubchunkWriteOrder); + +impl<'py> IntoPyObject<'py> for SubchunkWriteOrderWrapper { + type Target = PyString; + type Output = Bound<'py, PyString>; + type Error = PyErr; + + fn into_pyobject(self, py: Python<'py>) -> Result { + match self.0 { + SubchunkWriteOrder::C => Ok("C".into_pyobject(py)?), + SubchunkWriteOrder::Random => Ok("random".into_pyobject(py)?), + _ => Err(PyValueError::new_err( + "Unrecognized subchunk write order for converting to python object, only `C` and `random` allowed.", + )), + } + } +} + +impl<'py> FromPyObject<'_, 'py> for SubchunkWriteOrderWrapper { + type Error = PyErr; + + fn extract(option: Borrowed<'_, 'py, PyAny>) -> PyResult { + match option.extract::<&str>()? 
{ + "C" => Ok(SubchunkWriteOrderWrapper(SubchunkWriteOrder::C)), + "random" => Ok(SubchunkWriteOrderWrapper(SubchunkWriteOrder::Random)), + _ => Err(PyValueError::new_err( + "Unrecognized subchunk write order while extracting to rust, only `C` and `random` allowed.", + )), + } + } +} + +impl pyo3_stub_gen::PyStubType for SubchunkWriteOrderWrapper { + fn type_output() -> pyo3_stub_gen::TypeInfo { + pyo3_stub_gen::TypeInfo::with_module("typing.Literal['C', 'random']", "typing".into()) + } +} diff --git a/tests/test_sharding.py b/tests/test_sharding.py index 599b5e92..5dfd87bc 100644 --- a/tests/test_sharding.py +++ b/tests/test_sharding.py @@ -1,8 +1,9 @@ -from typing import Any +from typing import Any, Literal import numpy as np import numpy.typing as npt import pytest +import zarr from zarr import Array, AsyncArray from zarr.abc.store import Store from zarr.codecs import ( @@ -367,3 +368,36 @@ async def test_sharding_with_empty_inner_chunk( print("read data") data_read = await a.getitem(...) 
     assert np.array_equal(data_read, data)
+
+
+@pytest.mark.parametrize(
+    "index_location", [ShardingCodecIndexLocation.start, ShardingCodecIndexLocation.end]
+)
+@pytest.mark.parametrize("subchunk_write_order", ["C", "random"])
+async def test_sharding_subchunk_write_order(
+    store: Store,
+    index_location: ShardingCodecIndexLocation,
+    subchunk_write_order: Literal["C", "random"],
+) -> None:
+    with zarr.config.set({"codec_pipeline.subchunk_write_order": subchunk_write_order}):
+        path = f"sharding_subchunk_write_order_{index_location}_{subchunk_write_order}"
+        spath = StorePath(store, path)
+        codec = ShardingCodec(chunk_shape=(2, 2), index_location=index_location)
+        a = await AsyncArray.create(
+            spath,
+            shape=(16, 16),
+            chunk_shape=(16, 16),
+            dtype="uint32",
+            fill_value=0,
+            codecs=[codec],
+        )
+        await a.setitem(..., np.arange(16 * 16).reshape((16, 16)))
+        index = await codec._load_shard_index(a.store_path / "/c/0/0", (8, 8))
+        index_offsets = index.offsets_and_lengths[index.get_full_chunk_map()].ravel()[
+            ::2
+        ]
+        assert len(index_offsets) == 64  # 8 * 8
+        if subchunk_write_order == "C":
+            np.testing.assert_equal(np.sort(index_offsets), index_offsets)
+        else:
+            assert not np.array_equal(np.sort(index_offsets), index_offsets)