diff --git a/src/tracksdata/array/_graph_array.py b/src/tracksdata/array/_graph_array.py index 018e7ae6..80418986 100644 --- a/src/tracksdata/array/_graph_array.py +++ b/src/tracksdata/array/_graph_array.py @@ -23,7 +23,7 @@ def _validate_shape( """Helper function to validate the shape argument.""" if shape is None: try: - shape = graph.metadata()["shape"] + shape = graph.metadata["shape"] except KeyError as e: raise KeyError( f"`shape` is required to `{func_name}`. " diff --git a/src/tracksdata/functional/_test/test_napari.py b/src/tracksdata/functional/_test/test_napari.py index 9b4a81dc..712cf53d 100644 --- a/src/tracksdata/functional/_test/test_napari.py +++ b/src/tracksdata/functional/_test/test_napari.py @@ -31,7 +31,7 @@ def test_napari_conversion(metadata_shape: bool) -> None: shape = (2, 10, 22, 32) if metadata_shape: - graph.update_metadata(shape=shape) + graph.metadata.update(shape=shape) arg_shape = None else: arg_shape = shape diff --git a/src/tracksdata/graph/__init__.py b/src/tracksdata/graph/__init__.py index fcf207e2..3906949b 100644 --- a/src/tracksdata/graph/__init__.py +++ b/src/tracksdata/graph/__init__.py @@ -1,10 +1,10 @@ """Graph backends for representing tracking data as directed graphs in memory or on disk.""" -from tracksdata.graph._base_graph import BaseGraph +from tracksdata.graph._base_graph import BaseGraph, MetadataView from tracksdata.graph._graph_view import GraphView from tracksdata.graph._rustworkx_graph import IndexedRXGraph, RustWorkXGraph from tracksdata.graph._sql_graph import SQLGraph InMemoryGraph = RustWorkXGraph -__all__ = ["BaseGraph", "GraphView", "InMemoryGraph", "IndexedRXGraph", "RustWorkXGraph", "SQLGraph"] +__all__ = ["BaseGraph", "GraphView", "InMemoryGraph", "IndexedRXGraph", "MetadataView", "RustWorkXGraph", "SQLGraph"] diff --git a/src/tracksdata/graph/_base_graph.py b/src/tracksdata/graph/_base_graph.py index 5b3708ad..90500b67 100644 --- a/src/tracksdata/graph/_base_graph.py +++ b/src/tracksdata/graph/_base_graph.py @@ -42,11 +42,75 @@ T = TypeVar("T", bound="BaseGraph") +class MetadataView(dict[str, Any]): + """Dictionary-like metadata view that syncs mutations back to the graph.""" + + _MISSING = object() + + def __init__( + self, + graph: "BaseGraph", + data: dict[str, Any], + *, + is_public: bool = True, + ) -> None: + super().__init__(data) + self._graph = graph + self._is_public = is_public + + def __setitem__(self, key: str, value: Any) -> None: + self._graph._set_metadata_with_validation(is_public=self._is_public, **{key: value}) + super().__setitem__(key, value) + + def __delitem__(self, key: str) -> None: + self._graph._remove_metadata_with_validation(key, is_public=self._is_public) + super().__delitem__(key) + + def pop(self, key: str, default: Any = _MISSING) -> Any: + self._graph._validate_metadata_key(key, is_public=self._is_public) + + if key not in self: + if default is self._MISSING: + raise KeyError(key) + return default + + value = super().__getitem__(key) + self._graph._remove_metadata_with_validation(key, is_public=self._is_public) + super().pop(key, None) + return value + + def popitem(self) -> tuple[str, Any]: + key, value = super().popitem() + self._graph._remove_metadata_with_validation(key, is_public=self._is_public) + return key, value + + def clear(self) -> None: + keys = list(self.keys()) + for key in keys: + self._graph._remove_metadata_with_validation(key, is_public=self._is_public) + super().clear() + + def setdefault(self, key: str, default: Any = None) -> Any: + if key in self: + return 
super().__getitem__(key) + self._graph._set_metadata_with_validation(is_public=self._is_public, **{key: default}) + super().__setitem__(key, default) + return default + + def update(self, *args, **kwargs) -> None: + updates = dict(*args, **kwargs) + if updates: + self._graph._set_metadata_with_validation(is_public=self._is_public, **updates) + super().update(updates) + + class BaseGraph(abc.ABC): """ Base class for a graph backend. """ + _PRIVATE_METADATA_PREFIX = "__private_" + node_added = Signal(int) node_removed = Signal(int) @@ -1186,7 +1250,8 @@ def from_other(cls: type[T], other: "BaseGraph", **kwargs) -> T: node_attrs = node_attrs.drop(DEFAULT_ATTR_KEYS.NODE_ID) graph = cls(**kwargs) - graph.update_metadata(**other.metadata()) + graph.metadata.update(other.metadata) + graph._private_metadata.update(other._private_metadata_for_copy()) current_node_attr_schemas = graph._node_attr_schemas() for k, v in other._node_attr_schemas().items(): @@ -1216,7 +1281,6 @@ def from_other(cls: type[T], other: "BaseGraph", **kwargs) -> T: current_edge_attr_schemas = graph._edge_attr_schemas() for k, v in other._edge_attr_schemas().items(): if k not in current_edge_attr_schemas: - print(f"Adding edge attribute key: {k} with dtype: {v.dtype} and default value: {v.default_value}") graph.add_edge_attr_key(k, v.dtype, v.default_value) edge_attrs = edge_attrs.with_columns( @@ -1786,7 +1850,7 @@ def to_geff( for k, v in edge_attrs.to_dict().items() } - td_metadata = self.metadata().copy() + td_metadata = self.metadata.copy() td_metadata.pop("geff", None) # avoid geff being written multiple times geff_metadata = geff.GeffMetadata( @@ -1824,57 +1888,89 @@ def to_geff( zarr_format=zarr_format, ) - @abc.abstractmethod - def metadata(self) -> dict[str, Any]: + @property + def metadata(self) -> MetadataView: """ Return the metadata of the graph. Returns ------- - dict[str, Any] + MetadataView The metadata of the graph as a dictionary. Examples -------- ```python - metadata = graph.metadata() + metadata = graph.metadata print(metadata["shape"]) ``` """ + return MetadataView( + graph=self, + data={k: v for k, v in self._metadata().items() if not self._is_private_metadata_key(k)}, + is_public=True, + ) - @abc.abstractmethod - def update_metadata(self, **kwargs) -> None: - """ - Set or update metadata for the graph. + @property + def _private_metadata(self) -> MetadataView: + return MetadataView( + graph=self, + data={k: v for k, v in self._metadata().items() if self._is_private_metadata_key(k)}, + is_public=False, + ) - Parameters - ---------- - **kwargs : Any - The metadata items to set by key. Values will be stored as JSON. + @classmethod + def _is_private_metadata_key(cls, key: str) -> bool: + return key.startswith(cls._PRIVATE_METADATA_PREFIX) + + def _validate_metadata_key(self, key: str, *, is_public: bool) -> None: + if not isinstance(key, str): + raise TypeError(f"Metadata key must be a string. Got {type(key)}.") + is_private_key = self._is_private_metadata_key(key) + if is_public and is_private_key: + raise ValueError(f"Metadata key '{key}' is reserved for internal use.") + if not is_public and not is_private_key: + raise ValueError( + f"Metadata key '{key}' is not private. Private metadata keys must start with " + f"'{self._PRIVATE_METADATA_PREFIX}'." 
+ ) - Examples - -------- - ```python - graph.update_metadata(shape=[1, 25, 25], path="path/to/image.ome.zarr") - graph.update_metadata(description="Tracking data from experiment 1") - ``` + def _validate_metadata_keys(self, keys: Sequence[str], *, is_public: bool) -> None: + for key in keys: + self._validate_metadata_key(key, is_public=is_public) + + def _set_metadata_with_validation(self, is_public: bool = True, **kwargs) -> None: + self._validate_metadata_keys(kwargs.keys(), is_public=is_public) + self._update_metadata(**kwargs) + + def _remove_metadata_with_validation(self, key: str, *, is_public: bool = True) -> None: + self._validate_metadata_key(key, is_public=is_public) + self._remove_metadata(key) + + def _private_metadata_for_copy(self) -> dict[str, Any]: """ + Return private metadata entries that should be propagated by `from_other`. + + Backends can override this to exclude backend-specific private metadata. + """ + return dict(self._private_metadata) @abc.abstractmethod - def remove_metadata(self, key: str) -> None: + def _metadata(self) -> dict[str, Any]: + """ + Return the full metadata including private keys. """ - Remove a metadata key from the graph. - Parameters - ---------- - key : str - The key of the metadata to remove. + @abc.abstractmethod + def _update_metadata(self, **kwargs) -> None: + """ + Backend-specific metadata update implementation without public key validation. + """ - Examples - -------- - ```python - graph.remove_metadata("shape") - ``` + @abc.abstractmethod + def _remove_metadata(self, key: str) -> None: + """ + Backend-specific metadata removal implementation without public key validation. """ def to_traccuracy_graph(self, array_view_kwargs: dict[str, Any] | None = None) -> "TrackingGraph": diff --git a/src/tracksdata/graph/_graph_view.py b/src/tracksdata/graph/_graph_view.py index b9f82ead..c689931d 100644 --- a/src/tracksdata/graph/_graph_view.py +++ b/src/tracksdata/graph/_graph_view.py @@ -847,11 +847,11 @@ def copy(self, **kwargs) -> "GraphView": "Use `detach` to create a new reference-less graph with the same nodes and edges." 
) - def metadata(self) -> dict[str, Any]: - return self._root.metadata() + def _metadata(self) -> dict[str, Any]: + return self._root._metadata() - def update_metadata(self, **kwargs) -> None: - self._root.update_metadata(**kwargs) + def _update_metadata(self, **kwargs) -> None: + self._root._update_metadata(**kwargs) - def remove_metadata(self, key: str) -> None: - self._root.remove_metadata(key) + def _remove_metadata(self, key: str) -> None: + self._root._remove_metadata(key) diff --git a/src/tracksdata/graph/_rustworkx_graph.py b/src/tracksdata/graph/_rustworkx_graph.py index ef4a3f4f..a415b89d 100644 --- a/src/tracksdata/graph/_rustworkx_graph.py +++ b/src/tracksdata/graph/_rustworkx_graph.py @@ -343,7 +343,7 @@ def __init__(self, rx_graph: rx.PyDiGraph | None = None) -> None: self._time_to_nodes: dict[int, list[int]] = {} self.__node_attr_schemas: dict[str, AttrSchema] = {} self.__edge_attr_schemas: dict[str, AttrSchema] = {} - self._overlaps: list[list[int, 2]] = [] + self._overlaps: list[list[int]] = [] # Add default node attributes with inferred schemas self.__node_attr_schemas[DEFAULT_ATTR_KEYS.T] = AttrSchema( @@ -371,7 +371,7 @@ def __init__(self, rx_graph: rx.PyDiGraph | None = None) -> None: elif not isinstance(self._graph.attrs, dict): LOG.warning( - "previous attribute %s will be added to key 'old_attrs' of `graph.metadata()`", + "previous attribute %s will be added to key 'old_attrs' of `graph.metadata`", self._graph.attrs, ) self._graph.attrs = { @@ -1153,16 +1153,11 @@ def edge_attrs( edge_map = rx_graph.edge_index_map() if len(edge_map) == 0: - return pl.DataFrame( - { - key: [] - for key in [ - *attr_keys, - DEFAULT_ATTR_KEYS.EDGE_SOURCE, - DEFAULT_ATTR_KEYS.EDGE_TARGET, - ] - } - ) + empty_columns = {} + for key in [*attr_keys, DEFAULT_ATTR_KEYS.EDGE_SOURCE, DEFAULT_ATTR_KEYS.EDGE_TARGET]: + schema = self._edge_attr_schemas()[key] + empty_columns[key] = pl.Series(name=key, values=[], dtype=schema.dtype) + return pl.DataFrame(empty_columns) source, target, data = zip(*edge_map.values(), strict=False) @@ -1499,13 +1494,13 @@ def edge_id(self, source_id: int, target_id: int) -> int: """ return self.rx_graph.get_edge_data(source_id, target_id)[DEFAULT_ATTR_KEYS.EDGE_ID] - def metadata(self) -> dict[str, Any]: + def _metadata(self) -> dict[str, Any]: return self._graph.attrs - def update_metadata(self, **kwargs) -> None: + def _update_metadata(self, **kwargs) -> None: self._graph.attrs.update(kwargs) - def remove_metadata(self, key: str) -> None: + def _remove_metadata(self, key: str) -> None: self._graph.attrs.pop(key, None) def edge_list(self) -> list[list[int, int]]: diff --git a/src/tracksdata/graph/_sql_graph.py b/src/tracksdata/graph/_sql_graph.py index 985cbdc9..65784572 100644 --- a/src/tracksdata/graph/_sql_graph.py +++ b/src/tracksdata/graph/_sql_graph.py @@ -20,8 +20,10 @@ from tracksdata.utils._dataframe import unpack_array_attrs, unpickle_bytes_columns from tracksdata.utils._dtypes import ( AttrSchema, + deserialize_attr_schema, polars_dtype_to_sqlalchemy_type, process_attr_key_args, + serialize_attr_schema, sqlalchemy_type_to_polars_dtype, ) from tracksdata.utils._logging import LOG @@ -441,6 +443,8 @@ class SQLGraph(BaseGraph): """ node_id_time_multiplier: int = 1_000_000_000 + _PRIVATE_SQL_NODE_SCHEMA_STORE_KEY = "__private_sql_node_attr_schema_store" + _PRIVATE_SQL_EDGE_SCHEMA_STORE_KEY = "__private_sql_edge_attr_schema_store" Base: type[DeclarativeBase] Node: type[DeclarativeBase] Edge: type[DeclarativeBase] @@ -469,17 +473,12 @@ def __init__( # Create 
unique classes for this instance self._define_schema(overwrite=overwrite) - self.__node_attr_schemas: dict[str, AttrSchema] = {} - self.__edge_attr_schemas: dict[str, AttrSchema] = {} if overwrite: self.Base.metadata.drop_all(self._engine) self.Base.metadata.create_all(self._engine) - # Initialize schemas from existing table columns - self._init_schemas_from_tables() - self._max_id_per_time = {} self._update_max_id_per_time() @@ -551,68 +550,133 @@ class Metadata(Base): self.Overlap = Overlap self.Metadata = Metadata - def _init_schemas_from_tables(self) -> None: - """ - Initialize AttrSchema objects from existing database table columns. - This is used when loading an existing graph from the database. - """ - # Initialize node schemas from Node table columns - for column_name in self.Node.__table__.columns.keys(): - if column_name not in self.__node_attr_schemas: - column = self.Node.__table__.columns[column_name] - # Infer polars dtype from SQLAlchemy type - pl_dtype = sqlalchemy_type_to_polars_dtype(column.type) - # AttrSchema.__post_init__ will infer the default_value - self.__node_attr_schemas[column_name] = AttrSchema( - key=column_name, - dtype=pl_dtype, - ) + @staticmethod + def _default_node_attr_schemas() -> dict[str, AttrSchema]: + return { + DEFAULT_ATTR_KEYS.T: AttrSchema(key=DEFAULT_ATTR_KEYS.T, dtype=pl.Int32), + DEFAULT_ATTR_KEYS.NODE_ID: AttrSchema(key=DEFAULT_ATTR_KEYS.NODE_ID, dtype=pl.Int64), + } + + @staticmethod + def _default_edge_attr_schemas() -> dict[str, AttrSchema]: + return { + DEFAULT_ATTR_KEYS.EDGE_ID: AttrSchema(key=DEFAULT_ATTR_KEYS.EDGE_ID, dtype=pl.Int32), + DEFAULT_ATTR_KEYS.EDGE_SOURCE: AttrSchema(key=DEFAULT_ATTR_KEYS.EDGE_SOURCE, dtype=pl.Int64), + DEFAULT_ATTR_KEYS.EDGE_TARGET: AttrSchema(key=DEFAULT_ATTR_KEYS.EDGE_TARGET, dtype=pl.Int64), + } + + def _attr_schemas_from_metadata( + self, + *, + table_class: type[DeclarativeBase], + metadata_key: str, + default_schemas: dict[str, AttrSchema], + preferred_order: Sequence[str], + ) -> dict[str, AttrSchema]: + encoded_schemas = self._private_metadata.get(metadata_key, {}) + schemas = default_schemas.copy() + schemas.update( + {key: deserialize_attr_schema(encoded_schema, key=key) for key, encoded_schema in encoded_schemas.items()} + ) - # Initialize edge schemas from Edge table columns - for column_name in self.Edge.__table__.columns.keys(): - # Skip internal edge columns - if column_name not in self.__edge_attr_schemas: - column = self.Edge.__table__.columns[column_name] - # Infer polars dtype from SQLAlchemy type - pl_dtype = sqlalchemy_type_to_polars_dtype(column.type) - # AttrSchema.__post_init__ will infer the default_value - self.__edge_attr_schemas[column_name] = AttrSchema( + # Legacy databases may not have schema metadata for all columns. 
+ for column_name, column in table_class.__table__.columns.items(): + if column_name not in schemas: + schemas[column_name] = AttrSchema( key=column_name, - dtype=pl_dtype, + dtype=sqlalchemy_type_to_polars_dtype(column.type), ) + ordered_keys = [key for key in preferred_order if key in schemas] + ordered_keys.extend(key for key in table_class.__table__.columns.keys() if key not in ordered_keys) + ordered_keys.extend(key for key in schemas if key not in ordered_keys) + return {key: schemas[key] for key in ordered_keys} + + def _attr_schemas_for_table(self, table_class: type[DeclarativeBase]) -> dict[str, AttrSchema]: + if table_class.__tablename__ == self.Node.__tablename__: + return self._node_attr_schemas() + return self._edge_attr_schemas() + + @staticmethod + def _is_pickled_sql_type(column_type: TypeEngine) -> bool: + return isinstance(column_type, sa.PickleType | sa.LargeBinary) + + @property + def __node_attr_schemas(self) -> dict[str, AttrSchema]: + return self._attr_schemas_from_metadata( + table_class=self.Node, + metadata_key=self._PRIVATE_SQL_NODE_SCHEMA_STORE_KEY, + default_schemas=self._default_node_attr_schemas(), + preferred_order=[DEFAULT_ATTR_KEYS.T, DEFAULT_ATTR_KEYS.NODE_ID], + ) + + @__node_attr_schemas.setter + def __node_attr_schemas(self, schemas: dict[str, AttrSchema]) -> None: + merged_schemas = self._default_node_attr_schemas() + merged_schemas.update(schemas) + schemas = merged_schemas + encoded_schemas = {key: serialize_attr_schema(schema) for key, schema in schemas.items()} + self._private_metadata[self._PRIVATE_SQL_NODE_SCHEMA_STORE_KEY] = encoded_schemas + + @property + def __edge_attr_schemas(self) -> dict[str, AttrSchema]: + return self._attr_schemas_from_metadata( + table_class=self.Edge, + metadata_key=self._PRIVATE_SQL_EDGE_SCHEMA_STORE_KEY, + default_schemas=self._default_edge_attr_schemas(), + preferred_order=[ + DEFAULT_ATTR_KEYS.EDGE_ID, + DEFAULT_ATTR_KEYS.EDGE_SOURCE, + DEFAULT_ATTR_KEYS.EDGE_TARGET, + ], + ) + + @__edge_attr_schemas.setter + def __edge_attr_schemas(self, schemas: dict[str, AttrSchema]) -> None: + merged_schemas = self._default_edge_attr_schemas() + merged_schemas.update(schemas) + schemas = merged_schemas + encoded_schemas = {key: serialize_attr_schema(schema) for key, schema in schemas.items()} + self._private_metadata[self._PRIVATE_SQL_EDGE_SCHEMA_STORE_KEY] = encoded_schemas + def _restore_pickled_column_types(self, table: sa.Table) -> None: for column in table.columns: if isinstance(column.type, sa.LargeBinary): column.type = sa.PickleType() def _polars_schema_override(self, table_class: type[DeclarativeBase]) -> SchemaDict: - # Get the appropriate schema dict based on table class - if table_class.__tablename__ == self.Node.__tablename__: - schemas = self._node_attr_schemas() - else: - schemas = self._edge_attr_schemas() + schemas = self._attr_schemas_for_table(table_class) - # Return schema overrides for special types that need explicit casting + # Return schema overrides for columns safely represented in SQL. + # Pickled columns are unpickled and casted in a second pass. 
return { key: schema.dtype for key, schema in schemas.items() - if not (schema.dtype == pl.Object or isinstance(schema.dtype, pl.Array | pl.List)) + if ( + key in table_class.__table__.columns + and not self._is_pickled_sql_type(table_class.__table__.columns[key].type) + ) } def _cast_array_columns(self, table_class: type[DeclarativeBase], df: pl.DataFrame) -> pl.DataFrame: - # Get the appropriate schema dict based on table class - if table_class.__tablename__ == self.Node.__tablename__: - schemas = self._node_attr_schemas() - else: - schemas = self._edge_attr_schemas() + schemas = self._attr_schemas_for_table(table_class) - # Cast array columns (stored as blobs in database) - df = df.with_columns( - pl.Series(key, df[key].to_list(), dtype=schema.dtype) - for key, schema in schemas.items() - if isinstance(schema.dtype, pl.Array) and key in df.columns - ) + casts: list[pl.Series] = [] + for key, schema in schemas.items(): + if key not in df.columns or key not in table_class.__table__.columns: + continue + + if not self._is_pickled_sql_type(table_class.__table__.columns[key].type): + continue + + try: + casts.append(pl.Series(key, df[key].to_list(), dtype=schema.dtype)) + except Exception: + # Keep original dtype when values cannot be casted to the target schema. + continue + + if casts: + df = df.with_columns(casts) return df def _update_max_id_per_time(self) -> None: @@ -1289,6 +1353,8 @@ def node_attrs( # indices are included by default and must be removed if attr_keys is not None: nodes_df = nodes_df.select([pl.col(c) for c in attr_keys]) + else: + nodes_df = nodes_df.select([pl.col(c) for c in self._node_attr_schemas() if c in nodes_df.columns]) if unpack: nodes_df = unpack_array_attrs(nodes_df) @@ -1331,6 +1397,8 @@ def edge_attrs( if unpack: edges_df = unpack_array_attrs(edges_df) + elif attr_keys is None: + edges_df = edges_df.select([pl.col(c) for c in self._edge_attr_schemas() if c in edges_df.columns]) return edges_df @@ -1575,6 +1643,9 @@ def _add_new_column( sa_column = sa.Column(schema.key, sa_type, default=default_value) str_dialect_type = sa_column.type.compile(dialect=self._engine.dialect) + identifier_preparer = self._engine.dialect.identifier_preparer + quoted_table_name = identifier_preparer.format_table(table_class.__table__) + quoted_column_name = identifier_preparer.quote(sa_column.name) # Properly quote default values based on type if isinstance(default_value, str): @@ -1585,8 +1656,8 @@ def _add_new_column( quoted_default = str(default_value) add_column_stmt = sa.DDL( - f"ALTER TABLE {table_class.__table__} ADD " - f"COLUMN {sa_column.name} {str_dialect_type} " + f"ALTER TABLE {quoted_table_name} ADD " + f"COLUMN {quoted_column_name} {str_dialect_type} " f"DEFAULT {quoted_default}", ) LOG.info("add %s column statement:\n'%s'", table_class.__table__, add_column_stmt) @@ -1601,7 +1672,10 @@ def _add_new_column( table_class.__table__.append_column(sa_column) def _drop_column(self, table_class: type[DeclarativeBase], key: str) -> None: - drop_column_stmt = sa.DDL(f"ALTER TABLE {table_class.__table__} DROP COLUMN {key}") + identifier_preparer = self._engine.dialect.identifier_preparer + quoted_table_name = identifier_preparer.format_table(table_class.__table__) + quoted_column_name = identifier_preparer.quote(key) + drop_column_stmt = sa.DDL(f"ALTER TABLE {quoted_table_name} DROP COLUMN {quoted_column_name}") LOG.info("drop %s column statement:\n'%s'", table_class.__table__, drop_column_stmt) with Session(self._engine) as session: @@ -1617,14 +1691,14 @@ def 
add_node_attr_key( dtype: pl.DataType | None = None, default_value: Any = None, ) -> None: + node_schemas = self.__node_attr_schemas # Process arguments and create validated schema - schema = process_attr_key_args(key_or_schema, dtype, default_value, self.__node_attr_schemas) - - # Store schema - self.__node_attr_schemas[schema.key] = schema + schema = process_attr_key_args(key_or_schema, dtype, default_value, node_schemas) # Add column to database self._add_new_column(self.Node, schema) + node_schemas[schema.key] = schema + self.__node_attr_schemas = node_schemas def remove_node_attr_key(self, key: str) -> None: if key not in self.node_attr_keys(): @@ -1633,8 +1707,10 @@ def remove_node_attr_key(self, key: str) -> None: if key in (DEFAULT_ATTR_KEYS.NODE_ID, DEFAULT_ATTR_KEYS.T): raise ValueError(f"Cannot remove required node attribute key {key}") + node_schemas = self.__node_attr_schemas self._drop_column(self.Node, key) - self.__node_attr_schemas.pop(key, None) + node_schemas.pop(key, None) + self.__node_attr_schemas = node_schemas def add_edge_attr_key( self, @@ -1642,21 +1718,23 @@ def add_edge_attr_key( dtype: pl.DataType | None = None, default_value: Any = None, ) -> None: + edge_schemas = self.__edge_attr_schemas # Process arguments and create validated schema - schema = process_attr_key_args(key_or_schema, dtype, default_value, self.__edge_attr_schemas) - - # Store schema - self.__edge_attr_schemas[schema.key] = schema + schema = process_attr_key_args(key_or_schema, dtype, default_value, edge_schemas) # Add column to database self._add_new_column(self.Edge, schema) + edge_schemas[schema.key] = schema + self.__edge_attr_schemas = edge_schemas def remove_edge_attr_key(self, key: str) -> None: if key not in self.edge_attr_keys(): raise ValueError(f"Edge attribute key {key} does not exist") + edge_schemas = self.__edge_attr_schemas self._drop_column(self.Edge, key) - self.__edge_attr_schemas.pop(key, None) + edge_schemas.pop(key, None) + self.__edge_attr_schemas = edge_schemas def num_edges(self) -> int: with Session(self._engine) as session: @@ -1992,19 +2070,25 @@ def remove_edge( raise ValueError(f"Edge {edge_id} does not exist in the graph.") session.commit() - def metadata(self) -> dict[str, Any]: + def _metadata(self) -> dict[str, Any]: with Session(self._engine) as session: result = session.query(self.Metadata).all() return {row.key: row.value for row in result} - def update_metadata(self, **kwargs) -> None: + def _private_metadata_for_copy(self) -> dict[str, Any]: + private_metadata = super()._private_metadata_for_copy() + private_metadata.pop(self._PRIVATE_SQL_NODE_SCHEMA_STORE_KEY, None) + private_metadata.pop(self._PRIVATE_SQL_EDGE_SCHEMA_STORE_KEY, None) + return private_metadata + + def _update_metadata(self, **kwargs) -> None: with Session(self._engine) as session: for key, value in kwargs.items(): metadata_entry = self.Metadata(key=key, value=value) session.merge(metadata_entry) session.commit() - def remove_metadata(self, key: str) -> None: + def _remove_metadata(self, key: str) -> None: with Session(self._engine) as session: session.query(self.Metadata).filter(self.Metadata.key == key).delete() session.commit() diff --git a/src/tracksdata/graph/_test/test_graph_backends.py b/src/tracksdata/graph/_test/test_graph_backends.py index d6084cd8..8aa2324f 100644 --- a/src/tracksdata/graph/_test/test_graph_backends.py +++ b/src/tracksdata/graph/_test/test_graph_backends.py @@ -1,3 +1,4 @@ +import datetime as dt from pathlib import Path from typing import Any @@ -1359,7 +1360,7 
@@ def test_from_other_with_edges( ) -> None: """Ensure from_other preserves structure across backend conversions.""" # Create source graph with nodes, edges, and attributes - graph_backend.update_metadata(special_key="special_value") + graph_backend.metadata.update(special_key="special_value") graph_backend.add_node_attr_key("x", dtype=pl.Float64) graph_backend.add_edge_attr_key("weight", dtype=pl.Float64, default_value=-1) @@ -1386,7 +1387,7 @@ def test_from_other_with_edges( assert set(new_graph.node_attr_keys()) == set(graph_backend.node_attr_keys()) assert set(new_graph.edge_attr_keys()) == set(graph_backend.edge_attr_keys()) - assert new_graph.metadata() == graph_backend.metadata() + assert new_graph.metadata == graph_backend.metadata assert new_graph._node_attr_schemas() == graph_backend._node_attr_schemas() assert new_graph._edge_attr_schemas() == graph_backend._edge_attr_schemas() @@ -1437,6 +1438,108 @@ def test_from_other_with_edges( assert new_overlaps == source_overlaps +@pytest.mark.parametrize( + ("target_cls", "target_kwargs"), + [ + pytest.param(RustWorkXGraph, {}, id="rustworkx"), + pytest.param( + SQLGraph, + { + "drivername": "sqlite", + "database": ":memory:", + "engine_kwargs": {"connect_args": {"check_same_thread": False}}, + }, + id="sql", + ), + pytest.param(IndexedRXGraph, {}, id="indexed"), + ], +) +def test_from_other_preserves_schema_roundtrip(target_cls: type[BaseGraph], target_kwargs: dict[str, Any]) -> None: + """Test that from_other preserves node and edge attribute schemas across backends.""" + graph = RustWorkXGraph() + for dtype in [ + pl.Float16, + pl.Float32, + pl.Float64, + pl.Int8, + pl.Int16, + pl.Int32, + pl.Int64, + pl.UInt8, + pl.UInt16, + pl.UInt32, + pl.UInt64, + pl.Date, + pl.Datetime, + pl.Boolean, + pl.Array(pl.Float32, 3), + pl.List(pl.Int32), + pl.Struct({"a": pl.Int8, "b": pl.Array(pl.String, 2)}), + pl.String, + pl.Object, + ]: + graph.add_node_attr_key(f"attr_{dtype}", dtype=dtype) + graph.add_node( + { + "t": 0, + "attr_Float16": np.float16(1.5), + "attr_Float32": np.float32(2.5), + "attr_Float64": np.float64(3.5), + "attr_Int8": np.int8(4), + "attr_Int16": np.int16(5), + "attr_Int32": np.int32(6), + "attr_Int64": np.int64(7), + "attr_UInt8": np.uint8(8), + "attr_UInt16": np.uint16(9), + "attr_UInt32": np.uint32(10), + "attr_UInt64": np.uint64(11), + "attr_Date": pl.date(2024, 1, 1), + "attr_Datetime": dt.datetime(2024, 1, 1, 12, 0, 0), + "attr_Boolean": True, + "attr_Array(Float32, shape=(3,))": np.array([1.0, 2.0, 3.0], dtype=np.float32), + "attr_List(Int32)": [1, 2, 3], + "attr_Struct({'a': Int8, 'b': Array(String, shape=(2,))})": { + "a": 1, + "b": np.array(["x", "y"], dtype=object), + }, + "attr_String": "test", + "attr_Object": {"key": "value"}, + } + ) + graph2 = target_cls.from_other(graph, **target_kwargs) + + assert graph2.num_nodes() == graph.num_nodes() + assert set(graph2.node_attr_keys()) == set(graph.node_attr_keys()) + + assert graph2._node_attr_schemas() == graph._node_attr_schemas() + assert graph2._edge_attr_schemas() == graph._edge_attr_schemas() + assert graph2.node_attrs().schema == graph.node_attrs().schema + assert graph2.edge_attrs().schema == graph.edge_attrs().schema + + graph3 = RustWorkXGraph.from_other(graph2) + assert graph3._node_attr_schemas() == graph._node_attr_schemas() + assert graph3._edge_attr_schemas() == graph._edge_attr_schemas() + assert graph3.node_attrs().schema == graph.node_attrs().schema + assert graph3.edge_attrs().schema == graph.edge_attrs().schema + + +@pytest.mark.xfail(reason="This 
is because of the lack of support of shape-less pl.Array in write_ipc of polars.") +def test_from_other_with_array_no_shape(): + """Test that from_other raises an error when trying to copy array attributes without shape information.""" + graph = RustWorkXGraph() + graph.add_node_attr_key("array_attr", pl.Array) + graph.add_node({"t": 0, "array_attr": np.array([1.0, 2.0, 3.0], dtype=np.float32)}) + + # This should raise an error because the schema does not include shape information + graph2 = SQLGraph.from_other( + graph, drivername="sqlite", database=":memory:", engine_kwargs={"connect_args": {"check_same_thread": False}} + ) + assert graph2.num_nodes() == graph.num_nodes() + assert set(graph2.node_attr_keys()) == set(graph.node_attr_keys()) + assert graph2._node_attr_schemas() == graph._node_attr_schemas() + assert graph2.node_attrs().schema == graph.node_attrs().schema + + @pytest.mark.parametrize( ("target_cls", "target_kwargs"), [ @@ -1619,6 +1722,72 @@ def test_sql_graph_max_id_restored_per_timepoint(tmp_path: Path) -> None: assert next_id == first_id + 1 +def test_sql_graph_schema_defaults_survive_reload(tmp_path: Path) -> None: + """Reloading a SQLGraph should preserve dtype and default schema metadata.""" + db_path = tmp_path / "schema_defaults.db" + graph = SQLGraph("sqlite", str(db_path)) + + node_array_default = np.array([1.0, 2.0, 3.0], dtype=np.float32) + node_object_default = {"nested": [1, 2, 3]} + edge_score_default = 0.25 + + graph.add_node_attr_key("node_array_default", pl.Array(pl.Float32, 3), node_array_default) + graph.add_node_attr_key("node_object_default", pl.Object, node_object_default) + graph.add_edge_attr_key("edge_score_default", pl.Float32, edge_score_default) + graph._engine.dispose() + + reloaded = SQLGraph("sqlite", str(db_path)) + + node_schemas = reloaded._node_attr_schemas() + edge_schemas = reloaded._edge_attr_schemas() + np.testing.assert_array_equal(node_schemas["node_array_default"].default_value, node_array_default) + assert node_schemas["node_array_default"].dtype == pl.Array(pl.Float32, 3) + assert node_schemas["node_object_default"].default_value == node_object_default + assert node_schemas["node_object_default"].dtype == pl.Object + assert edge_schemas["edge_score_default"].default_value == edge_score_default + assert edge_schemas["edge_score_default"].dtype == pl.Float32 + + +def test_sql_schema_metadata_not_copied_to_in_memory_graphs() -> None: + """SQL-private schema metadata should not leak into in-memory backends via from_other.""" + sql_graph = SQLGraph("sqlite", ":memory:") + sql_graph.add_node_attr_key("node_array_default", pl.Array(pl.Float32, 3), np.array([1.0, 2.0, 3.0], np.float32)) + sql_graph.add_node_attr_key("node_object_default", pl.Object, {"payload": [1, 2, 3]}) + sql_graph.add_edge_attr_key("edge_score_default", pl.Float32, 0.25) + + n1 = sql_graph.add_node( + { + "t": 0, + "node_array_default": np.array([1.0, 1.0, 1.0], dtype=np.float32), + "node_object_default": {"payload": [10]}, + } + ) + n2 = sql_graph.add_node( + { + "t": 1, + "node_array_default": np.array([2.0, 2.0, 2.0], dtype=np.float32), + "node_object_default": {"payload": [20]}, + } + ) + sql_graph.add_edge(n1, n2, {"edge_score_default": 0.75}) + + assert SQLGraph._PRIVATE_SQL_NODE_SCHEMA_STORE_KEY in sql_graph._private_metadata + assert SQLGraph._PRIVATE_SQL_EDGE_SCHEMA_STORE_KEY in sql_graph._private_metadata + + rx_graph = RustWorkXGraph.from_other(sql_graph) + assert SQLGraph._PRIVATE_SQL_NODE_SCHEMA_STORE_KEY not in rx_graph._metadata() + assert 
SQLGraph._PRIVATE_SQL_EDGE_SCHEMA_STORE_KEY not in rx_graph._metadata() + + sql_graph_roundtrip = SQLGraph.from_other( + rx_graph, + drivername="sqlite", + database=":memory:", + engine_kwargs={"connect_args": {"check_same_thread": False}}, + ) + assert sql_graph_roundtrip._node_attr_schemas() == sql_graph._node_attr_schemas() + assert sql_graph_roundtrip._edge_attr_schemas() == sql_graph._edge_attr_schemas() + + def test_compute_overlaps_invalid_threshold(graph_backend: BaseGraph) -> None: """Test compute_overlaps with invalid threshold values.""" with pytest.raises(ValueError, match=r"iou_threshold must be between 0.0 and 1\.0"): @@ -2322,7 +2491,7 @@ def _fill_mock_geff_graph(graph_backend: BaseGraph) -> None: graph_backend.add_edge_attr_key("weight", pl.Float16) - graph_backend.update_metadata( + graph_backend.metadata.update( shape=[1, 25, 25], path="path/to/image.ome.zarr", ) @@ -2383,11 +2552,11 @@ def test_geff_roundtrip(graph_backend: BaseGraph) -> None: geff_graph, _ = IndexedRXGraph.from_geff(output_store) - assert "geff" in geff_graph.metadata() + assert "geff" in geff_graph.metadata # geff metadata was not stored in original graph - geff_graph.metadata().pop("geff") - assert geff_graph.metadata() == graph_backend.metadata() + geff_graph.metadata.pop("geff") + assert geff_graph.metadata == graph_backend.metadata assert geff_graph.num_nodes() == 3 assert geff_graph.num_edges() == 2 @@ -2442,11 +2611,11 @@ def test_geff_with_keymapping(graph_backend: BaseGraph) -> None: edge_attr_key_map={"weight": "weight_new"}, ) - assert "geff" in geff_graph.metadata() + assert "geff" in geff_graph.metadata # geff metadata was not stored in original graph - geff_graph.metadata().pop("geff") - assert geff_graph.metadata() == graph_backend.metadata() + geff_graph.metadata.pop("geff") + assert geff_graph.metadata == graph_backend.metadata assert geff_graph.num_nodes() == 3 assert geff_graph.num_edges() == 2 @@ -2483,34 +2652,58 @@ def test_metadata_multiple_dtypes(graph_backend: BaseGraph) -> None: } # Update metadata with all test values - graph_backend.update_metadata(**test_metadata) + graph_backend.metadata.update(**test_metadata) # Retrieve and verify - retrieved = graph_backend.metadata() + retrieved = graph_backend.metadata for key, expected_value in test_metadata.items(): assert key in retrieved, f"Key '{key}' not found in metadata" assert retrieved[key] == expected_value, f"Value mismatch for '{key}': {retrieved[key]} != {expected_value}" # Test updating existing keys - graph_backend.update_metadata(string="updated_value", new_key="new_value") - retrieved = graph_backend.metadata() + graph_backend.metadata.update(string="updated_value", new_key="new_value") + retrieved = graph_backend.metadata assert retrieved["string"] == "updated_value" assert retrieved["new_key"] == "new_value" assert retrieved["integer"] == 42 # Other values unchanged # Testing removing metadata - graph_backend.remove_metadata("string") - retrieved = graph_backend.metadata() + graph_backend.metadata.pop("string", None) + retrieved = graph_backend.metadata assert "string" not in retrieved - graph_backend.remove_metadata("mixed_list") - retrieved = graph_backend.metadata() + graph_backend.metadata.pop("mixed_list", None) + retrieved = graph_backend.metadata assert "string" not in retrieved assert "mixed_list" not in retrieved +def test_private_metadata_is_hidden_from_public_apis(graph_backend: BaseGraph) -> None: + private_key = "__private_dtype_map" + + graph_backend._private_metadata.update(**{private_key: {"x": 
"float64"}}) + graph_backend.metadata.update(shape=[1, 2, 3]) + + public_metadata = graph_backend.metadata + assert private_key not in public_metadata + assert public_metadata["shape"] == [1, 2, 3] + + with pytest.raises(ValueError, match="reserved for internal use"): + graph_backend.metadata.update(**{private_key: {"x": "int64"}}) + + with pytest.raises(ValueError, match="reserved for internal use"): + graph_backend.metadata.pop(private_key, None) + + with pytest.raises(ValueError, match="is not private"): + graph_backend._private_metadata.update(shape=[1, 2, 3]) + + # Private metadata view can remove private keys. + graph_backend._private_metadata.pop(private_key, None) + assert private_key not in graph_backend._metadata() + + def test_pickle_roundtrip(graph_backend: BaseGraph) -> None: if isinstance(graph_backend, SQLGraph): pytest.skip("SQLGraph does not support pickle roundtrip") @@ -2585,7 +2778,7 @@ def test_to_traccuracy_graph(graph_backend: BaseGraph) -> None: graph_backend.add_node_attr_key("y", pl.Float64) graph_backend.add_node_attr_key(DEFAULT_ATTR_KEYS.MASK, pl.Object) graph_backend.add_node_attr_key(DEFAULT_ATTR_KEYS.BBOX, pl.Array(pl.Int64, 4)) - graph_backend.update_metadata(shape=[3, 25, 25]) + graph_backend.metadata.update(shape=[3, 25, 25]) # Create masks for first graph mask1_data = np.array([[True, True], [True, True]], dtype=bool) diff --git a/src/tracksdata/io/_test/test_ctc_io.py b/src/tracksdata/io/_test/test_ctc_io.py index 7c5fb925..01025213 100644 --- a/src/tracksdata/io/_test/test_ctc_io.py +++ b/src/tracksdata/io/_test/test_ctc_io.py @@ -68,7 +68,7 @@ def test_export_from_ctc_roundtrip(tmp_path: Path, metadata_shape: bool) -> None in_graph.add_edge(node_1, node_3, attrs={DEFAULT_ATTR_KEYS.EDGE_DIST: 1.0}) if metadata_shape: - in_graph.update_metadata(shape=(2, 4, 4)) + in_graph.metadata.update(shape=(2, 4, 4)) shape = None else: shape = (2, 4, 4) diff --git a/src/tracksdata/nodes/_regionprops.py b/src/tracksdata/nodes/_regionprops.py index c78feb32..5be49713 100644 --- a/src/tracksdata/nodes/_regionprops.py +++ b/src/tracksdata/nodes/_regionprops.py @@ -230,8 +230,8 @@ def add_nodes( axis_names = self._axis_names(labels) self._init_node_attrs(graph, axis_names, ndims=labels.ndim) - if "shape" not in graph.metadata(): - graph.update_metadata(shape=labels.shape) + if "shape" not in graph.metadata: + graph.metadata.update(shape=labels.shape) if t is None: time_points = range(labels.shape[0]) diff --git a/src/tracksdata/nodes/_test/test_regionprops.py b/src/tracksdata/nodes/_test/test_regionprops.py index 350d231b..567c62e0 100644 --- a/src/tracksdata/nodes/_test/test_regionprops.py +++ b/src/tracksdata/nodes/_test/test_regionprops.py @@ -79,8 +79,8 @@ def test_regionprops_add_nodes_2d() -> None: operator = RegionPropsNodes(extra_properties=extra_properties) operator.add_nodes(graph, labels=labels) - assert "shape" in graph.metadata() - assert graph.metadata()["shape"] == labels.shape + assert "shape" in graph.metadata + assert graph.metadata["shape"] == labels.shape # Check that nodes were added assert graph.num_nodes() == 2 # Two regions (labels 1 and 2) @@ -115,8 +115,8 @@ def test_regionprops_add_nodes_3d() -> None: operator = RegionPropsNodes(extra_properties=extra_properties) operator.add_nodes(graph, labels=labels) - assert "shape" in graph.metadata() - assert graph.metadata()["shape"] == labels.shape + assert "shape" in graph.metadata + assert graph.metadata["shape"] == labels.shape # Check that nodes were added assert graph.num_nodes() == 2 # Two regions 
@@ -150,8 +150,8 @@ def test_regionprops_add_nodes_with_intensity() -> None: operator.add_nodes(graph, labels=labels, intensity_image=intensity) - assert "shape" in graph.metadata() - assert graph.metadata()["shape"] == labels.shape + assert "shape" in graph.metadata + assert graph.metadata["shape"] == labels.shape # Check that nodes were added with intensity attributes nodes_df = graph.node_attrs() @@ -181,8 +181,8 @@ def test_regionprops_add_nodes_timelapse(n_workers: int) -> None: with options_context(n_workers=n_workers): operator.add_nodes(graph, labels=labels) - assert "shape" in graph.metadata() - assert graph.metadata()["shape"] == labels.shape + assert "shape" in graph.metadata + assert graph.metadata["shape"] == labels.shape # Check that nodes were added for both time points nodes_df = graph.node_attrs() @@ -209,8 +209,8 @@ def test_regionprops_add_nodes_timelapse_with_intensity() -> None: operator.add_nodes(graph, labels=labels, intensity_image=intensity) - assert "shape" in graph.metadata() - assert graph.metadata()["shape"] == labels.shape + assert "shape" in graph.metadata + assert graph.metadata["shape"] == labels.shape # Check that nodes were added with intensity attributes nodes_df = graph.node_attrs() @@ -237,8 +237,8 @@ def double_area(region: RegionProperties) -> float: operator.add_nodes(graph, labels=labels, t=0) - assert "shape" in graph.metadata() - assert graph.metadata()["shape"] == labels.shape + assert "shape" in graph.metadata + assert graph.metadata["shape"] == labels.shape # Check that custom property was calculated nodes_df = graph.node_attrs() @@ -275,8 +275,8 @@ def test_regionprops_mask_creation() -> None: operator.add_nodes(graph, labels=labels, t=0) - assert "shape" in graph.metadata() - assert graph.metadata()["shape"] == labels.shape + assert "shape" in graph.metadata + assert graph.metadata["shape"] == labels.shape # Check that masks were created nodes_df = graph.node_attrs() @@ -300,8 +300,8 @@ def test_regionprops_spacing() -> None: operator.add_nodes(graph, labels=labels, t=0) - assert "shape" in graph.metadata() - assert graph.metadata()["shape"] == labels.shape + assert "shape" in graph.metadata + assert graph.metadata["shape"] == labels.shape # Check that nodes were added (spacing affects internal calculations) nodes_df = graph.node_attrs() @@ -323,8 +323,8 @@ def test_regionprops_empty_labels() -> None: operator.add_nodes(graph, labels=labels, t=0) - assert "shape" in graph.metadata() - assert graph.metadata()["shape"] == labels.shape + assert "shape" in graph.metadata + assert graph.metadata["shape"] == labels.shape # No nodes should be added assert graph.num_nodes() == 0 diff --git a/src/tracksdata/solvers/_ilp_solver.py b/src/tracksdata/solvers/_ilp_solver.py index 6485eaf7..3f6676d5 100644 --- a/src/tracksdata/solvers/_ilp_solver.py +++ b/src/tracksdata/solvers/_ilp_solver.py @@ -175,6 +175,9 @@ def _evaluate_expr( expr: Attr, df: pl.DataFrame, ) -> list[float]: + if df.is_empty(): + return [] + if len(expr.expr_columns) == 0: return [expr.evaluate(df).item()] * len(df) else: @@ -388,7 +391,11 @@ def solve( node_attr_keys.extend(self.merge_weight_expr.columns) nodes_df = graph.node_attrs(attr_keys=node_attr_keys) - edges_df = graph.edge_attrs(attr_keys=self.edge_weight_expr.columns) + # When no edges exist, avoid requesting edge weight columns that may not + # be registered in the backend schema yet. _solve() handles this as a + # regular "no edges" ValueError. 
+ edge_attr_keys = [] if graph.num_edges() == 0 else self.edge_weight_expr.columns + edges_df = graph.edge_attrs(attr_keys=edge_attr_keys) self._add_objective_and_variables(nodes_df, edges_df) self._add_continuous_flow_constraints(nodes_df[DEFAULT_ATTR_KEYS.NODE_ID].to_list(), edges_df) diff --git a/src/tracksdata/solvers/_nearest_neighbors_solver.py b/src/tracksdata/solvers/_nearest_neighbors_solver.py index 34011dee..21915290 100644 --- a/src/tracksdata/solvers/_nearest_neighbors_solver.py +++ b/src/tracksdata/solvers/_nearest_neighbors_solver.py @@ -235,7 +235,8 @@ def solve( The graph view of the solution if `return_solution` is True, otherwise None. """ # get edges and sort them by weight - edges_df = graph.edge_attrs(attr_keys=self.edge_weight_expr.columns) + edge_attr_keys = [] if graph.num_edges() == 0 else self.edge_weight_expr.columns + edges_df = graph.edge_attrs(attr_keys=edge_attr_keys) if len(edges_df) == 0: raise ValueError("No edges found in the graph, there is nothing to solve.") diff --git a/src/tracksdata/utils/_dtypes.py b/src/tracksdata/utils/_dtypes.py index 8e671487..05338fa5 100644 --- a/src/tracksdata/utils/_dtypes.py +++ b/src/tracksdata/utils/_dtypes.py @@ -1,5 +1,7 @@ from __future__ import annotations +import base64 +import io from dataclasses import dataclass from typing import Any @@ -202,6 +204,37 @@ def copy(self) -> AttrSchema: """ return AttrSchema(key=self.key, dtype=self.dtype, default_value=self.default_value) + def __eq__(self, other: object) -> bool: + if not isinstance(other, AttrSchema): + return NotImplemented + return ( + self.key == other.key + and self.dtype == other.dtype + and _values_equal(self.default_value, other.default_value) + ) + + +def _values_equal(left: Any, right: Any) -> bool: + if isinstance(left, np.ndarray) and isinstance(right, np.ndarray): + return bool(np.array_equal(left, right)) + if isinstance(left, dict) and isinstance(right, dict): + if left.keys() != right.keys(): + return False + return all(_values_equal(left[k], right[k]) for k in left) + if isinstance(left, list | tuple) and isinstance(right, list | tuple): + if len(left) != len(right): + return False + return all(_values_equal(lv, rv) for lv, rv in zip(left, right, strict=True)) + + try: + value = left == right + except Exception: + return False + + if isinstance(value, np.ndarray): + return bool(np.all(value)) + return bool(value) + def process_attr_key_args( key_or_schema: str | AttrSchema, @@ -445,6 +478,99 @@ def sqlalchemy_type_to_polars_dtype(sa_type: TypeEngine) -> pl.DataType: return pl.Object +def _normalize_default_for_dtype(default_value: Any, dtype: pl.DataType) -> Any: + if isinstance(dtype, pl.Array | pl.List) and isinstance(default_value, np.ndarray): + return default_value.tolist() + return default_value + + +def _normalize_deserialized_default(default_value: Any, dtype: pl.DataType) -> Any: + if isinstance(dtype, pl.Array): + if isinstance(default_value, pl.Series): + default_value = default_value.to_list() + numpy_dtype = polars_dtype_to_numpy_dtype(dtype.inner, allow_sequence=True) + return np.asarray(default_value, dtype=numpy_dtype).reshape(dtype.shape) + + if isinstance(dtype, pl.List): + if isinstance(default_value, pl.Series): + return default_value.to_list() + if isinstance(default_value, np.ndarray): + return default_value.tolist() + + return default_value + + +_ATTR_SCHEMA_VALUE_COL = "__attr_schema_value__" +_ATTR_SCHEMA_FALLBACK_COL = "__attr_schema_fallback__" + + +def serialize_attr_schema(schema: AttrSchema) -> str: + """ + Serialize 
an AttrSchema into a base64-encoded Arrow IPC payload. + + The primary format stores schema.default_value in the first row of a + single dummy column whose dtype is schema.dtype. This keeps dtype and + default value in one Arrow IPC payload. + """ + normalized_default = _normalize_default_for_dtype(schema.default_value, schema.dtype) + df = pl.DataFrame( + { + _ATTR_SCHEMA_VALUE_COL: pl.Series( + _ATTR_SCHEMA_VALUE_COL, + values=[normalized_default], + dtype=schema.dtype, + ), + } + ) + + buffer = io.BytesIO() + try: + df.write_ipc(buffer) + except Exception: + # Some dtypes (e.g. pl.Object) cannot roundtrip through Arrow IPC schema. + # Store pickled (dtype, default) in the first row of a binary dummy column. + fallback_payload = dumps((schema.dtype, schema.default_value)) + fallback_df = pl.DataFrame( + { + _ATTR_SCHEMA_FALLBACK_COL: pl.Series( + _ATTR_SCHEMA_FALLBACK_COL, + values=[fallback_payload], + dtype=pl.Binary, + ), + } + ) + buffer = io.BytesIO() + fallback_df.write_ipc(buffer) + + return base64.b64encode(buffer.getvalue()).decode("utf-8") + + +def deserialize_attr_schema(encoded_schema: str, *, key: str) -> AttrSchema: + """ + Deserialize an AttrSchema previously encoded by `serialize_attr_schema`. + """ + data = base64.b64decode(encoded_schema) + buffer = io.BytesIO(data) + restored_df = pl.read_ipc(buffer) + + if _ATTR_SCHEMA_VALUE_COL in restored_df.columns: + dtype = restored_df.schema[_ATTR_SCHEMA_VALUE_COL] + default_value = restored_df[_ATTR_SCHEMA_VALUE_COL][0] + elif _ATTR_SCHEMA_FALLBACK_COL in restored_df.columns: + fallback_payload = restored_df[_ATTR_SCHEMA_FALLBACK_COL][0] + if fallback_payload is None: + raise ValueError("Fallback schema payload is missing.") + dtype, default_value = loads(fallback_payload) + else: + raise ValueError("Unrecognized attr schema payload format.") + + if not pl.datatypes.is_polars_dtype(dtype): + raise TypeError(f"Decoded value is not a polars dtype: {type(dtype)}") + + default_value = _normalize_deserialized_default(default_value, dtype) + return AttrSchema(key=key, dtype=dtype, default_value=default_value) + + def validate_default_value_dtype_compatibility(default_value: Any, dtype: pl.DataType) -> None: """ Validate that a default value is compatible with a polars dtype. 
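A minimal usage sketch of the serialization helpers added to `tracksdata.utils._dtypes` above, mirroring the round-trip exercised by the new test file that follows (this is illustration only, not part of the patch):

```python
import numpy as np
import polars as pl

from tracksdata.utils._dtypes import (
    AttrSchema,
    deserialize_attr_schema,
    serialize_attr_schema,
)

# Schema with an array dtype and an array default value.
schema = AttrSchema(
    key="vector",
    dtype=pl.Array(pl.Float32, 3),
    default_value=np.array([1.0, 2.0, 3.0], dtype=np.float32),
)

# The encoded form is a base64 string (Arrow IPC payload, or a pickled
# fallback for dtypes such as pl.Object), so it can be stored as metadata.
encoded = serialize_attr_schema(schema)
restored = deserialize_attr_schema(encoded, key="vector")

# AttrSchema.__eq__ compares key, dtype and default value (array-aware).
assert restored == schema
```
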
diff --git a/src/tracksdata/utils/_test/test_dtype_serialization.py b/src/tracksdata/utils/_test/test_dtype_serialization.py new file mode 100644 index 00000000..1f406224 --- /dev/null +++ b/src/tracksdata/utils/_test/test_dtype_serialization.py @@ -0,0 +1,83 @@ +import base64 +import binascii +import io + +import numpy as np +import polars as pl +import pytest + +from tracksdata.utils._dtypes import ( + AttrSchema, + deserialize_attr_schema, + serialize_attr_schema, +) + + +@pytest.mark.parametrize( + "dtype", + [ + pl.Int64, + pl.Float32, + pl.Boolean, + pl.String, + pl.List(pl.Int16), + pl.Array(pl.Float64, 4), + pl.Array(pl.Int32, (2, 3)), + pl.Struct({"x": pl.Int64, "y": pl.List(pl.String)}), + pl.Datetime("us", "UTC"), + ], +) +def test_serialize_deserialize_attr_schema_dtype_roundtrip(dtype: pl.DataType) -> None: + schema = AttrSchema(key="dummy", dtype=dtype) + encoded = serialize_attr_schema(schema) + + assert isinstance(encoded, str) + assert encoded + assert base64.b64decode(encoded) + + restored = deserialize_attr_schema(encoded, key=schema.key) + + assert restored == schema + + +def test_deserialize_attr_schema_invalid_base64_raises() -> None: + with pytest.raises(binascii.Error): + deserialize_attr_schema("not-base64", key="dummy") + + +def test_deserialize_attr_schema_non_ipc_payload_raises() -> None: + encoded = base64.b64encode(b"not-arrow-ipc").decode("utf-8") + + with pytest.raises((OSError, pl.exceptions.PolarsError)): + deserialize_attr_schema(encoded, key="dummy") + + +@pytest.mark.parametrize( + "schema", + [ + AttrSchema(key="score", dtype=pl.Float64, default_value=1.25), + AttrSchema( + key="vector", + dtype=pl.Array(pl.Float32, 3), + default_value=np.array([1.0, 2.0, 3.0], dtype=np.float32), + ), + AttrSchema(key="payload", dtype=pl.Object, default_value={"nested": [1, 2, 3]}), + ], +) +def test_serialize_deserialize_attr_schema_roundtrip(schema: AttrSchema) -> None: + encoded = serialize_attr_schema(schema) + restored = deserialize_attr_schema(encoded, key=schema.key) + assert restored == schema + + +def test_serialize_attr_schema_stores_default_in_dummy_row() -> None: + schema = AttrSchema(key="score", dtype=pl.Float64, default_value=1.25) + encoded = serialize_attr_schema(schema) + + payload = base64.b64decode(encoded) + df = pl.read_ipc(io.BytesIO(payload)) + + assert "__attr_schema_value__" in df.columns + assert df.schema["__attr_schema_value__"] == pl.Float64 + assert df["__attr_schema_value__"][0] == 1.25 + assert "__attr_schema_dtype_pickle__" not in df.columns
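
A short sketch of the reworked metadata API introduced by this diff: `graph.metadata` is now a dict-like `MetadataView` property instead of a method, and `update_metadata()`/`remove_metadata()` are replaced by mutating the view. The example assumes the in-memory `RustWorkXGraph` backend and is not part of the patch:

```python
from tracksdata.graph import RustWorkXGraph

graph = RustWorkXGraph()

# `graph.metadata` reads like a dict; every mutation is validated and
# written back to the backend.
graph.metadata.update(shape=[1, 25, 25], path="path/to/image.ome.zarr")
print(graph.metadata["shape"])  # [1, 25, 25]

# `metadata.pop(...)` replaces the removed `remove_metadata()` method.
graph.metadata.pop("path", None)

# Keys starting with "__private_" are reserved for backend-internal state
# (e.g. the SQLGraph attribute-schema store) and are rejected by the
# public view.
try:
    graph.metadata["__private_dtype_map"] = {"x": "float64"}
except ValueError:
    pass  # "reserved for internal use"
```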