diff --git a/.gitmodules b/.gitmodules index 06ea934a0..6ff7c4611 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "test-data"] path = light-curve/tests/light-curve-test-data url = https://github.com/light-curve/test-data.git +[submodule "light-curve/tests/prep-models"] + path = light-curve/tests/prep-models + url = https://github.com/light-curve/prep-models.git diff --git a/CHANGELOG.md b/CHANGELOG.md index a3d733b84..a71a3e1df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added --- +- New `light_curve.embed` module with ONNX-backed light curve embedding models ([#692](https://github.com/light-curve/light-curve-python/pull/692)): + - `Astromer1` and `Astromer2` — transformer encoders pretrained on MACHO light curves (Donoso-Oliva et al. 2023/2026), returning 256-dimensional embeddings. Load directly from HuggingFace with `Astromer2.from_hf()`. Use the `output` parameter to select which named output to compute (`"mean"` (default), `"max"`, or `"sequence"`); onnxruntime prunes unused computation automatically. + - `NonOverlappingWindows`, `Beginning`, `End`, `RandomSubsample`, `MultipleReductions` — strategies for mapping variable-length light curves to fixed-length model inputs. + - `InputTensors` / `AstromerInputs` — typed dataclass containers for preprocessed tensors. + - `Dim` — enum of axis indices for the 4-D output array `(BAND, SUBSAMPLE, SEQUENCE, VALUE)`. ### Changed diff --git a/light-curve/light_curve/embed/__init__.py b/light-curve/light_curve/embed/__init__.py new file mode 100644 index 000000000..7a3fb30d5 --- /dev/null +++ b/light-curve/light_curve/embed/__init__.py @@ -0,0 +1,32 @@ +from .astromer import Astromer1, Astromer2, create_onnx_session +from .model import AstromerInputs, Dim, EmbeddingSession, SingleBandModel +from .reduction import ( + Beginning, + End, + InputTensors, + MultipleReductions, + NonOverlappingWindows, + RandomSubsample, + Reduction, + SingleSubsampleReduction, + reduction_from_str, +) + +__all__ = [ + "Astromer1", + "Astromer2", + "AstromerInputs", + "Beginning", + "Dim", + "EmbeddingSession", + "End", + "InputTensors", + "MultipleReductions", + "NonOverlappingWindows", + "RandomSubsample", + "Reduction", + "SingleBandModel", + "SingleSubsampleReduction", + "create_onnx_session", + "reduction_from_str", +] diff --git a/light-curve/light_curve/embed/astromer.py b/light-curve/light_curve/embed/astromer.py new file mode 100644 index 000000000..672cbd919 --- /dev/null +++ b/light-curve/light_curve/embed/astromer.py @@ -0,0 +1,327 @@ +from __future__ import annotations + +from typing import Sequence + +import numpy as np +from numpy.typing import ArrayLike + +from .model import AstromerInputs, SingleBandModel +from .reduction import Reduction + +_ONNX_INSTALL_HINT = ( + "An ONNX runtime is required to run embedding models. " + "Install the variant that matches your hardware:\n" + " CPU / Apple Silicon: pip install onnxruntime\n" + " NVIDIA GPU (CUDA): pip install onnxruntime-gpu\n" + " Windows DirectML: pip install onnxruntime-directml\n" + "See https://onnxruntime.ai for the full list of packages." +) + + +def create_onnx_session(model_path: str, **kwargs): + """Create an ``onnxruntime.InferenceSession``, with a helpful error if the package is missing. + + Parameters + ---------- + model_path : str + Path to the ONNX model file. + **kwargs + Forwarded verbatim to ``onnxruntime.InferenceSession``. + + Returns + ------- + onnxruntime.InferenceSession + A ready-to-use inference session. + + Raises + ------ + ImportError + If no onnxruntime variant is installed, with installation instructions. + """ + try: + import onnxruntime as ort + except ImportError as exc: + raise ImportError(_ONNX_INSTALL_HINT) from exc + return ort.InferenceSession(model_path, **kwargs) + + +class _AstromerModel(SingleBandModel): + """Internal base class for Astromer-family embedding models. + + Provides shared preprocessing (per-window zero-mean normalisation) and + ONNX inference logic for all Astromer variants. Concrete subclasses set + :attr:`_HF_REPO` and, if needed, override the default ``reduction``. + + Output shape + ------------ + :meth:`__call__` always returns a 4-D array + ``(n_bands, n_subsamples, seq_size, embed_dim)`` for consistency across + models and windowing strategies: + + * ``n_bands`` — number of photometric bands; 1 when ``bands`` is ``None``. + * ``n_subsamples`` — windows produced by the time reduction (1 for + :class:`NonOverlappingWindows`, which averages all windows). + * ``seq_size`` — 1 for the ``"mean"`` and ``"max"`` outputs (aggregated + over the sequence); equal to the model's sequence length for ``"sequence"``. + * ``embed_dim`` — embedding dimension (256 for all Astromer models). + + Use ``embedding.squeeze()`` to collapse unit dimensions when the full + 4-D layout is not needed. + """ + + seq_size: int = 200 + dtype: type = np.float32 + _HF_REPO: str + _OUTPUTS: frozenset[str] = frozenset({"mean", "max", "sequence"}) + + def __init__( + self, + session, + *, + output: str = "mean", + bands: Sequence[str | int] | None = None, + reduction: str | list[str] | Reduction = "non-overlapping-windows", + time_red_kwargs: dict[str, object] | None = None, + ) -> None: + super().__init__( + session, + bands=bands, + reduction=reduction, + time_red_kwargs=time_red_kwargs, + ) + if output not in self._OUTPUTS: + raise ValueError(f"Unknown output '{output}'. Must be one of: {', '.join(sorted(self._OUTPUTS))}") + self.output = output + + @classmethod + def from_hf( + cls, + output: str = "mean", + *, + bands: Sequence[str | int] | None = None, + reduction: str | list[str] | Reduction = "non-overlapping-windows", + time_red_kwargs: dict[str, object] | None = None, + providers=None, + sess_options=None, + ) -> "_AstromerModel": + """Load a model from the HuggingFace Hub. + + Downloads (and caches) the ONNX model file, creates an + ``onnxruntime.InferenceSession``, and returns a ready-to-use instance. + Only the requested output is computed at inference time — onnxruntime + prunes the unused computation graph automatically. + + Parameters + ---------- + output : str, optional + Named ONNX output to return. One of: + + * ``"mean"`` (default) — masked mean pooling over valid timesteps, + output shape ``(batch, 256)`` + * ``"max"`` — masked max pooling over valid timesteps, + output shape ``(batch, 256)`` + * ``"sequence"`` — per-timestep embeddings (no aggregation), + output shape ``(batch, seq_size, 256)`` + + bands : sequence of str or int, optional + Ordered band labels to embed. ``None`` (default) treats the whole + light curve as one band. + reduction : str, list of str, or Reduction, optional + Windowing / subsampling strategy. Defaults to + ``"non-overlapping-windows"``. + time_red_kwargs : dict, optional + Extra keyword arguments forwarded to :func:`reduction_from_str` + when ``reduction`` is given as a string. + providers : list of str, optional + ONNX Runtime execution providers, e.g. + ``["CUDAExecutionProvider", "CPUExecutionProvider"]``. + sess_options : onnxruntime.SessionOptions, optional + Advanced session configuration passed directly to + ``onnxruntime.InferenceSession``. + + Returns + ------- + instance of the calling class + Instance with a live ONNX inference session. + + Raises + ------ + ValueError + If ``output`` is not one of the recognised output names. + ImportError + If ``huggingface_hub`` is not installed, with instructions to + install it or to download the model file manually. + ImportError + If no ``onnxruntime`` variant is installed, with hardware-specific + installation instructions. + """ + if output not in cls._OUTPUTS: + raise ValueError(f"Unknown output '{output}'. Must be one of: {', '.join(sorted(cls._OUTPUTS))}") + + model_prefix = cls._HF_REPO.split("/")[-1] + filename = f"{model_prefix}.onnx" + + try: + from huggingface_hub import hf_hub_download + except ImportError as exc: + hf_url = f"https://huggingface.co/{cls._HF_REPO}/resolve/main/{filename}" + raise ImportError( + "huggingface_hub is required to download models from HuggingFace.\n" + "Install it with:\n" + " pip install huggingface-hub\n" + "Or download the model file directly:\n" + f" {hf_url}\n" + "then load it with:\n" + " import onnxruntime as ort\n" + f' {cls.__name__}(session=ort.InferenceSession("/path/to/{filename}"), output="{output}")' + ) from exc + + model_path = hf_hub_download(repo_id=cls._HF_REPO, filename=filename) + + session_kwargs = {} + if providers is not None: + session_kwargs["providers"] = providers + if sess_options is not None: + session_kwargs["sess_options"] = sess_options + + session = create_onnx_session(model_path, **session_kwargs) + return cls( + session=session, + output=output, + bands=bands, + reduction=reduction, + time_red_kwargs=time_red_kwargs, + ) + + def preprocess_lc( + self, + time: ArrayLike, + mag: ArrayLike, + ) -> AstromerInputs: + """Preprocess a light curve into Astromer model input tensors. + + Each window is independently zero-mean normalised (time and magnitude) + using only the valid (non-padded) observations. Normalisation is + computed in the original input precision before casting to ``float32`` + to avoid precision loss for large values such as MJD timestamps. + + Parameters + ---------- + time : array-like, shape ``(n,)`` + Observation times (e.g. MJD). + mag : array-like, shape ``(n,)`` + Magnitudes. + + Returns + ------- + AstromerInputs + ``times`` and ``input`` are per-window zero-mean arrays of shape + ``(n_windows, seq_size, 1)`` in float32; ``mask`` is 1 for valid + observations and 0 for zero-padded positions, same shape and dtype. + """ + time, mag, mask = self.reduction.preprocess_lc(time, mag, seq_size=self.seq_size) + + bool_mask = mask # (n_windows, seq_size), boolean + n_valid = mask.sum(axis=-1, keepdims=True) + time_mean = (time * mask).sum(axis=-1, keepdims=True) / n_valid + mag_mean = (mag * mask).sum(axis=-1, keepdims=True) / n_valid + time = np.where(mask, time - time_mean, time).astype(self.dtype) + mag = np.where(mask, mag - mag_mean, mag).astype(self.dtype) + mask = mask.astype(self.dtype) + + idx = (..., np.newaxis) + return AstromerInputs(times=time[idx], input=mag[idx], mask=mask[idx], bool_mask=bool_mask) + + def predict_tensors(self, tensors: AstromerInputs) -> np.ndarray: + """Run the ONNX model on pre-processed tensors and return reduced embeddings. + + Parameters + ---------- + tensors : AstromerInputs + As returned by :meth:`preprocess_lc`. + + Returns + ------- + np.ndarray, shape ``(n_subsamples, seq_size, embed_dim)`` + Embeddings after applying the time reduction's aggregation. + For aggregated models (mean / max) ``seq_size`` is 1. + """ + (raw_embedding,) = self.session.run( + [self.output], + {"input": tensors.input, "times": tensors.times, "mask_in": tensors.mask}, + ) + + # Aggregated outputs (mean / max) have shape (n_windows, embed_dim); add SEQUENCE=1 axis + if raw_embedding.ndim == 2: + raw_embedding = np.expand_dims(raw_embedding, axis=1) + + return self.reduction.reduce_embeddings(raw_embedding, tensors, output=self.output) + + +class Astromer1(_AstromerModel): + """Astromer 1 embedding model (Donoso-Oliva et al. 2023). + + Transformer encoder pretrained on MACHO R-band light curves via masked + magnitude prediction. Accepts irregularly-sampled single-band photometry + and returns a 256-dimensional embedding (2 layers, 4 attention heads). + + The ONNX model is hosted on HuggingFace at + ``https://huggingface.co/light-curve/astromer1`` (``astromer1.onnx``). + Three named outputs are available; select with the ``output`` parameter: + + * ``"mean"`` (default) — masked mean pooling → shape ``(batch, 256)`` + * ``"max"`` — masked max pooling → shape ``(batch, 256)`` + * ``"sequence"`` — per-timestep features → shape ``(batch, 200, 256)`` + + Use :meth:`from_hf` to download and load the model directly. + + Parameters + ---------- + session : + ONNX inference session for the Astromer 1 model file. + output : str, optional + Which named output to return: ``"mean"``, ``"max"``, or ``"sequence"``. + Defaults to ``"mean"``. + bands : sequence of str or int, optional + Band labels. ``None`` (default) treats the whole light curve as one + band. + reduction : str, list of str, or Reduction + Windowing strategy. Defaults to :class:`NonOverlappingWindows`. + """ + + _HF_REPO = "light-curve/astromer1" + + +class Astromer2(_AstromerModel): + """Astromer 2 embedding model (Donoso-Oliva et al. 2026). + + Pretrained on 1.5 million MACHO light curves. Accepts irregularly-sampled + single-band photometry and returns a 256-dimensional embedding. + + The ONNX model is hosted on HuggingFace at + ``https://huggingface.co/light-curve/astromer2`` (``astromer2.onnx``). + Three named outputs are available; select with the ``output`` parameter: + + * ``"mean"`` (default) — masked mean pooling → shape ``(batch, 256)`` + * ``"max"`` — masked max pooling → shape ``(batch, 256)`` + * ``"sequence"`` — per-timestep features → shape ``(batch, 200, 256)`` + + Use :meth:`from_hf` to download and load the model directly. + + Parameters + ---------- + session : + ONNX inference session for the Astromer 2 model file. + output : str, optional + Which named output to return: ``"mean"``, ``"max"``, or ``"sequence"``. + Defaults to ``"mean"``. + bands : sequence of str or int, optional + Band labels. ``None`` (default) treats the whole light curve as one + band. + reduction : str, list of str, or Reduction + Windowing strategy. Defaults to :class:`NonOverlappingWindows`, which + matches the sequential-window preprocessing used to produce the reference + embeddings on HuggingFace. + """ + + _HF_REPO = "light-curve/astromer2" diff --git a/light-curve/light_curve/embed/model.py b/light-curve/light_curve/embed/model.py new file mode 100644 index 000000000..ce98dd856 --- /dev/null +++ b/light-curve/light_curve/embed/model.py @@ -0,0 +1,191 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Sequence + +import numpy as np +from numpy.typing import ArrayLike + +from ..light_curve_py.warnings import warn_experimental +from .reduction import InputTensors, Reduction, reduction_from_str + + +@dataclass +class AstromerInputs(InputTensors): + """Input tensors for Astromer-family models. + + ``times`` and ``input`` are float32 arrays of shape + ``(n_windows, seq_size, 1)`` ready for ONNX inference. ``mask`` is the + same validity information cast to float32 for the model (1 = valid, + 0 = padded), shape ``(n_windows, seq_size, 1)``. ``bool_mask`` (inherited) + is the boolean equivalent, shape ``(n_windows, seq_size)``. + """ + + times: np.ndarray = field(kw_only=True) + input: np.ndarray = field(kw_only=True) + mask: np.ndarray = field(kw_only=True) + + +class Dim(IntEnum): + """Axis indices for the 4-D embedding array ``(BAND, SUBSAMPLE, SEQUENCE, VALUE)``.""" + + BAND = 0 + SUBSAMPLE = 1 + SEQUENCE = 2 + VALUE = 3 + + +class EmbeddingSession(ABC): + """Abstract base for ONNX-backed embedding models. + + Subclasses implement :meth:`preprocess_lc` (convert raw arrays to model + tensors) and :meth:`predict_tensors` (run the session and return embeddings). + + Parameters + ---------- + session : + An ``onnxruntime.InferenceSession`` (or any object with a compatible + ``.run()`` interface). + reduction : str, list of str, or Reduction + Strategy for mapping variable-length light curves to fixed-length + sequences. + time_red_kwargs : dict, optional + Extra keyword arguments forwarded to :func:`reduction_from_str` + when ``reduction`` is given as a string. + """ + + seq_size: int + + def __init__( + self, + session, + *, + reduction: str | list[str] | Reduction, + time_red_kwargs: dict[str, object] | None = None, + ) -> None: + warn_experimental( + f"{self.__class__.__module__}.{self.__class__.__name__} is experimental and may change in future versions" + ) + self.session = session + + if time_red_kwargs is None: + time_red_kwargs = {} + if not isinstance(reduction, Reduction): + reduction = reduction_from_str(reduction, **time_red_kwargs) + self.reduction = reduction + + @abstractmethod + def __call__(self, *arrays: ArrayLike) -> np.ndarray: + inputs = self.preprocess_lc(*arrays) + embedding = self.predict_tensors(inputs) + return embedding + + @abstractmethod + def preprocess_lc(self, *arrays: ArrayLike) -> InputTensors: + """Convert raw light curve arrays to model input tensors. + + Parameters + ---------- + *arrays : array-like + Raw light curve arrays (e.g. time, magnitude). + + Returns + ------- + InputTensors + Tensors ready to be passed to :meth:`predict_tensors`. + """ + raise NotImplementedError + + @abstractmethod + def predict_tensors(self, tensors: InputTensors) -> np.ndarray: + """Run the ONNX session on pre-processed tensors and return embeddings. + + Parameters + ---------- + tensors : InputTensors + Pre-processed model inputs as returned by :meth:`preprocess_lc`. + + Returns + ------- + np.ndarray + Embedding array with shape depending on the model and time reduction. + """ + raise NotImplementedError + + +class SingleBandModel(EmbeddingSession, ABC): + """Embedding model that processes one photometric band at a time. + + When ``bands`` is ``None`` the full light curve is treated as a single band. + When ``bands`` is provided, the light curve is split by band label, each band + is embedded independently, and the results are concatenated along + :attr:`Dim.BAND`. + + Parameters + ---------- + session : + ONNX inference session. + bands : sequence of str or int, optional + Ordered band labels to embed. ``None`` treats the whole light curve as + one band. + reduction : str, list of str, or Reduction + Windowing / subsampling strategy. Defaults to + ``"non-overlapping-windows"``. + time_red_kwargs : dict, optional + Extra kwargs forwarded to :func:`reduction_from_str`. + """ + + def __init__( + self, + session, + *, + bands: Sequence[str | int] | None = None, + reduction: str | list[str] | Reduction = "non-overlapping-windows", + time_red_kwargs: dict[str, object] | None = None, + ) -> None: + super().__init__( + session, + reduction=reduction, + time_red_kwargs=time_red_kwargs, + ) + self.bands = bands + + def __call__(self, time: ArrayLike, mag: ArrayLike, band: ArrayLike | None = None) -> np.ndarray: + """Embed a light curve. + + Parameters + ---------- + time : array-like, shape ``(n,)`` + Observation times. + mag : array-like, shape ``(n,)`` + Magnitudes. + band : array-like, shape ``(n,)``, optional + Band labels, required when ``self.bands`` is not ``None``. + + Returns + ------- + np.ndarray, shape ``(n_bands, n_subsamples, seq_size, embed_dim)`` + Embedding tensor. ``n_bands`` is 1 when ``self.bands`` is ``None``. + + Raises + ------ + ValueError + If ``band`` is provided but ``self.bands`` is ``None``, or vice versa. + """ + if (band is None) != (self.bands is None): + raise ValueError( + f"band array must be provided with values specified by `bands` class constructor argument: {self.bands}" + ) + + if self.bands is None: + embed = super().__call__(time, mag) + return np.expand_dims(embed, axis=Dim.BAND) + + embeddings = [] + for band_name in self.bands: + band_idx = band == band_name + embed_band = super().__call__(time[band_idx], mag[band_idx]) + embeddings.append(np.expand_dims(embed_band, axis=Dim.BAND)) + return np.concatenate(embeddings, axis=Dim.BAND) diff --git a/light-curve/light_curve/embed/reduction.py b/light-curve/light_curve/embed/reduction.py new file mode 100644 index 000000000..25501e9d8 --- /dev/null +++ b/light-curve/light_curve/embed/reduction.py @@ -0,0 +1,478 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +import numpy as np +from numpy.typing import ArrayLike + +if TYPE_CHECKING: + from typing import Self + + +@dataclass +class InputTensors: + """Base tensor container produced by :meth:`EmbeddingSession.preprocess_lc`. + + All subclasses carry a boolean ``bool_mask`` identifying valid (non-padded) + timesteps, shape ``(n_windows, seq_size)``. + """ + + bool_mask: np.ndarray = field(kw_only=True) + + +class Reduction(ABC): + """Abstract base for strategies that map a variable-length light curve to fixed-length sequences. + + Subclasses implement :meth:`subsample_lc` (windowing / subsampling logic) and + :meth:`reduce_embeddings` (how per-window embeddings are aggregated back to one + embedding per light curve). + """ + + def _mask(self, *, array_size: int, seq_size: int) -> np.ndarray: + padding_size = max(seq_size - array_size, 0) + values = np.ones(array_size, dtype=bool) + padding = np.zeros(padding_size, dtype=bool) + return np.r_[values, padding] + + def _pad(self, array: np.ndarray, seq_size: int) -> np.ndarray: + if array.size >= seq_size: + return array[:seq_size] + return np.r_[array, np.zeros(seq_size - array.size, dtype=array.dtype)] + + def preprocess_lc(self, *arrays: ArrayLike, seq_size: int) -> tuple[np.ndarray, ...]: + """Subsample, pad, and mask a light curve. + + Parameters + ---------- + *arrays : array-like + 1-D arrays of equal length (e.g. time, magnitude). + seq_size : int + Target sequence length; shorter windows are zero-padded. + + Returns + ------- + tuple of np.ndarray + One stacked array per input plus a boolean mask, each of shape + ``(n_subsamples, seq_size)``. The mask is ``True`` for real + observations and ``False`` for padding. + + Raises + ------ + ValueError + If any input array is not 1-D. + """ + arrays = np.broadcast_arrays(*arrays) + if arrays[0].ndim != 1: + raise ValueError(f"Inputs must be single dimensional, {arrays[0].ndim} is provided") + + subsamples = self.subsample_lc(*arrays, seq_size=seq_size) + sequences = [] + for subsample in subsamples: + mask = self._mask(array_size=subsample[0].size, seq_size=seq_size) + sequence = tuple(self._pad(array, seq_size) for array in subsample) + (mask,) + sequences.append(sequence) + + sequences = tuple(np.stack(arrays) for arrays in zip(*sequences)) + + return sequences + + @abstractmethod + def subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> list[tuple[np.ndarray, ...]]: + """Split or subsample the light curve into one or more windows. + + Parameters + ---------- + *arrays : np.ndarray + 1-D arrays of equal length. + seq_size : int + Maximum number of observations per window. + + Returns + ------- + list of tuple of np.ndarray + Each element is a tuple of arrays (one per input), containing at most + ``seq_size`` observations. + """ + raise NotImplementedError + + @abstractmethod + def reduce_embeddings(self, embeddings: ArrayLike, tensors: InputTensors, *, output: str) -> ArrayLike: + """Aggregate per-window embeddings into a single embedding per light curve. + + Parameters + ---------- + embeddings : array-like, shape ``(n_windows, seq_size, embed_dim)`` + Raw per-window embeddings from the model. ``seq_size`` is 1 for + aggregated (mean / max) outputs and the full sequence length for the + ``"sequence"`` output. + tensors : InputTensors + Preprocessed input tensors as returned by + :meth:`EmbeddingSession.preprocess_lc`. + output : str + The model output name (e.g. ``"mean"``, ``"max"``, ``"sequence"``). + Implementations use this to select the appropriate aggregation strategy + rather than inferring it from array shapes. + + Returns + ------- + array-like + Reduced embeddings, shape depends on the concrete implementation. + """ + raise NotImplementedError + + +class SingleSubsampleReduction(Reduction, ABC): + """Base for strategies that produce exactly one window per light curve.""" + + def subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> list[tuple[np.ndarray, ...]]: + """Return a single-element list wrapping :meth:`single_subsample_lc`. + + Parameters + ---------- + *arrays : np.ndarray + 1-D arrays of equal length. + seq_size : int + Maximum observations per window. + + Returns + ------- + list of tuple of np.ndarray + A one-element list containing the subsampled arrays. + """ + return [self.single_subsample_lc(*arrays, seq_size=seq_size)] + + @abstractmethod + def single_subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> tuple[np.ndarray, ...]: + """Return one subsampled window of at most ``seq_size`` observations. + + Parameters + ---------- + *arrays : np.ndarray + 1-D arrays of equal length. + seq_size : int + Maximum number of observations to keep. + + Returns + ------- + tuple of np.ndarray + Subsampled arrays, each of length ``<= seq_size``. + """ + raise NotImplementedError + + def reduce_embeddings(self, embeddings: ArrayLike, tensors: InputTensors, *, output: str) -> ArrayLike: + """Return embeddings unchanged (single window — no aggregation needed). + + Parameters + ---------- + embeddings : array-like + Per-window embeddings from the model. + tensors : InputTensors + Unused; accepted for interface compatibility. + output : str + Unused; accepted for interface compatibility. + + Returns + ------- + array-like + The input unchanged. + """ + return embeddings + + +class Beginning(SingleSubsampleReduction): + """Select the chronologically first ``seq_size`` observations of the light curve.""" + + def single_subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> tuple[np.ndarray, ...]: + """Return the leading ``seq_size`` elements of each array. + + Parameters + ---------- + *arrays : np.ndarray + 1-D arrays of equal length. + seq_size : int + Number of observations to keep from the start. + + Returns + ------- + tuple of np.ndarray + First ``seq_size`` elements of each input array. + """ + return tuple(array[:seq_size] for array in arrays) + + +class End(SingleSubsampleReduction): + """Select the chronologically last ``seq_size`` observations of the light curve.""" + + def single_subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> tuple[np.ndarray, ...]: + """Return the trailing ``seq_size`` elements of each array. + + Parameters + ---------- + *arrays : np.ndarray + 1-D arrays of equal length. + seq_size : int + Number of observations to keep from the end. + + Returns + ------- + tuple of np.ndarray + Last ``seq_size`` elements of each input array. + """ + return tuple(array[-seq_size:] for array in arrays) + + +class RandomSubsample(SingleSubsampleReduction): + """Draw ``seq_size`` observations uniformly at random without replacement. + + Parameters + ---------- + rng : int, np.random.Generator, or None + Seed or generator for reproducible sampling. + """ + + def __init__(self, *, rng: int | np.random.Generator | None) -> None: + super().__init__() + self.rng = np.random.default_rng(rng) + + def single_subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> tuple[np.ndarray, ...]: + """Return a random subsample of at most ``seq_size`` observations, in original order. + + Parameters + ---------- + *arrays : np.ndarray + 1-D arrays of equal length. + seq_size : int + Maximum number of observations to sample. + + Returns + ------- + tuple of np.ndarray + ``min(len, seq_size)`` randomly selected observations, sorted by + original index so temporal order is preserved. + """ + array_size = arrays[0].size + if array_size <= seq_size: + return arrays + + indices = self.rng.choice(array_size, size=seq_size, replace=False) + indices.sort() + return tuple(array[indices] for array in arrays) + + +class NonOverlappingWindows(Reduction): + """Split the light curve into consecutive non-overlapping windows of ``seq_size`` observations. + + A light curve of length *L* yields ``ceil(L / seq_size)`` windows; the last window + may be shorter than ``seq_size`` and is zero-padded. Per-window embeddings are + averaged to produce a single embedding per light curve. + """ + + def subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> list[tuple[np.ndarray, ...]]: + """Yield consecutive slices of length ``seq_size``. + + Parameters + ---------- + *arrays : np.ndarray + 1-D arrays of equal length. + seq_size : int + Window size. + + Returns + ------- + list of tuple of np.ndarray + ``ceil(len / seq_size)`` windows, each a tuple of sliced arrays. + """ + array_size = arrays[0].size + + subsamples = [] + for start in range(0, array_size, seq_size): + end = start + seq_size + subsample = tuple(array[start:end] for array in arrays) + subsamples.append(subsample) + + return subsamples + + def reduce_embeddings(self, embeddings: np.ndarray, tensors: InputTensors, *, output: str) -> np.ndarray: + """Reduce per-window embeddings to a single representation. + + For aggregated outputs (``output != "sequence"``) the window embeddings + are averaged, yielding shape ``(1, 1, embed_dim)``. + + For ``output == "sequence"`` a masked mean is computed across windows + for each timestep position, yielding shape ``(1, seq_size, embed_dim)`` + regardless of how many windows the light curve was split into. + + Parameters + ---------- + embeddings : np.ndarray, shape ``(n_windows, seq_size, embed_dim)`` + Per-window embeddings. + tensors : InputTensors + Preprocessed input tensors; ``tensors.bool_mask`` (shape + ``(n_windows, seq_size)``) identifies valid vs. padded positions + for the ``"sequence"`` output. + output : str + Model output name. Determines aggregation strategy. + + Returns + ------- + np.ndarray, shape ``(1, 1, embed_dim)`` or ``(1, seq_size, embed_dim)`` + For mean / max: mean over windows, shape ``(1, 1, embed_dim)``. + For sequence: masked mean over windows per timestep, shape + ``(1, seq_size, embed_dim)``. + """ + if output != "sequence": + return np.mean(embeddings, axis=0)[np.newaxis, ...] + # sequence output: masked mean over windows per timestep → (1, seq_size, embed_dim) + n_valid = np.maximum(tensors.bool_mask.sum(axis=0), 1) # (seq_size,) + window_sum = (embeddings * tensors.bool_mask[:, :, np.newaxis]).sum(axis=0) # (seq_size, embed_dim) + return (window_sum / n_valid[:, np.newaxis])[np.newaxis, ...] + + +class MultipleReductions(Reduction): + """Apply several :class:`SingleSubsampleReduction` strategies in parallel. + + Each strategy produces one window; embeddings are stacked along the subsample + axis rather than aggregated, giving one embedding per strategy. + + Parameters + ---------- + reductions : list of SingleSubsampleReduction + Ordered list of strategies to apply. + + Raises + ------ + ValueError + If any element of ``reductions`` is not a + :class:`SingleSubsampleReduction`. + """ + + def __init__( + self, + reductions: list[SingleSubsampleReduction], + ) -> None: + super().__init__() + for r in reductions: + if not isinstance(r, SingleSubsampleReduction): + raise ValueError( + f"Reduction '{r}' is not a subsampling reduction; " + "currently only subsampling reductions can be used in multiple reductions" + ) + self.reductions = reductions + + @classmethod + def from_strings( + cls, + reductions: list[str], + **kwargs, + ) -> Self: + """Construct from a list of strategy name strings. + + Parameters + ---------- + reductions : list of str + Strategy names recognised by :func:`reduction_from_str`. + **kwargs + Forwarded to each strategy constructor. If ``rng`` is an integer + seed it is converted to a :class:`numpy.random.Generator` first so + that each stochastic strategy gets an independent random stream. + + Returns + ------- + MultipleReductions + Instance wrapping the instantiated strategies. + """ + # In the cases where rng is an integer (seed), we should convert to a Generator first, so we don't reuse the + # same seed across multiple reductions. Reusing the same rng is ok: each call mutates it. + if "rng" in kwargs: + kwargs["rng"] = np.random.default_rng(kwargs["rng"]) + return cls( + reductions=[reduction_from_str(s, **kwargs) for s in reductions], + ) + + def subsample_lc(self, *arrays: np.ndarray, seq_size: int) -> list[tuple[np.ndarray, ...]]: + """Apply each strategy and return one window per strategy. + + Parameters + ---------- + *arrays : np.ndarray + 1-D arrays of equal length. + seq_size : int + Maximum observations per window. + + Returns + ------- + list of tuple of np.ndarray + One element per strategy, each a tuple of subsampled arrays. + """ + subsamples = [] + for reduction in self.reductions: + subsamples.append(reduction.single_subsample_lc(*arrays, seq_size=seq_size)) + return subsamples + + def reduce_embeddings(self, embeddings: ArrayLike, tensors: InputTensors, *, output: str) -> ArrayLike: + """Return embeddings unchanged — one per strategy, already stacked. + + Parameters + ---------- + embeddings : array-like + Per-strategy embeddings from the model. + tensors : InputTensors + Unused; accepted for interface compatibility. + output : str + Unused; accepted for interface compatibility. + + Returns + ------- + array-like + The input unchanged. + """ + return embeddings + + +def reduction_from_str(s: str | list[str], **kwargs) -> Reduction: + """Instantiate a :class:`Reduction` from a name string or list of name strings. + + Parameters + ---------- + s : str or list of str + Strategy name or list of names. Recognised values (case-insensitive, + underscores treated as hyphens): ``"beginning"``, ``"end"``, + ``"random-subsample"``, ``"non-overlapping-windows"``. A list with more + than one entry produces a :class:`MultipleReductions`. + **kwargs + Forwarded to the strategy constructor (e.g. ``rng`` for + :class:`RandomSubsample`). + + Returns + ------- + Reduction + The instantiated strategy. + + Raises + ------ + ValueError + If the name is not recognised, or if ``rng`` is missing for + ``"random-subsample"``. + """ + if isinstance(s, list): + if len(s) != 1: + return MultipleReductions.from_strings(s, **kwargs) + s = s[0] + + match s.lower().replace("_", "-"): + case "beginning": + return Beginning() + case "end": + return End() + case "random-subsample": + try: + rng = kwargs["rng"] + except KeyError: + raise ValueError("rng must be provided for random subsample reduction") + return RandomSubsample(rng=rng) + case "non-overlapping-windows": + return NonOverlappingWindows() + case _: + raise ValueError(f"Unknown reduction '{s}'") diff --git a/light-curve/tests/prep-models b/light-curve/tests/prep-models new file mode 160000 index 000000000..5ca55e355 --- /dev/null +++ b/light-curve/tests/prep-models @@ -0,0 +1 @@ +Subproject commit 5ca55e355fce2042ad19272b82c8819e03617e7e diff --git a/light-curve/tests/test_embed.py b/light-curve/tests/test_embed.py new file mode 100644 index 000000000..bea384d54 --- /dev/null +++ b/light-curve/tests/test_embed.py @@ -0,0 +1,219 @@ +"""Tests for light_curve.embed — Astromer1 and Astromer2 embedding models. + +ONNX models are downloaded from HuggingFace via huggingface_hub (cached in +the HF default cache directory). Expected embeddings come from the prep-models +submodule at ``tests/prep-models/``. +""" + +from dataclasses import dataclass +from pathlib import Path + +import numpy as np +import pyarrow.parquet as pq +import pytest +from huggingface_hub import hf_hub_download + +PREP_MODELS_DIR = Path(__file__).parent / "prep-models" / "models" + + +@dataclass(frozen=True) +class AstromerTestConfig: + model_name: str + repo_id: str + test_parquet: Path + # reduction used when generating the reference parquet + ref_reduction: str + # Astromer1 has larger TF→ONNX drift (~0.98) than Astromer2 (~0.999) + cos_sim_threshold: float + + +ASTROMER_CONFIGS = [ + AstromerTestConfig( + model_name="Astromer1", + repo_id="light-curve/astromer1", + test_parquet=PREP_MODELS_DIR / "astromer1" / "out" / "test-data" / "astromer1_test.parquet", + ref_reduction="beginning", + cos_sim_threshold=0.97, + ), + AstromerTestConfig( + model_name="Astromer2", + repo_id="light-curve/astromer2", + test_parquet=PREP_MODELS_DIR / "astromer2" / "out" / "test-data" / "astromer2_test.parquet", + ref_reduction="non-overlapping-windows", + cos_sim_threshold=0.999, + ), +] + + +# --------------------------------------------------------------------------- +# Shared fixtures (parametrized over both models) +# --------------------------------------------------------------------------- + + +@pytest.fixture( + scope="session", + params=ASTROMER_CONFIGS, + ids=[c.model_name.lower() for c in ASTROMER_CONFIGS], +) +def astromer_config(request) -> AstromerTestConfig: + return request.param + + +@pytest.fixture(scope="session") +def astromer_session(astromer_config): + import onnxruntime as ort + + prefix = astromer_config.repo_id.split("/")[-1] + path = hf_hub_download(repo_id=astromer_config.repo_id, filename=f"{prefix}.onnx") + return ort.InferenceSession(str(path)) + + +@pytest.fixture(scope="session") +def astromer_test_table(astromer_config): + return pq.read_table(astromer_config.test_parquet).to_pydict() + + +# --------------------------------------------------------------------------- +# Preprocessing tests (no ONNX session needed) +# --------------------------------------------------------------------------- + + +def test_non_overlapping_windows_short_lc(): + """LC shorter than seq_size produces one window.""" + from light_curve.embed import NonOverlappingWindows + + tr = NonOverlappingWindows() + time = np.arange(50, dtype=float) + mag = np.ones(50) + subsamples = tr.subsample_lc(time, mag, seq_size=200) + assert len(subsamples) == 1 + assert subsamples[0][0].shape == (50,) + + +def test_non_overlapping_windows_long_lc(): + """LC of length 400 with seq_size=200 gives 2 windows.""" + from light_curve.embed import NonOverlappingWindows + + tr = NonOverlappingWindows() + time = np.arange(400, dtype=float) + mag = np.ones(400) + subsamples = tr.subsample_lc(time, mag, seq_size=200) + assert len(subsamples) == 2 + + +def test_preprocess_lc_shape(): + """preprocess_lc pads/clips to seq_size and returns mask.""" + from light_curve.embed import NonOverlappingWindows + + tr = NonOverlappingWindows() + time = np.arange(50, dtype=float) + mag = np.ones(50) + time_out, mag_out, mask = tr.preprocess_lc(time, mag, seq_size=200) + assert time_out.shape == (1, 200) + assert mag_out.shape == (1, 200) + assert mask.shape == (1, 200) + assert mask[0, :50].all() + assert not mask[0, 50:].any() + + +@pytest.mark.parametrize( + "model_class_name", + ["Astromer1", "Astromer2"], +) +def test_astromer_preprocess_normalises(model_class_name): + """preprocess_lc produces zero-mean time and mag for each window.""" + import light_curve.embed as lce + + model_class = getattr(lce, model_class_name) + model = model_class(session=None, reduction="non-overlapping-windows") + time = np.linspace(0, 100, 200) + mag = np.linspace(10, 15, 200) + tensors = model.preprocess_lc(time, mag) + valid = tensors.bool_mask[0] + assert abs(tensors.times[0, valid, 0].mean()) < 1e-5 + assert abs(tensors.input[0, valid, 0].mean()) < 1e-5 + + +# --------------------------------------------------------------------------- +# End-to-end tests (parametrized over Astromer1 / Astromer2 via fixture) +# --------------------------------------------------------------------------- + + +def test_non_overlapping_windows_sequence_output_long_lc(astromer_config, astromer_session): + """NonOverlappingWindows + 'sequence' output yields consistent (1,1,200,256) shape.""" + import light_curve.embed as lce + + model_class = getattr(lce, astromer_config.model_name) + model = model_class(session=astromer_session, output="sequence", reduction="non-overlapping-windows") + + # 350 obs → 2 windows: 200 valid + 150 valid (+50 padded) + time = np.linspace(0, 1000, 350) + mag = np.ones(350) + embedding = model(time, mag) + + # shape: (BAND=1, SUBSAMPLE=1, SEQUENCE=200, VALUE=256) — masked mean over windows per timestep + assert embedding.shape == (1, 1, 200, 256) + assert np.all(np.isfinite(embedding)) + + +def test_from_hf_shape(astromer_config): + """from_hf() returns a working model with correct output shape.""" + import light_curve.embed as lce + + model_class = getattr(lce, astromer_config.model_name) + model = model_class.from_hf(output="mean") + time = np.linspace(0, 100, 50) + mag = np.ones(50) + embedding = model(time, mag) + assert embedding.shape == (1, 1, 1, 256) + assert np.all(np.isfinite(embedding)) + + +def test_from_hf_matches_session(astromer_config, astromer_session, astromer_test_table): + """from_hf() and a manually created session produce identical embeddings.""" + import light_curve.embed as lce + + model_class = getattr(lce, astromer_config.model_name) + lc = astromer_test_table["lightcurve"][0] + time = np.array([obs["mjd"] for obs in lc]) + mag = np.array([obs["mag"] for obs in lc]) + + model_hf = model_class.from_hf(output="mean") + model_manual = model_class(session=astromer_session, output="mean") + + emb_hf = model_hf(time, mag) + emb_manual = model_manual(time, mag) + np.testing.assert_array_equal(emb_hf, emb_manual) + + +def test_from_hf_invalid_output(astromer_config): + """from_hf() raises ValueError for unknown output names.""" + import light_curve.embed as lce + + model_class = getattr(lce, astromer_config.model_name) + with pytest.raises(ValueError, match="Unknown output"): + model_class.from_hf(output="nonexistent") + + +@pytest.mark.parametrize("row_idx", range(10)) +def test_mean_matches_reference(astromer_config, astromer_session, astromer_test_table, row_idx): + """Mean-pooling output matches the reference embedding from the parquet.""" + import light_curve.embed as lce + + model_class = getattr(lce, astromer_config.model_name) + lc = astromer_test_table["lightcurve"][row_idx] + time = np.array([obs["mjd"] for obs in lc]) + mag = np.array([obs["mag"] for obs in lc]) + expected = np.array(astromer_test_table["embedding_mean"][row_idx]) + + model = model_class(session=astromer_session, output="mean", reduction=astromer_config.ref_reduction) + embedding = model(time, mag) + + assert embedding.shape == (1, 1, 1, 256) + assert np.all(np.isfinite(embedding)) + emb_vec = embedding[0, 0, 0].astype(np.float64) + ref_vec = expected.astype(np.float64) + cos_sim = np.dot(emb_vec, ref_vec) / (np.linalg.norm(emb_vec) * np.linalg.norm(ref_vec)) + assert cos_sim > astromer_config.cos_sim_threshold, ( + f"cosine similarity {cos_sim:.6f} < {astromer_config.cos_sim_threshold}" + )