Skip to content

Reference

Utilities

JIT decorator

staticjit

staticjit(func: Callable) -> Callable

Apply numba's njit and staticmethod to a map method.

Source code in src/tsdynamics/utils/general.py
def staticjit(func: Callable) -> Callable:
    """Apply numba's ``njit`` and ``staticmethod`` to a map method."""
    return staticmethod(njit(func))

Timestep estimation

Sagitta-based selection of an output dt for a sampled trajectory: pick the largest stride whose mid-point deviation from the chord (the sagitta) stays below a curvature tolerance.

estimate_dt_from_sagitta

estimate_dt_from_sagitta(
    y: ndarray,
    dt0: float,
    *,
    epsilon: float,
    percentile: float = 95.0,
    coarsen_only: bool = True,
    use_relative: bool | None = None,
    min_points_per_segment: int = 3,
    search_growth: float = 1.5,
) -> SagittaDt

Sagitta-based Δt selector (derivative-free, robust, idempotent).

  • For d = 1: automatically applies Takens embedding after estimating optimal lag (AMI) and embedding dimension (FNN). The embedding is transparent to the user.
  • For d > 1: computes sagitta in state space as before. y_std is per-feature σ-normalized (ddof=1).
PARAMETER DESCRIPTION
y

Time-ordered samples y_i. If 1D, Takens embedding will be applied automatically.

TYPE: (ndarray, shape(n_samples, n_dim) or (n_samples,))

dt0

Base sampling step Δt0 > 0.

TYPE: float

epsilon

Geometric tolerance. Absolute (σ-normalized units) when use_relative=False, relative (sagitta / chord_median) when use_relative=True.

TYPE: float

percentile

Robust percentile p (e.g., 95.0).

TYPE: float DEFAULT: 95.0

coarsen_only

If True, never suggest Δt* < Δt0 (i.e., m >= 1). Does not affect behaviour when span=1 already exceeds ε — that case is flagged via 'notes' and stride=1 is returned regardless.

TYPE: bool DEFAULT: True

use_relative

Whether to use relative sagitta criterion (sagitta / chord_median <= ε). If None (default), automatically set to True for 1D input (post-embedding) and False for multivariate input. Pass explicitly to override. NOTE: relative criterion is not strictly monotone in stride; binary search is approximate for that case.

TYPE: bool or None DEFAULT: None

min_points_per_segment

Require at least this many triples (i-span, i, i+span) to evaluate a candidate span.

TYPE: int DEFAULT: 3

search_growth

Multiplicative growth for coarse search over spans.

TYPE: float DEFAULT: 1.5

RETURNS DESCRIPTION
SagittaDt

Result with chosen Δt*, stride, achieved percentile value, indices, and bookkeeping. For 1D input, the 'notes' field will contain embedding parameters (lag, dimension).

Source code in src/tsdynamics/utils/sagitta_dt.py
def estimate_dt_from_sagitta(
    y: np.ndarray,
    dt0: float,
    *,
    epsilon: float,
    percentile: float = 95.0,
    coarsen_only: bool = True,
    use_relative: bool | None = None,
    min_points_per_segment: int = 3,
    search_growth: float = 1.5,
) -> SagittaDt:
    """
    Sagitta-based Δt selector (derivative-free, robust, idempotent).

    - For d = 1: automatically applies Takens embedding after estimating optimal lag (AMI)
      and embedding dimension (FNN). The embedding is transparent to the user.
    - For d > 1: computes sagitta in state space as before.
      y_std is per-feature σ-normalized (ddof=1).

    Parameters
    ----------
    y : np.ndarray, shape (n_samples, n_dim) or (n_samples,)
        Time-ordered samples y_i. If 1D, Takens embedding will be applied automatically.
    dt0 : float
        Base sampling step Δt0 > 0.
    epsilon : float
        Geometric tolerance. Absolute (σ-normalized units) when use_relative=False,
        relative (sagitta / chord_median) when use_relative=True.
    percentile : float
        Robust percentile p (e.g., 95.0).
    coarsen_only : bool
        If True, never suggest Δt* < Δt0 (i.e., m >= 1). Does not affect behaviour
        when span=1 already exceeds ε — that case is flagged via 'notes' and
        stride=1 is returned regardless.
    use_relative : bool or None
        Whether to use relative sagitta criterion (sagitta / chord_median <= ε).
        If None (default), automatically set to True for 1D input (post-embedding)
        and False for multivariate input. Pass explicitly to override.
        NOTE: relative criterion is not strictly monotone in stride; binary search
        is approximate for that case.
    min_points_per_segment : int
        Require at least this many triples (i-span, i, i+span) to evaluate a candidate span.
    search_growth : float
        Multiplicative growth for coarse search over spans.

    Returns
    -------
    SagittaDt
        Result with chosen Δt*, stride, achieved percentile value, indices, and bookkeeping.
        For 1D input, the 'notes' field will contain embedding parameters (lag, dimension).
    """
    # -------- input validation and dimensionality handling --------
    if not isinstance(y, np.ndarray):
        raise ValueError("y must be a numpy array.")

    needs_embedding = False

    if y.ndim == 1:
        needs_embedding = True
        y_original = y.copy()
        n_samples_original = len(y)
    elif y.ndim == 2:
        n_samples_original_unused, n_dim = y.shape
        if n_dim == 1:
            needs_embedding = True
            y_original = y.squeeze()
            n_samples_original = len(y_original)
    else:
        raise ValueError("y must be 1D or 2D array.")

    embedding_notes = ""
    if needs_embedding:
        if n_samples_original < 50:
            raise ValueError(
                f"Need at least 50 samples for 1D embedding, got {n_samples_original}."
            )

        print("Estimating lag...")
        embedding_lag = _estimate_lag_ami(y_original)

        print("Estimating embedding dimension...")
        embedding_dim = _estimate_embedding_dim_fnn(y_original, embedding_lag)

        y = _takens_embedding(y_original, embedding_lag, embedding_dim)
        n_samples, n_dim = y.shape

        embedding_notes = f"Applied Takens embedding: lag={embedding_lag}, dim={embedding_dim}. "
    else:
        n_samples, n_dim = y.shape

    if n_samples < 5:
        raise ValueError("Need at least n_samples >= 5 after embedding.")
    if not (dt0 > 0):
        raise ValueError("dt0 must be > 0.")
    if not (epsilon > 0):
        raise ValueError("epsilon must be > 0.")
    if not (0.0 < percentile <= 100.0):
        raise ValueError("percentile must be in (0, 100].")
    if search_growth <= 1.0:
        raise ValueError("search_growth must be > 1.0.")
    if min_points_per_segment < 1:
        raise ValueError("min_points_per_segment must be >= 1.")

    # fix #2: use_relative is now an explicit parameter, not silently tied to needs_embedding
    if use_relative is None:
        use_relative = needs_embedding

    # -------- per-feature σ-normalization (scale invariance) --------
    y = np.asarray(y, dtype=float)
    feature_std = y.std(axis=0, ddof=1)
    feature_std[~np.isfinite(feature_std) | (feature_std == 0.0)] = 1.0
    samples_for_sagitta = y / feature_std

    criterion_note = "relative sagitta" if use_relative else "absolute sagitta"
    notes = embedding_notes + f"Used state-space {criterion_note}."
    if use_relative:
        notes += " NOTE: relative criterion is not strictly monotone; binary search is approximate."

    # -------- determine max span with enough triples --------
    max_span = (n_samples - 1) // 2
    while max_span > 1 and (n_samples - 2 * max_span) < min_points_per_segment:
        max_span -= 1

    def _make_result(stride: int, achieved: float, searched: list, extra_note: str) -> SagittaDt:
        idx = np.arange(0, n_samples, stride, dtype=int)
        if idx[-1] != (n_samples - 1):
            idx = np.concatenate([idx, np.array([n_samples - 1], dtype=int)])
        evs = np.array(sorted(searched, key=lambda x: x[0]), dtype=float)
        spans = evs[:, 0].astype(int) if evs.size else np.array([1], dtype=int)
        return SagittaDt(
            delta_t=float(stride * dt0),
            stride=int(stride),
            percentile_value=float(achieved),
            indices=idx,
            p=float(percentile),
            epsilon=float(epsilon),
            searched_ms=spans,
            notes=notes + extra_note,
        )

    if max_span < 1:
        return _make_result(1, 0.0, [(1, 0.0)], " Too few samples; returning Δt0.")

    # -------- helper to test a span --------
    def span_is_ok(span: int) -> tuple[bool, float]:
        if (n_samples - 2 * span) < min_points_per_segment:
            return False, np.nan
        s_p, chord_med = _compute_sagitta_stats(samples_for_sagitta, span, percentile)
        if use_relative:
            denom = chord_med if (chord_med > 1e-12 and np.isfinite(chord_med)) else 1.0
            val = s_p / denom
        else:
            val = s_p
        return (val <= epsilon), val

    # -------- coarse search (exponential growth) --------
    evaluated: list[tuple[int, float]] = []

    ok_1, val_1 = span_is_ok(1)
    evaluated.append((1, val_1))

    if not ok_1:
        # fix #1: this is not a coarsen_only decision — span=1 failing means the data
        # is already under-sampled relative to ε. Return stride=1 with a clear note.
        # coarsen_only is irrelevant here since we cannot go finer.
        return _make_result(
            1, val_1, evaluated, " span=1 exceeds ε; data may be under-sampled at dt0."
        )

    # fix #4: chosen_stride always valid from here; no sentinel 0 needed
    chosen_stride = 1
    achieved_percentile = val_1
    current_span = 1
    first_fail_span = None

    while True:
        next_span = int(np.floor(current_span * search_growth))
        if next_span <= current_span:
            next_span = current_span + 1
        if next_span > max_span:
            break
        ok_next, val_next = span_is_ok(next_span)
        evaluated.append((next_span, val_next))
        if ok_next:
            chosen_stride = next_span
            achieved_percentile = val_next
            current_span = next_span
        else:
            first_fail_span = next_span
            break

    # -------- binary search between last ok and first fail (if any) --------
    # fix #6: low_ok is now always a confirmed-ok stride (no sentinel ambiguity)
    low_ok = chosen_stride
    high_fail = first_fail_span if first_fail_span is not None else (max_span + 1)

    while high_fail - low_ok > 1:
        mid = (low_ok + high_fail) // 2
        ok_mid, val_mid = span_is_ok(mid)
        evaluated.append((mid, val_mid))
        if ok_mid:
            low_ok = mid
            chosen_stride = mid
            achieved_percentile = val_mid
        else:
            high_fail = mid

    return _make_result(chosen_stride, achieved_percentile, evaluated, "")

SagittaDt dataclass

SagittaDt(
    delta_t: float,
    stride: int,
    percentile_value: float,
    indices: ndarray,
    p: float,
    epsilon: float,
    searched_ms: ndarray,
    notes: str = "",
)

Result container for sagitta-based Δt selection.