Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ v2026.03.0 (unreleased)
New Features
~~~~~~~~~~~~

- Adds a new option ``chunks="preserve"`` when opening a dataset. This option
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this should just be "auto". Are we really working around a dask bug?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about doing this work in dask, but I ended up deciding that this is a sufficiently different goal from dask auto. The goal of dask auto is to guarantee that the chunksize will be under a configurable limit while preserving the aspect ratio of previous_chunks. We don't really want either of those things.

But maybe you are just saying: this is what xarray should mean by "auto" in which case I definitely agree. I'm just not sure how to make the transition from the old version of "auto" to the new version. Maybe it would be easier to give it a new name ("preserve") and then change the default value in kwargs from chunks="auto" to chunks="preserve" at some point. If we just change what "auto" means then there is no way for people to get the dask auto behavior.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this again today and I think maybe we can get around the backwards incompatibility by adding a new option that allows you to specify whether you want to use dask-style or preserve-style. I'll do an experiment with changing the behavior of "auto" itself and see how much it breaks stuff.

guarantees that chunks in xarray match on-disk chunks or multiples of them.
No chunk splitting allowed. (:pull:`11060`).
By `Julia Signell <https://github.com/jsignell>`_

Breaking Changes
~~~~~~~~~~~~~~~~
Expand Down
71 changes: 71 additions & 0 deletions properties/test_parallelcompat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import numpy as np
import pytest

pytest.importorskip("hypothesis")
# isort: split

from hypothesis import given

import xarray.testing.strategies as xrst
from xarray.namedarray.parallelcompat import ChunkManagerEntrypoint


class TestPreserveChunks:
    """Property tests for ``ChunkManagerEntrypoint.preserve_chunks``."""

    @given(xrst.shape_and_chunks())
    def test_preserve_all_chunks(
        self, shape_and_chunks: tuple[tuple[int, ...], tuple[int, ...]]
    ) -> None:
        shape, previous_chunks = shape_and_chunks
        typesize = 8
        target = 1024 * 1024

        actual = ChunkManagerEntrypoint.preserve_chunks(
            chunks=("preserve",) * len(shape),
            shape=shape,
            target=target,
            typesize=typesize,
            previous_chunks=previous_chunks,
        )
        for chunk, dim_size, previous in zip(
            actual, shape, previous_chunks, strict=True
        ):
            if chunk != dim_size:
                # any preserved chunk must be a whole multiple of the encoded one
                assert chunk >= previous
                assert chunk % previous == 0
            assert chunk <= dim_size

        if actual != shape:
            # when the array isn't one big chunk, chunks land within 50% of target
            assert np.prod(actual) * typesize >= 0.5 * target

    @pytest.mark.parametrize("first_chunk", [-1, (), 1])
    @given(xrst.shape_and_chunks(min_dims=2))
    def test_preserve_some_chunks(
        self,
        first_chunk: int | tuple[int, ...],
        shape_and_chunks: tuple[tuple[int, ...], tuple[int, ...]],
    ) -> None:
        shape, previous_chunks = shape_and_chunks
        typesize = 4
        target = 2 * 1024 * 1024

        requested = (first_chunk, *("preserve",) * (len(shape) - 1))
        actual = ChunkManagerEntrypoint.preserve_chunks(
            chunks=requested,
            shape=shape,
            target=target,
            typesize=typesize,
            previous_chunks=previous_chunks,
        )

        # the explicitly requested first dimension must be honored verbatim:
        # 1 stays 1, -1 expands to the full dim, () follows the previous chunks
        expected_first = {1: 1, -1: shape[0], (): previous_chunks[0]}[first_chunk]
        assert actual[0] == expected_first
        assert actual[0] <= shape[0]

        for chunk, dim_size, previous in zip(
            actual[1:], shape[1:], previous_chunks[1:], strict=True
        ):
            if chunk != dim_size:
                assert chunk >= previous
                assert chunk % previous == 0
            assert chunk <= dim_size

        # if we have more than one chunk, make sure the chunks are big enough
        if actual[1:] != shape[1:]:
            assert np.prod(actual) * typesize >= 0.5 * target
26 changes: 19 additions & 7 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ def _dataset_from_backend_dataset(
create_default_indexes,
**extra_tokens,
):
if not isinstance(chunks, int | dict) and chunks not in {None, "auto"}:
if not isinstance(chunks, int | dict) and chunks not in {None, "auto", "preserve"}:
raise ValueError(
f"chunks must be an int, dict, 'auto', or None. Instead found {chunks}."
f"chunks must be an int, dict, 'auto', 'preserve', or None. Instead found {chunks}."
)

_protect_dataset_variables_inplace(backend_ds, cache)
Expand Down Expand Up @@ -430,11 +430,14 @@ def open_dataset(
"netcdf4" over "h5netcdf" over "scipy" (customizable via
``netcdf_engine_order`` in ``xarray.set_options()``). A custom backend
class (a subclass of ``BackendEntrypoint``) can also be used.
chunks : int, dict, 'auto' or None, default: None
chunks : int, dict, 'auto', 'preserve' or None, default: None
If provided, used to load the data into dask arrays.

- ``chunks="auto"`` will use dask ``auto`` chunking taking into account the
engine preferred chunks.
- ``chunks="preserve"`` will use a chunking scheme that never splits encoded
chunks. If encoded chunks are small then "preserve" takes multiples of them
over the largest dimension.
- ``chunks=None`` skips using dask. This uses xarray's internally private
:ref:`lazy indexing classes <internal design.lazy indexing>`,
but data is eagerly loaded into memory as numpy arrays when accessed.
Expand Down Expand Up @@ -674,11 +677,14 @@ def open_dataarray(
"netcdf4" over "h5netcdf" over "scipy" (customizable via
``netcdf_engine_order`` in ``xarray.set_options()``). A custom backend
class (a subclass of ``BackendEntrypoint``) can also be used.
chunks : int, dict, 'auto' or None, default: None
chunks : int, dict, 'auto', 'preserve', or None, default: None
If provided, used to load the data into dask arrays.

- ``chunks='auto'`` will use dask ``auto`` chunking taking into account the
engine preferred chunks.
- ``chunks="preserve"`` will use a chunking scheme that never splits encoded
chunks. If encoded chunks are small then "preserve" takes multiples of them
over the largest dimension.
- ``chunks=None`` skips using dask. This uses xarray's internally private
:ref:`lazy indexing classes <internal design.lazy indexing>`,
but data is eagerly loaded into memory as numpy arrays when accessed.
Expand Down Expand Up @@ -900,11 +906,14 @@ def open_datatree(
"h5netcdf" over "netcdf4" (customizable via ``netcdf_engine_order`` in
``xarray.set_options()``). A custom backend class (a subclass of
``BackendEntrypoint``) can also be used.
chunks : int, dict, 'auto' or None, default: None
chunks : int, dict, 'auto', 'preserve', or None, default: None
If provided, used to load the data into dask arrays.

- ``chunks="auto"`` will use dask ``auto`` chunking taking into account the
engine preferred chunks.
- ``chunks="preserve"`` will use a chunking scheme that never splits encoded
chunks. If encoded chunks are small then "preserve" takes multiples of them
over the largest dimension.
- ``chunks=None`` skips using dask. This uses xarray's internally private
:ref:`lazy indexing classes <internal design.lazy indexing>`,
but data is eagerly loaded into memory as numpy arrays when accessed.
Expand Down Expand Up @@ -1146,11 +1155,14 @@ def open_groups(
``xarray.set_options()``). A custom backend class (a subclass of
``BackendEntrypoint``) can also be used.
chunks : int, dict, 'auto' or None, default: None
chunks : int, dict, 'auto', 'preserve', or None, default: None
If provided, used to load the data into dask arrays.

- ``chunks="auto"`` will use dask ``auto`` chunking taking into account the
engine preferred chunks.
- ``chunks="preserve"`` will use a chunking scheme that never splits encoded
chunks. If encoded chunks are small then "preserve" takes multiples of them
over the largest dimension.
- ``chunks=None`` skips using dask. This uses xarray's internally private
:ref:`lazy indexing classes <internal design.lazy indexing>`,
but data is eagerly loaded into memory as numpy arrays when accessed.
Expand Down Expand Up @@ -1418,7 +1430,7 @@ def open_mfdataset(
concatenation along more than one dimension is desired, then ``paths`` must be a
nested list-of-lists (see ``combine_nested`` for details). (A string glob will
be expanded to a 1-dimensional list.)
chunks : int, dict, 'auto' or None, optional
chunks : int, dict, 'auto', 'preserve', or None, optional
Dictionary with keys given by dimension names and values given by chunk sizes.
In general, these should divide the dimensions of each dataset. If int, chunk
each dimension by ``chunks``. By default, chunks will be chosen to match the
Expand Down
5 changes: 4 additions & 1 deletion xarray/backends/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1450,12 +1450,15 @@ def open_zarr(
Array synchronizer provided to zarr
group : str, optional
Group path. (a.k.a. `path` in zarr terminology.)
chunks : int, dict, "auto" or None, optional
chunks : int, dict, "auto", "preserve", or None, optional
Used to load the data into dask arrays. Default behavior is to use
``chunks={}`` if dask is available, otherwise ``chunks=None``.

- ``chunks='auto'`` will use dask ``auto`` chunking taking into account the
engine preferred chunks.
- ``chunks="preserve"`` will use a chunking scheme that never splits encoded
chunks. If encoded chunks are small then "preserve" takes multiples of them
over the largest dimension.
- ``chunks=None`` skips using dask. This uses xarray's internally private
:ref:`lazy indexing classes <internal design.lazy indexing>`,
but data is eagerly loaded into memory as numpy arrays when accessed.
Expand Down
2 changes: 1 addition & 1 deletion xarray/namedarray/_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def dtype(self) -> _DType_co: ...
_NormalizedChunks = tuple[tuple[int, ...], ...]
# FYI in some cases we don't allow `None`, which this doesn't take account of.
# # FYI the `str` is for a size string, e.g. "16MB", supported by dask.
T_ChunkDim: TypeAlias = str | int | Literal["auto"] | tuple[int, ...] | None # noqa: PYI051
T_ChunkDim: TypeAlias = str | int | Literal["auto", "preserve"] | tuple[int, ...] | None # noqa: PYI051
# We allow the tuple form of this (though arguably we could transition to named dims only)
T_Chunks: TypeAlias = T_ChunkDim | Mapping[Any, T_ChunkDim]

Expand Down
5 changes: 3 additions & 2 deletions xarray/namedarray/daskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

if TYPE_CHECKING:
from xarray.namedarray._typing import (
T_ChunkDim,
T_Chunks,
_DType_co,
_NormalizedChunks,
Expand Down Expand Up @@ -45,11 +46,11 @@ def chunks(self, data: Any) -> _NormalizedChunks:

def normalize_chunks(
self,
chunks: T_Chunks | _NormalizedChunks,
chunks: tuple[T_ChunkDim, ...] | _NormalizedChunks,
shape: tuple[int, ...] | None = None,
limit: int | None = None,
dtype: _DType_co | None = None,
previous_chunks: _NormalizedChunks | None = None,
previous_chunks: tuple[int, ...] | _NormalizedChunks | None = None,
) -> Any:
"""Called by open_dataset"""
from dask.array.core import normalize_chunks
Expand Down
118 changes: 118 additions & 0 deletions xarray/namedarray/parallelcompat.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

if TYPE_CHECKING:
from xarray.namedarray._typing import (
T_ChunkDim,
T_Chunks,
_Chunks,
_DType,
Expand Down Expand Up @@ -784,3 +785,120 @@ def get_auto_chunk_size(
raise NotImplementedError(
"For 'auto' rechunking of cftime arrays, get_auto_chunk_size must be implemented by the chunk manager"
)

@staticmethod
def preserve_chunks(
chunks: tuple[T_ChunkDim, ...],
shape: tuple[int, ...],
target: int,
typesize: int,
previous_chunks: tuple[int, ...] | _NormalizedChunks,
) -> tuple[T_ChunkDim, ...]:
"""Quickly determine optimal chunks close to target size but never splitting
previous_chunks.

This takes in a chunks argument potentially containing ``"preserve"`` for several
dimensions. This function replaces ``"preserve"`` with concrete dimension sizes that
try to get chunks to be close to certain size in bytes, provided by the ``target=``
keyword. Any dimensions marked as ``"preserve"`` will potentially be multiplied
by some factor to get close to the byte target, while never splitting
``previous_chunks``. If chunks are non-uniform along a particular dimension
then that dimension will always use exactly ``previous_chunks``.

Examples
--------
>>> ChunkManagerEntrypoint.preserve_chunks(
... chunks=("preserve", "preserve", "preserve"),
... shape=(1280, 1280, 20),
... target=500 * 1024,
... typesize=8,
... previous_chunks=(128, 128, 1),
... )
(128, 128, 2)

>>> ChunkManagerEntrypoint.preserve_chunks(
... chunks=("preserve", "preserve", 1),
... shape=(1280, 1280, 20),
... target=1 * 1024 * 1024,
... typesize=8,
... previous_chunks=(128, 128, 1),
... )
(128, 1024, 1)

>>> ChunkManagerEntrypoint.preserve_chunks(
... chunks=("preserve", "preserve", 1),
... shape=(1280, 1280, 20),
... target=1 * 1024 * 1024,
... typesize=8,
... previous_chunks=((128,) * 10, (128, 256, 256, 512), (1,) * 20),
... )
(256, (128, 256, 256, 512), 1)

Parameters
----------
chunks: tuple[int | str | tuple[int], ...]
A tuple of either dimensions or tuples of explicit chunk dimensions
Some entries should be "preserve".
shape: tuple[int]
The shape of the array
target: int
The target size of the chunk in bytes.
typesize: int
The size, in bytes, of each element of the chunk.
previous_chunks: tuple[int | tuple[int], ...]
Size of chunks being preserved. Expressed as a tuple of ints or tuple
of tuple of ints.
"""
new_chunks = [*previous_chunks]
auto_dims = [c == "preserve" for c in chunks]
max_chunks = np.array(shape)
for i, previous_chunk in enumerate(previous_chunks):
chunk = chunks[i]
if chunk == -1:
# -1 means whole dim is in one chunk
new_chunks[i] = shape[i]
else:
if isinstance(previous_chunk, tuple):
# For uniform chunks just take the first item
if previous_chunk[1:-1] == previous_chunk[:-2]:
new_chunks[i] = previous_chunk[0]
previous_chunk = previous_chunk[0]
# For non-uniform chunks, leave them alone
else:
auto_dims[i] = False
max_chunks[i] = max(previous_chunk)

if isinstance(previous_chunk, int):
# preserve, None or () means we want to track previous chunk
if chunk == "preserve" or not chunk:
max_chunks[i] = previous_chunk
# otherwise use the explicitly provided chunk
else:
new_chunks[i] = chunk
max_chunks[i] = chunk if isinstance(chunk, int) else max(chunk)

if not any(auto_dims):
return chunks

while True:
# Repeatedly look for the last dim with more than one chunk and multiply it by 2.
# Stop when:
# 1a. we are larger than the target chunk size OR
# 1b. we are within 50% of the target chunk size OR
# 2. the chunk covers the entire array

num_chunks = np.array(shape) / max_chunks * auto_dims
chunk_bytes = np.prod(max_chunks) * typesize

if chunk_bytes > target or abs(chunk_bytes - target) / target < 0.5:
break

if (num_chunks <= 1).all():
break

idx = int(np.nonzero(num_chunks > 1)[0][-1])

new_chunks[idx] = min(new_chunks[idx] * 2, shape[idx])
max_chunks[idx] = new_chunks[idx]

return tuple(new_chunks)
16 changes: 15 additions & 1 deletion xarray/namedarray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def _get_chunk( # type: ignore[no-untyped-def]
preferred_chunk_shape = tuple(
itertools.starmap(preferred_chunks.get, zip(dims, shape, strict=True))
)
if isinstance(chunks, Number) or (chunks == "auto"):
if isinstance(chunks, (Number, str)):
chunks = dict.fromkeys(dims, chunks)
chunk_shape = tuple(
chunks.get(dim, None) or preferred_chunk_sizes
Expand All @@ -236,6 +236,20 @@ def _get_chunk( # type: ignore[no-untyped-def]
limit = None
dtype = data.dtype

if any(c == "preserve" for c in chunk_shape) and any(
c == "auto" for c in chunk_shape
):
raise ValueError('chunks cannot use a combination of "auto" and "preserve"')

if shape and preferred_chunk_shape and any(c == "preserve" for c in chunk_shape):
chunk_shape = chunkmanager.preserve_chunks(
chunk_shape,
shape=shape,
target=chunkmanager.get_auto_chunk_size(),
typesize=getattr(dtype, "itemsize", 8),
previous_chunks=preferred_chunk_shape,
)

chunk_shape = chunkmanager.normalize_chunks(
chunk_shape,
shape=shape,
Expand Down
Loading
Loading