diff --git a/.gitmodules b/.gitmodules index 6ff7c4611..06ea934a0 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ [submodule "test-data"] path = light-curve/tests/light-curve-test-data url = https://github.com/light-curve/test-data.git -[submodule "light-curve/tests/prep-models"] - path = light-curve/tests/prep-models - url = https://github.com/light-curve/prep-models.git diff --git a/light-curve/CHANGELOG.md b/light-curve/CHANGELOG.md deleted file mode 100644 index 1cc2f019e..000000000 --- a/light-curve/CHANGELOG.md +++ /dev/null @@ -1,63 +0,0 @@ -# Changelog - -All notable changes to `light-curve-python` will be documented in this file. - -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), -and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - -## [Unreleased] - -### Added - -- New `light_curve.embed` module with ONNX-backed light curve embedding models ([#692](https://github.com/light-curve/light-curve-python/pull/692)): - - `Astromer1` and `Astromer2` — transformer encoders pretrained on MACHO light curves (Donoso-Oliva et al. 2023/2026), returning 256-dimensional embeddings. Load directly from HuggingFace with `Astromer2.from_hf()`. Use the `output` parameter to select which named output to compute (`"mean"` (default), `"max"`, or `"sequence"`); onnxruntime prunes unused computation automatically. - - `NonOverlappingWindows`, `Beginning`, `End`, `RandomSubsample`, `MultipleTimeReductions` — strategies for mapping variable-length light curves to fixed-length model inputs. - - `InputTensors` / `AstromerInputs` — typed dataclass containers for preprocessed tensors. - - `Dim` — enum of axis indices for the 4-D output array `(BAND, SUBSAMPLE, SEQUENCE, VALUE)`. - -### Changed - --- - -### Deprecated - --- - -### Removed - --- - -### Fixed - -- Fixed segmentation fault when using `pyarrow` 24 on macOS ([#694](https://github.com/light-curve/light-curve-python/pull/694)) - -### Security - --- - -## [0.12.0] 2026-03-24 - -### Added - -- `BazinFit`, `LinexpFit`, and `VillarFit` gained `nuts`, `nuts-ceres`, and `nuts-lmsder` algorithms using NUTS (No-U-Turn Sampler) for gradient-based Hamiltonian Monte Carlo optimization. New `nuts_ntune` and `nuts_niter` parameters control the number of tuning and drawing iterations respectively https://github.com/light-curve/light-curve-python/pull/635 - -### Changed - -- Bump `light-curve-feature` to 0.13.0, `rand` to 0.10, `rand_xoshiro` to 0.8 https://github.com/light-curve/light-curve-python/pull/648 -- **Build breaking**: Minimum supported Rust version (MSRV) bumped from 1.85 to 1.88 https://github.com/light-curve/light-curve-python/pull/648 - -### Deprecated - --- - -### Removed - --- - -### Fixed - -- `.many(arrow_array)` now checks for nulls, and raises an error if any nulls presented. Previously we used all the values, even masked, which may cause unexpected results https://github.com/light-curve/light-curve-python/pull/629 - -### Security - --- diff --git a/light-curve/light_curve/embed/__init__.py b/light-curve/light_curve/embed/__init__.py deleted file mode 100644 index a4436ee4e..000000000 --- a/light-curve/light_curve/embed/__init__.py +++ /dev/null @@ -1,32 +0,0 @@ -from .astromer import Astromer1, Astromer2, create_onnx_session -from .model import AstromerInputs, Dim, EmbeddingSession, SingleBandModel -from .time_reduction import ( - Beginning, - End, - InputTensors, - MultipleTimeReductions, - NonOverlappingWindows, - RandomSubsample, - SingleSubsampleTimeReduction, - TimeReduction, - time_reduction_from_str, -) - -__all__ = [ - "Astromer1", - "Astromer2", - "AstromerInputs", - "Beginning", - "Dim", - "EmbeddingSession", - "End", - "InputTensors", - "MultipleTimeReductions", - "NonOverlappingWindows", - "RandomSubsample", - "SingleBandModel", - "SingleSubsampleTimeReduction", - "TimeReduction", - "create_onnx_session", - "time_reduction_from_str", -] diff --git a/light-curve/light_curve/embed/astromer.py b/light-curve/light_curve/embed/astromer.py deleted file mode 100644 index 2ece2f50d..000000000 --- a/light-curve/light_curve/embed/astromer.py +++ /dev/null @@ -1,327 +0,0 @@ -from __future__ import annotations - -from typing import Sequence - -import numpy as np -from numpy.typing import ArrayLike - -from .model import AstromerInputs, SingleBandModel -from .time_reduction import TimeReduction - -_ONNX_INSTALL_HINT = ( - "An ONNX runtime is required to run embedding models. " - "Install the variant that matches your hardware:\n" - " CPU / Apple Silicon: pip install onnxruntime\n" - " NVIDIA GPU (CUDA): pip install onnxruntime-gpu\n" - " Windows DirectML: pip install onnxruntime-directml\n" - "See https://onnxruntime.ai for the full list of packages." -) - - -def create_onnx_session(model_path: str, **kwargs): - """Create an ``onnxruntime.InferenceSession``, with a helpful error if the package is missing. - - Parameters - ---------- - model_path : str - Path to the ONNX model file. - **kwargs - Forwarded verbatim to ``onnxruntime.InferenceSession``. - - Returns - ------- - onnxruntime.InferenceSession - A ready-to-use inference session. - - Raises - ------ - ImportError - If no onnxruntime variant is installed, with installation instructions. - """ - try: - import onnxruntime as ort - except ImportError as exc: - raise ImportError(_ONNX_INSTALL_HINT) from exc - return ort.InferenceSession(model_path, **kwargs) - - -class _AstromerModel(SingleBandModel): - """Internal base class for Astromer-family embedding models. - - Provides shared preprocessing (per-window zero-mean normalisation) and - ONNX inference logic for all Astromer variants. Concrete subclasses set - :attr:`_HF_REPO` and, if needed, override the default ``time_reduction``. - - Output shape - ------------ - :meth:`__call__` always returns a 4-D array - ``(n_bands, n_subsamples, seq_size, embed_dim)`` for consistency across - models and windowing strategies: - - * ``n_bands`` — number of photometric bands; 1 when ``bands`` is ``None``. - * ``n_subsamples`` — windows produced by the time reduction (1 for - :class:`NonOverlappingWindows`, which averages all windows). - * ``seq_size`` — 1 for the ``"mean"`` and ``"max"`` outputs (aggregated - over the sequence); equal to the model's sequence length for ``"sequence"``. - * ``embed_dim`` — embedding dimension (256 for all Astromer models). - - Use ``embedding.squeeze()`` to collapse unit dimensions when the full - 4-D layout is not needed. - """ - - seq_size: int = 200 - dtype: type = np.float32 - _HF_REPO: str - _OUTPUTS: frozenset[str] = frozenset({"mean", "max", "sequence"}) - - def __init__( - self, - session, - *, - output: str = "mean", - bands: Sequence[str | int] | None = None, - time_reduction: str | list[str] | TimeReduction = "non-overlapping-windows", - time_red_kwargs: dict[str, object] | None = None, - ) -> None: - super().__init__( - session, - bands=bands, - time_reduction=time_reduction, - time_red_kwargs=time_red_kwargs, - ) - if output not in self._OUTPUTS: - raise ValueError(f"Unknown output '{output}'. Must be one of: {', '.join(sorted(self._OUTPUTS))}") - self.output = output - - @classmethod - def from_hf( - cls, - output: str = "mean", - *, - bands: Sequence[str | int] | None = None, - time_reduction: str | list[str] | TimeReduction = "non-overlapping-windows", - time_red_kwargs: dict[str, object] | None = None, - providers=None, - sess_options=None, - ) -> "_AstromerModel": - """Load a model from the HuggingFace Hub. - - Downloads (and caches) the ONNX model file, creates an - ``onnxruntime.InferenceSession``, and returns a ready-to-use instance. - Only the requested output is computed at inference time — onnxruntime - prunes the unused computation graph automatically. - - Parameters - ---------- - output : str, optional - Named ONNX output to return. One of: - - * ``"mean"`` (default) — masked mean pooling over valid timesteps, - output shape ``(batch, 256)`` - * ``"max"`` — masked max pooling over valid timesteps, - output shape ``(batch, 256)`` - * ``"sequence"`` — per-timestep embeddings (no aggregation), - output shape ``(batch, seq_size, 256)`` - - bands : sequence of str or int, optional - Ordered band labels to embed. ``None`` (default) treats the whole - light curve as one band. - time_reduction : str, list of str, or TimeReduction, optional - Windowing / subsampling strategy. Defaults to - ``"non-overlapping-windows"``. - time_red_kwargs : dict, optional - Extra keyword arguments forwarded to :func:`time_reduction_from_str` - when ``time_reduction`` is given as a string. - providers : list of str, optional - ONNX Runtime execution providers, e.g. - ``["CUDAExecutionProvider", "CPUExecutionProvider"]``. - sess_options : onnxruntime.SessionOptions, optional - Advanced session configuration passed directly to - ``onnxruntime.InferenceSession``. - - Returns - ------- - instance of the calling class - Instance with a live ONNX inference session. - - Raises - ------ - ValueError - If ``output`` is not one of the recognised output names. - ImportError - If ``huggingface_hub`` is not installed, with instructions to - install it or to download the model file manually. - ImportError - If no ``onnxruntime`` variant is installed, with hardware-specific - installation instructions. - """ - if output not in cls._OUTPUTS: - raise ValueError(f"Unknown output '{output}'. Must be one of: {', '.join(sorted(cls._OUTPUTS))}") - - model_prefix = cls._HF_REPO.split("/")[-1] - filename = f"{model_prefix}.onnx" - - try: - from huggingface_hub import hf_hub_download - except ImportError as exc: - hf_url = f"https://huggingface.co/{cls._HF_REPO}/resolve/main/{filename}" - raise ImportError( - "huggingface_hub is required to download models from HuggingFace.\n" - "Install it with:\n" - " pip install huggingface-hub\n" - "Or download the model file directly:\n" - f" {hf_url}\n" - "then load it with:\n" - " import onnxruntime as ort\n" - f' {cls.__name__}(session=ort.InferenceSession("/path/to/{filename}"), output="{output}")' - ) from exc - - model_path = hf_hub_download(repo_id=cls._HF_REPO, filename=filename) - - session_kwargs = {} - if providers is not None: - session_kwargs["providers"] = providers - if sess_options is not None: - session_kwargs["sess_options"] = sess_options - - session = create_onnx_session(model_path, **session_kwargs) - return cls( - session=session, - output=output, - bands=bands, - time_reduction=time_reduction, - time_red_kwargs=time_red_kwargs, - ) - - def preprocess_lc( - self, - time: ArrayLike, - mag: ArrayLike, - ) -> AstromerInputs: - """Preprocess a light curve into Astromer model input tensors. - - Each window is independently zero-mean normalised (time and magnitude) - using only the valid (non-padded) observations. Normalisation is - computed in the original input precision before casting to ``float32`` - to avoid precision loss for large values such as MJD timestamps. - - Parameters - ---------- - time : array-like, shape ``(n,)`` - Observation times (e.g. MJD). - mag : array-like, shape ``(n,)`` - Magnitudes. - - Returns - ------- - AstromerInputs - ``times`` and ``input`` are per-window zero-mean arrays of shape - ``(n_windows, seq_size, 1)`` in float32; ``mask`` is 1 for valid - observations and 0 for zero-padded positions, same shape and dtype. - """ - time, mag, mask = self.time_reduction.preprocess_lc(time, mag, seq_size=self.seq_size) - - bool_mask = mask # (n_windows, seq_size), boolean - n_valid = mask.sum(axis=-1, keepdims=True) - time_mean = (time * mask).sum(axis=-1, keepdims=True) / n_valid - mag_mean = (mag * mask).sum(axis=-1, keepdims=True) / n_valid - time = np.where(mask, time - time_mean, time).astype(self.dtype) - mag = np.where(mask, mag - mag_mean, mag).astype(self.dtype) - mask = mask.astype(self.dtype) - - idx = (..., np.newaxis) - return AstromerInputs(times=time[idx], input=mag[idx], mask=mask[idx], bool_mask=bool_mask) - - def predict_tensors(self, tensors: AstromerInputs) -> np.ndarray: - """Run the ONNX model on pre-processed tensors and return reduced embeddings. - - Parameters - ---------- - tensors : AstromerInputs - As returned by :meth:`preprocess_lc`. - - Returns - ------- - np.ndarray, shape ``(n_subsamples, seq_size, embed_dim)`` - Embeddings after applying the time reduction's aggregation. - For aggregated models (mean / max) ``seq_size`` is 1. - """ - (raw_embedding,) = self.session.run( - [self.output], - {"input": tensors.input, "times": tensors.times, "mask_in": tensors.mask}, - ) - - # Aggregated outputs (mean / max) have shape (n_windows, embed_dim); add SEQUENCE=1 axis - if raw_embedding.ndim == 2: - raw_embedding = np.expand_dims(raw_embedding, axis=1) - - return self.time_reduction.reduce_embeddings(raw_embedding, tensors, output=self.output) - - -class Astromer1(_AstromerModel): - """Astromer 1 embedding model (Donoso-Oliva et al. 2023). - - Transformer encoder pretrained on MACHO R-band light curves via masked - magnitude prediction. Accepts irregularly-sampled single-band photometry - and returns a 256-dimensional embedding (2 layers, 4 attention heads). - - The ONNX model is hosted on HuggingFace at - ``https://huggingface.co/light-curve/astromer1`` (``astromer1.onnx``). - Three named outputs are available; select with the ``output`` parameter: - - * ``"mean"`` (default) — masked mean pooling → shape ``(batch, 256)`` - * ``"max"`` — masked max pooling → shape ``(batch, 256)`` - * ``"sequence"`` — per-timestep features → shape ``(batch, 200, 256)`` - - Use :meth:`from_hf` to download and load the model directly. - - Parameters - ---------- - session : - ONNX inference session for the Astromer 1 model file. - output : str, optional - Which named output to return: ``"mean"``, ``"max"``, or ``"sequence"``. - Defaults to ``"mean"``. - bands : sequence of str or int, optional - Band labels. ``None`` (default) treats the whole light curve as one - band. - time_reduction : str, list of str, or TimeReduction - Windowing strategy. Defaults to :class:`NonOverlappingWindows`. - """ - - _HF_REPO = "light-curve/astromer1" - - -class Astromer2(_AstromerModel): - """Astromer 2 embedding model (Donoso-Oliva et al. 2026). - - Pretrained on 1.5 million MACHO light curves. Accepts irregularly-sampled - single-band photometry and returns a 256-dimensional embedding. - - The ONNX model is hosted on HuggingFace at - ``https://huggingface.co/light-curve/astromer2`` (``astromer2.onnx``). - Three named outputs are available; select with the ``output`` parameter: - - * ``"mean"`` (default) — masked mean pooling → shape ``(batch, 256)`` - * ``"max"`` — masked max pooling → shape ``(batch, 256)`` - * ``"sequence"`` — per-timestep features → shape ``(batch, 200, 256)`` - - Use :meth:`from_hf` to download and load the model directly. - - Parameters - ---------- - session : - ONNX inference session for the Astromer 2 model file. - output : str, optional - Which named output to return: ``"mean"``, ``"max"``, or ``"sequence"``. - Defaults to ``"mean"``. - bands : sequence of str or int, optional - Band labels. ``None`` (default) treats the whole light curve as one - band. - time_reduction : str, list of str, or TimeReduction - Windowing strategy. Defaults to :class:`NonOverlappingWindows`, which - matches the sequential-window preprocessing used to produce the reference - embeddings on HuggingFace. - """ - - _HF_REPO = "light-curve/astromer2" diff --git a/light-curve/light_curve/embed/model.py b/light-curve/light_curve/embed/model.py deleted file mode 100644 index 638730029..000000000 --- a/light-curve/light_curve/embed/model.py +++ /dev/null @@ -1,187 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from dataclasses import dataclass, field -from enum import IntEnum -from typing import Sequence - -import numpy as np -from numpy.typing import ArrayLike - -from .time_reduction import InputTensors, TimeReduction, time_reduction_from_str - - -@dataclass -class AstromerInputs(InputTensors): - """Input tensors for Astromer-family models. - - ``times`` and ``input`` are float32 arrays of shape - ``(n_windows, seq_size, 1)`` ready for ONNX inference. ``mask`` is the - same validity information cast to float32 for the model (1 = valid, - 0 = padded), shape ``(n_windows, seq_size, 1)``. ``bool_mask`` (inherited) - is the boolean equivalent, shape ``(n_windows, seq_size)``. - """ - - times: np.ndarray = field(kw_only=True) - input: np.ndarray = field(kw_only=True) - mask: np.ndarray = field(kw_only=True) - - -class Dim(IntEnum): - """Axis indices for the 4-D embedding array ``(BAND, SUBSAMPLE, SEQUENCE, VALUE)``.""" - - BAND = 0 - SUBSAMPLE = 1 - SEQUENCE = 2 - VALUE = 3 - - -class EmbeddingSession(ABC): - """Abstract base for ONNX-backed embedding models. - - Subclasses implement :meth:`preprocess_lc` (convert raw arrays to model - tensors) and :meth:`predict_tensors` (run the session and return embeddings). - - Parameters - ---------- - session : - An ``onnxruntime.InferenceSession`` (or any object with a compatible - ``.run()`` interface). - time_reduction : str, list of str, or TimeReduction - Strategy for mapping variable-length light curves to fixed-length - sequences. - time_red_kwargs : dict, optional - Extra keyword arguments forwarded to :func:`time_reduction_from_str` - when ``time_reduction`` is given as a string. - """ - - seq_size: int - - def __init__( - self, - session, - *, - time_reduction: str | list[str] | TimeReduction, - time_red_kwargs: dict[str, object] | None = None, - ) -> None: - self.session = session - - if time_red_kwargs is None: - time_red_kwargs = {} - if not isinstance(time_reduction, TimeReduction): - time_reduction = time_reduction_from_str(time_reduction, **time_red_kwargs) - self.time_reduction = time_reduction - - @abstractmethod - def __call__(self, *arrays: ArrayLike) -> np.ndarray: - inputs = self.preprocess_lc(*arrays) - embedding = self.predict_tensors(inputs) - return embedding - - @abstractmethod - def preprocess_lc(self, *arrays: ArrayLike) -> InputTensors: - """Convert raw light curve arrays to model input tensors. - - Parameters - ---------- - *arrays : array-like - Raw light curve arrays (e.g. time, magnitude). - - Returns - ------- - InputTensors - Tensors ready to be passed to :meth:`predict_tensors`. - """ - raise NotImplementedError - - @abstractmethod - def predict_tensors(self, tensors: InputTensors) -> np.ndarray: - """Run the ONNX session on pre-processed tensors and return embeddings. - - Parameters - ---------- - tensors : InputTensors - Pre-processed model inputs as returned by :meth:`preprocess_lc`. - - Returns - ------- - np.ndarray - Embedding array with shape depending on the model and time reduction. - """ - raise NotImplementedError - - -class SingleBandModel(EmbeddingSession, ABC): - """Embedding model that processes one photometric band at a time. - - When ``bands`` is ``None`` the full light curve is treated as a single band. - When ``bands`` is provided, the light curve is split by band label, each band - is embedded independently, and the results are concatenated along - :attr:`Dim.BAND`. - - Parameters - ---------- - session : - ONNX inference session. - bands : sequence of str or int, optional - Ordered band labels to embed. ``None`` treats the whole light curve as - one band. - time_reduction : str, list of str, or TimeReduction - Windowing / subsampling strategy. Defaults to - ``"non-overlapping-windows"``. - time_red_kwargs : dict, optional - Extra kwargs forwarded to :func:`time_reduction_from_str`. - """ - - def __init__( - self, - session, - *, - bands: Sequence[str | int] | None = None, - time_reduction: str | list[str] | TimeReduction = "non-overlapping-windows", - time_red_kwargs: dict[str, object] | None = None, - ) -> None: - super().__init__( - session, - time_reduction=time_reduction, - time_red_kwargs=time_red_kwargs, - ) - self.bands = bands - - def __call__(self, time: ArrayLike, mag: ArrayLike, band: ArrayLike | None = None) -> np.ndarray: - """Embed a light curve. - - Parameters - ---------- - time : array-like, shape ``(n,)`` - Observation times. - mag : array-like, shape ``(n,)`` - Magnitudes. - band : array-like, shape ``(n,)``, optional - Band labels, required when ``self.bands`` is not ``None``. - - Returns - ------- - np.ndarray, shape ``(n_bands, n_subsamples, seq_size, embed_dim)`` - Embedding tensor. ``n_bands`` is 1 when ``self.bands`` is ``None``. - - Raises - ------ - ValueError - If ``band`` is provided but ``self.bands`` is ``None``, or vice versa. - """ - if (band is None) != (self.bands is None): - raise ValueError( - f"band array must be provided with values specified by `bands` class constructor argument: {self.bands}" - ) - - if self.bands is None: - embed = super().__call__(time, mag) - return np.expand_dims(embed, axis=Dim.BAND) - - embeddings = [] - for band_name in self.bands: - band_idx = band == band_name - embed_band = super().__call__(time[band_idx], mag[band_idx]) - embeddings.append(np.expand_dims(embed_band, axis=Dim.BAND)) - return np.concatenate(embeddings, axis=Dim.BAND) diff --git a/light-curve/light_curve/embed/time_reduction.py b/light-curve/light_curve/embed/time_reduction.py deleted file mode 100644 index 88c2a5df5..000000000 --- a/light-curve/light_curve/embed/time_reduction.py +++ /dev/null @@ -1,478 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from dataclasses import dataclass, field -from typing import TYPE_CHECKING - -import numpy as np -from numpy.typing import ArrayLike - -if TYPE_CHECKING: - from typing import Self - - -@dataclass -class InputTensors: - """Base tensor container produced by :meth:`EmbeddingSession.preprocess_lc`. - - All subclasses carry a boolean ``bool_mask`` identifying valid (non-padded) - timesteps, shape ``(n_windows, seq_size)``. - """ - - bool_mask: np.ndarray = field(kw_only=True) - - -class TimeReduction(ABC): - """Abstract base for strategies that map a variable-length light curve to fixed-length sequences. - - Subclasses implement :meth:`subsample_lc` (windowing / subsampling logic) and - :meth:`reduce_embeddings` (how per-window embeddings are aggregated back to one - embedding per light curve). - """ - - def _mask(self, *, array_size: int, seq_size: int) -> np.ndarray: - padding_size = max(seq_size - array_size, 0) - values = np.ones(array_size, dtype=bool) - padding = np.zeros(padding_size, dtype=bool) - return np.r_[values, padding] - - def _pad(self, array: np.ndarray, seq_size: int) -> np.ndarray: - if array.size >= seq_size: - return array[:seq_size] - return np.r_[array, np.zeros(seq_size - array.size, dtype=array.dtype)] - - def preprocess_lc(self, *arrays: ArrayLike, seq_size: int) -> tuple[np.ndarray, ...]: - """Subsample, pad, and mask a light curve. - - Parameters - ---------- - *arrays : array-like - 1-D arrays of equal length (e.g. time, magnitude). - seq_size : int - Target sequence length; shorter windows are zero-padded. - - Returns - ------- - tuple of np.ndarray - One stacked array per input plus a boolean mask, each of shape - ``(n_subsamples, seq_size)``. The mask is ``True`` for real - observations and ``False`` for padding. - - Raises - ------ - ValueError - If any input array is not 1-D. - """ - arrays = np.broadcast_arrays(*arrays) - if arrays[0].ndim != 1: - raise ValueError(f"Inputs must be single dimensional, {arrays[0].ndim} is provided") - - subsamples = self.subsample_lc(*arrays, seq_size=seq_size) - sequences = [] - for subsample in subsamples: - mask = self._mask(array_size=subsample[0].size, seq_size=seq_size) - sequence = tuple(self._pad(array, seq_size) for array in subsample) + (mask,) - sequences.append(sequence) - - sequences = tuple(np.stack(arrays) for arrays in zip(*sequences)) - - return sequences - - @abstractmethod - def subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> list[tuple[np.ndarray, ...]]: - """Split or subsample the light curve into one or more windows. - - Parameters - ---------- - *arrays : np.ndarray - 1-D arrays of equal length. - seq_size : int - Maximum number of observations per window. - - Returns - ------- - list of tuple of np.ndarray - Each element is a tuple of arrays (one per input), containing at most - ``seq_size`` observations. - """ - raise NotImplementedError - - @abstractmethod - def reduce_embeddings(self, embeddings: ArrayLike, tensors: InputTensors, *, output: str) -> ArrayLike: - """Aggregate per-window embeddings into a single embedding per light curve. - - Parameters - ---------- - embeddings : array-like, shape ``(n_windows, seq_size, embed_dim)`` - Raw per-window embeddings from the model. ``seq_size`` is 1 for - aggregated (mean / max) outputs and the full sequence length for the - ``"sequence"`` output. - tensors : InputTensors - Preprocessed input tensors as returned by - :meth:`EmbeddingSession.preprocess_lc`. - output : str - The model output name (e.g. ``"mean"``, ``"max"``, ``"sequence"``). - Implementations use this to select the appropriate aggregation strategy - rather than inferring it from array shapes. - - Returns - ------- - array-like - Reduced embeddings, shape depends on the concrete implementation. - """ - raise NotImplementedError - - -class SingleSubsampleTimeReduction(TimeReduction, ABC): - """Base for strategies that produce exactly one window per light curve.""" - - def subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> list[tuple[np.ndarray, ...]]: - """Return a single-element list wrapping :meth:`single_subsample_lc`. - - Parameters - ---------- - *arrays : np.ndarray - 1-D arrays of equal length. - seq_size : int - Maximum observations per window. - - Returns - ------- - list of tuple of np.ndarray - A one-element list containing the subsampled arrays. - """ - return [self.single_subsample_lc(*arrays, seq_size=seq_size)] - - @abstractmethod - def single_subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> tuple[np.ndarray, ...]: - """Return one subsampled window of at most ``seq_size`` observations. - - Parameters - ---------- - *arrays : np.ndarray - 1-D arrays of equal length. - seq_size : int - Maximum number of observations to keep. - - Returns - ------- - tuple of np.ndarray - Subsampled arrays, each of length ``<= seq_size``. - """ - raise NotImplementedError - - def reduce_embeddings(self, embeddings: ArrayLike, tensors: InputTensors, *, output: str) -> ArrayLike: - """Return embeddings unchanged (single window — no aggregation needed). - - Parameters - ---------- - embeddings : array-like - Per-window embeddings from the model. - tensors : InputTensors - Unused; accepted for interface compatibility. - output : str - Unused; accepted for interface compatibility. - - Returns - ------- - array-like - The input unchanged. - """ - return embeddings - - -class Beginning(SingleSubsampleTimeReduction): - """Select the chronologically first ``seq_size`` observations of the light curve.""" - - def single_subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> tuple[np.ndarray, ...]: - """Return the leading ``seq_size`` elements of each array. - - Parameters - ---------- - *arrays : np.ndarray - 1-D arrays of equal length. - seq_size : int - Number of observations to keep from the start. - - Returns - ------- - tuple of np.ndarray - First ``seq_size`` elements of each input array. - """ - return tuple(array[:seq_size] for array in arrays) - - -class End(SingleSubsampleTimeReduction): - """Select the chronologically last ``seq_size`` observations of the light curve.""" - - def single_subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> tuple[np.ndarray, ...]: - """Return the trailing ``seq_size`` elements of each array. - - Parameters - ---------- - *arrays : np.ndarray - 1-D arrays of equal length. - seq_size : int - Number of observations to keep from the end. - - Returns - ------- - tuple of np.ndarray - Last ``seq_size`` elements of each input array. - """ - return tuple(array[-seq_size:] for array in arrays) - - -class RandomSubsample(SingleSubsampleTimeReduction): - """Draw ``seq_size`` observations uniformly at random without replacement. - - Parameters - ---------- - rng : int, np.random.Generator, or None - Seed or generator for reproducible sampling. - """ - - def __init__(self, *, rng: int | np.random.Generator | None) -> None: - super().__init__() - self.rng = np.random.default_rng(rng) - - def single_subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> tuple[np.ndarray, ...]: - """Return a random subsample of at most ``seq_size`` observations, in original order. - - Parameters - ---------- - *arrays : np.ndarray - 1-D arrays of equal length. - seq_size : int - Maximum number of observations to sample. - - Returns - ------- - tuple of np.ndarray - ``min(len, seq_size)`` randomly selected observations, sorted by - original index so temporal order is preserved. - """ - array_size = arrays[0].size - if array_size <= seq_size: - return arrays - - indices = self.rng.choice(array_size, size=seq_size, replace=False) - indices.sort() - return tuple(array[indices] for array in arrays) - - -class NonOverlappingWindows(TimeReduction): - """Split the light curve into consecutive non-overlapping windows of ``seq_size`` observations. - - A light curve of length *L* yields ``ceil(L / seq_size)`` windows; the last window - may be shorter than ``seq_size`` and is zero-padded. Per-window embeddings are - averaged to produce a single embedding per light curve. - """ - - def subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> list[tuple[np.ndarray, ...]]: - """Yield consecutive slices of length ``seq_size``. - - Parameters - ---------- - *arrays : np.ndarray - 1-D arrays of equal length. - seq_size : int - Window size. - - Returns - ------- - list of tuple of np.ndarray - ``ceil(len / seq_size)`` windows, each a tuple of sliced arrays. - """ - array_size = arrays[0].size - - subsamples = [] - for start in range(0, array_size, seq_size): - end = start + seq_size - subsample = tuple(array[start:end] for array in arrays) - subsamples.append(subsample) - - return subsamples - - def reduce_embeddings(self, embeddings: np.ndarray, tensors: InputTensors, *, output: str) -> np.ndarray: - """Reduce per-window embeddings to a single representation. - - For aggregated outputs (``output != "sequence"``) the window embeddings - are averaged, yielding shape ``(1, 1, embed_dim)``. - - For ``output == "sequence"`` a masked mean is computed across windows - for each timestep position, yielding shape ``(1, seq_size, embed_dim)`` - regardless of how many windows the light curve was split into. - - Parameters - ---------- - embeddings : np.ndarray, shape ``(n_windows, seq_size, embed_dim)`` - Per-window embeddings. - tensors : InputTensors - Preprocessed input tensors; ``tensors.bool_mask`` (shape - ``(n_windows, seq_size)``) identifies valid vs. padded positions - for the ``"sequence"`` output. - output : str - Model output name. Determines aggregation strategy. - - Returns - ------- - np.ndarray, shape ``(1, 1, embed_dim)`` or ``(1, seq_size, embed_dim)`` - For mean / max: mean over windows, shape ``(1, 1, embed_dim)``. - For sequence: masked mean over windows per timestep, shape - ``(1, seq_size, embed_dim)``. - """ - if output != "sequence": - return np.mean(embeddings, axis=0)[np.newaxis, ...] - # sequence output: masked mean over windows per timestep → (1, seq_size, embed_dim) - n_valid = np.maximum(tensors.bool_mask.sum(axis=0), 1) # (seq_size,) - window_sum = (embeddings * tensors.bool_mask[:, :, np.newaxis]).sum(axis=0) # (seq_size, embed_dim) - return (window_sum / n_valid[:, np.newaxis])[np.newaxis, ...] - - -class MultipleTimeReductions(TimeReduction): - """Apply several :class:`SingleSubsampleTimeReduction` strategies in parallel. - - Each strategy produces one window; embeddings are stacked along the subsample - axis rather than aggregated, giving one embedding per strategy. - - Parameters - ---------- - reductions : list of SingleSubsampleTimeReduction - Ordered list of strategies to apply. - - Raises - ------ - ValueError - If any element of ``reductions`` is not a - :class:`SingleSubsampleTimeReduction`. - """ - - def __init__( - self, - reductions: list[SingleSubsampleTimeReduction], - ) -> None: - super().__init__() - for r in reductions: - if not isinstance(r, SingleSubsampleTimeReduction): - raise ValueError( - f"Time reduction '{r}' is not a subsampling time reduction; " - "currently only subsampling time reductions can be used in multiple time reductions" - ) - self.reductions = reductions - - @classmethod - def from_strings( - cls, - reductions: list[str], - **kwargs, - ) -> Self: - """Construct from a list of strategy name strings. - - Parameters - ---------- - reductions : list of str - Strategy names recognised by :func:`time_reduction_from_str`. - **kwargs - Forwarded to each strategy constructor. If ``rng`` is an integer - seed it is converted to a :class:`numpy.random.Generator` first so - that each stochastic strategy gets an independent random stream. - - Returns - ------- - MultipleTimeReductions - Instance wrapping the instantiated strategies. - """ - # In the cases where rng is an integer (seed), we should convert to a Generator first, so we don't reuse the - # same seed across multiple reductions. Reusing the same rng is ok: each call mutates it. - if "rng" in kwargs: - kwargs["rng"] = np.random.default_rng(kwargs["rng"]) - return cls( - reductions=[time_reduction_from_str(s, **kwargs) for s in reductions], - ) - - def subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> list[tuple[np.ndarray, ...]]: - """Apply each strategy and return one window per strategy. - - Parameters - ---------- - *arrays : np.ndarray - 1-D arrays of equal length. - seq_size : int - Maximum observations per window. - - Returns - ------- - list of tuple of np.ndarray - One element per strategy, each a tuple of subsampled arrays. - """ - subsamples = [] - for reduction in self.reductions: - subsamples.append(reduction.single_subsample_lc(*arrays, seq_size=seq_size)) - return subsamples - - def reduce_embeddings(self, embeddings: ArrayLike, tensors: InputTensors, *, output: str) -> ArrayLike: - """Return embeddings unchanged — one per strategy, already stacked. - - Parameters - ---------- - embeddings : array-like - Per-strategy embeddings from the model. - tensors : InputTensors - Unused; accepted for interface compatibility. - output : str - Unused; accepted for interface compatibility. - - Returns - ------- - array-like - The input unchanged. - """ - return embeddings - - -def time_reduction_from_str(s: str | list[str], **kwargs) -> TimeReduction: - """Instantiate a :class:`TimeReduction` from a name string or list of name strings. - - Parameters - ---------- - s : str or list of str - Strategy name or list of names. Recognised values (case-insensitive, - underscores treated as hyphens): ``"beginning"``, ``"end"``, - ``"random-subsample"``, ``"non-overlapping-windows"``. A list with more - than one entry produces a :class:`MultipleTimeReductions`. - **kwargs - Forwarded to the strategy constructor (e.g. ``rng`` for - :class:`RandomSubsample`). - - Returns - ------- - TimeReduction - The instantiated strategy. - - Raises - ------ - ValueError - If the name is not recognised, or if ``rng`` is missing for - ``"random-subsample"``. - """ - if isinstance(s, list): - if len(s) != 1: - return MultipleTimeReductions.from_strings(s, **kwargs) - s = s[0] - - match s.lower().replace("_", "-"): - case "beginning": - return Beginning() - case "end": - return End() - case "random-subsample": - try: - rng = kwargs["rng"] - except KeyError: - raise ValueError("rng must be provided for random subsample time reduction") - return RandomSubsample(rng=rng) - case "non-overlapping-windows": - return NonOverlappingWindows() - case _: - raise ValueError(f"Unknown time reduction '{s}'") diff --git a/light-curve/tests/prep-models b/light-curve/tests/prep-models deleted file mode 160000 index 5ca55e355..000000000 --- a/light-curve/tests/prep-models +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 5ca55e355fce2042ad19272b82c8819e03617e7e diff --git a/light-curve/tests/test_embed.py b/light-curve/tests/test_embed.py deleted file mode 100644 index 0247e2eef..000000000 --- a/light-curve/tests/test_embed.py +++ /dev/null @@ -1,219 +0,0 @@ -"""Tests for light_curve.embed — Astromer1 and Astromer2 embedding models. - -ONNX models are downloaded from HuggingFace via huggingface_hub (cached in -the HF default cache directory). Expected embeddings come from the prep-models -submodule at ``tests/prep-models/``. -""" - -from dataclasses import dataclass -from pathlib import Path - -import numpy as np -import pyarrow.parquet as pq -import pytest -from huggingface_hub import hf_hub_download - -PREP_MODELS_DIR = Path(__file__).parent / "prep-models" / "models" - - -@dataclass(frozen=True) -class AstromerTestConfig: - model_name: str - repo_id: str - test_parquet: Path - # time reduction used when generating the reference parquet - ref_time_reduction: str - # Astromer1 has larger TF→ONNX drift (~0.98) than Astromer2 (~0.999) - cos_sim_threshold: float - - -ASTROMER_CONFIGS = [ - AstromerTestConfig( - model_name="Astromer1", - repo_id="light-curve/astromer1", - test_parquet=PREP_MODELS_DIR / "astromer1" / "out" / "test-data" / "astromer1_test.parquet", - ref_time_reduction="beginning", - cos_sim_threshold=0.97, - ), - AstromerTestConfig( - model_name="Astromer2", - repo_id="light-curve/astromer2", - test_parquet=PREP_MODELS_DIR / "astromer2" / "out" / "test-data" / "astromer2_test.parquet", - ref_time_reduction="non-overlapping-windows", - cos_sim_threshold=0.999, - ), -] - - -# --------------------------------------------------------------------------- -# Shared fixtures (parametrized over both models) -# --------------------------------------------------------------------------- - - -@pytest.fixture( - scope="session", - params=ASTROMER_CONFIGS, - ids=[c.model_name.lower() for c in ASTROMER_CONFIGS], -) -def astromer_config(request) -> AstromerTestConfig: - return request.param - - -@pytest.fixture(scope="session") -def astromer_session(astromer_config): - import onnxruntime as ort - - prefix = astromer_config.repo_id.split("/")[-1] - path = hf_hub_download(repo_id=astromer_config.repo_id, filename=f"{prefix}.onnx") - return ort.InferenceSession(str(path)) - - -@pytest.fixture(scope="session") -def astromer_test_table(astromer_config): - return pq.read_table(astromer_config.test_parquet).to_pydict() - - -# --------------------------------------------------------------------------- -# Preprocessing tests (no ONNX session needed) -# --------------------------------------------------------------------------- - - -def test_non_overlapping_windows_short_lc(): - """LC shorter than seq_size produces one window.""" - from light_curve.embed import NonOverlappingWindows - - tr = NonOverlappingWindows() - time = np.arange(50, dtype=float) - mag = np.ones(50) - subsamples = tr.subsample_lc(time, mag, seq_size=200) - assert len(subsamples) == 1 - assert subsamples[0][0].shape == (50,) - - -def test_non_overlapping_windows_long_lc(): - """LC of length 400 with seq_size=200 gives 2 windows.""" - from light_curve.embed import NonOverlappingWindows - - tr = NonOverlappingWindows() - time = np.arange(400, dtype=float) - mag = np.ones(400) - subsamples = tr.subsample_lc(time, mag, seq_size=200) - assert len(subsamples) == 2 - - -def test_preprocess_lc_shape(): - """preprocess_lc pads/clips to seq_size and returns mask.""" - from light_curve.embed import NonOverlappingWindows - - tr = NonOverlappingWindows() - time = np.arange(50, dtype=float) - mag = np.ones(50) - time_out, mag_out, mask = tr.preprocess_lc(time, mag, seq_size=200) - assert time_out.shape == (1, 200) - assert mag_out.shape == (1, 200) - assert mask.shape == (1, 200) - assert mask[0, :50].all() - assert not mask[0, 50:].any() - - -@pytest.mark.parametrize( - "model_class_name", - ["Astromer1", "Astromer2"], -) -def test_astromer_preprocess_normalises(model_class_name): - """preprocess_lc produces zero-mean time and mag for each window.""" - import light_curve.embed as lce - - model_class = getattr(lce, model_class_name) - model = model_class(session=None, time_reduction="non-overlapping-windows") - time = np.linspace(0, 100, 200) - mag = np.linspace(10, 15, 200) - tensors = model.preprocess_lc(time, mag) - valid = tensors.bool_mask[0] - assert abs(tensors.times[0, valid, 0].mean()) < 1e-5 - assert abs(tensors.input[0, valid, 0].mean()) < 1e-5 - - -# --------------------------------------------------------------------------- -# End-to-end tests (parametrized over Astromer1 / Astromer2 via fixture) -# --------------------------------------------------------------------------- - - -def test_non_overlapping_windows_sequence_output_long_lc(astromer_config, astromer_session): - """NonOverlappingWindows + 'sequence' output yields consistent (1,1,200,256) shape.""" - import light_curve.embed as lce - - model_class = getattr(lce, astromer_config.model_name) - model = model_class(session=astromer_session, output="sequence", time_reduction="non-overlapping-windows") - - # 350 obs → 2 windows: 200 valid + 150 valid (+50 padded) - time = np.linspace(0, 1000, 350) - mag = np.ones(350) - embedding = model(time, mag) - - # shape: (BAND=1, SUBSAMPLE=1, SEQUENCE=200, VALUE=256) — masked mean over windows per timestep - assert embedding.shape == (1, 1, 200, 256) - assert np.all(np.isfinite(embedding)) - - -def test_from_hf_shape(astromer_config): - """from_hf() returns a working model with correct output shape.""" - import light_curve.embed as lce - - model_class = getattr(lce, astromer_config.model_name) - model = model_class.from_hf(output="mean") - time = np.linspace(0, 100, 50) - mag = np.ones(50) - embedding = model(time, mag) - assert embedding.shape == (1, 1, 1, 256) - assert np.all(np.isfinite(embedding)) - - -def test_from_hf_matches_session(astromer_config, astromer_session, astromer_test_table): - """from_hf() and a manually created session produce identical embeddings.""" - import light_curve.embed as lce - - model_class = getattr(lce, astromer_config.model_name) - lc = astromer_test_table["lightcurve"][0] - time = np.array([obs["mjd"] for obs in lc]) - mag = np.array([obs["mag"] for obs in lc]) - - model_hf = model_class.from_hf(output="mean") - model_manual = model_class(session=astromer_session, output="mean") - - emb_hf = model_hf(time, mag) - emb_manual = model_manual(time, mag) - np.testing.assert_array_equal(emb_hf, emb_manual) - - -def test_from_hf_invalid_output(astromer_config): - """from_hf() raises ValueError for unknown output names.""" - import light_curve.embed as lce - - model_class = getattr(lce, astromer_config.model_name) - with pytest.raises(ValueError, match="Unknown output"): - model_class.from_hf(output="nonexistent") - - -@pytest.mark.parametrize("row_idx", range(10)) -def test_mean_matches_reference(astromer_config, astromer_session, astromer_test_table, row_idx): - """Mean-pooling output matches the reference embedding from the parquet.""" - import light_curve.embed as lce - - model_class = getattr(lce, astromer_config.model_name) - lc = astromer_test_table["lightcurve"][row_idx] - time = np.array([obs["mjd"] for obs in lc]) - mag = np.array([obs["mag"] for obs in lc]) - expected = np.array(astromer_test_table["embedding_mean"][row_idx]) - - model = model_class(session=astromer_session, output="mean", time_reduction=astromer_config.ref_time_reduction) - embedding = model(time, mag) - - assert embedding.shape == (1, 1, 1, 256) - assert np.all(np.isfinite(embedding)) - emb_vec = embedding[0, 0, 0].astype(np.float64) - ref_vec = expected.astype(np.float64) - cos_sim = np.dot(emb_vec, ref_vec) / (np.linalg.norm(emb_vec) * np.linalg.norm(ref_vec)) - assert cos_sim > astromer_config.cos_sim_threshold, ( - f"cosine similarity {cos_sim:.6f} < {astromer_config.cos_sim_threshold}" - )