Skip to content

Reference

Base classes

The shared machinery (ParamSet, MetaStore, Trajectory, SystemBase), the three family bases users subclass, and the System protocol the analysis toolkit is written against.

ParamSet

ParamSet(data: dict[str, Any])

Bases: MutableMapping

Ordered, fixed-key parameter container.

Keys are frozen at construction time — you can change values but not add or remove keys. Supports both dict-style (p["sigma"]) and attribute-style (p.sigma) read/write.

PARAMETER DESCRIPTION
data

Initial key→value mapping. All future writes must use existing keys.

TYPE: dict

Examples:

>>> p = ParamSet({"sigma": 10.0, "rho": 28.0})
>>> p.sigma
10.0
>>> p.sigma = 15.0
>>> p["sigma"]
15.0
>>> p.unknown = 5.0            # raises AttributeError
Source code in src/tsdynamics/families/base.py
def __init__(self, data: dict[str, Any]) -> None:
    object.__setattr__(self, "_data", dict(data))

as_tuple

as_tuple() -> tuple

Return parameter values as a tuple (insertion order).

Source code in src/tsdynamics/families/base.py
def as_tuple(self) -> tuple:
    """Return parameter values as a tuple (insertion order)."""
    return tuple(self._data.values())

as_dict

as_dict() -> dict

Return a shallow copy as a plain dict.

Source code in src/tsdynamics/families/base.py
def as_dict(self) -> dict:
    """Return a shallow copy as a plain dict."""
    return dict(self._data)

param_hash

param_hash() -> int

Return a process-stable 64-bit integer hash of the current parameter values.

Uses MD5 over a JSON-serialised representation so the result is reproducible across Python process restarts (unlike hash()).

The hash backs cache keys for compiled JiTCODE / JiTCDDE modules and the Numba-iterate cache. At 64 bits the birthday-paradox collision probability reaches 50 % only around 2^32 ≈ 4·10⁹ distinct parameter sets, which is well beyond any realistic parameter sweep. The previous 32-bit width hit the same threshold at only 2^16 ≈ 65 000 sets, which a sufficiently large sweep could plausibly reach — and a collision there would silently return a compiled artifact built for a different parameter set.

Source code in src/tsdynamics/families/base.py
def param_hash(self) -> int:
    """
    Return a process-stable 64-bit integer hash of the current parameter values.

    Uses MD5 over a JSON-serialised representation so the result is
    reproducible across Python process restarts (unlike ``hash()``).

    The hash backs cache keys for compiled JiTCODE / JiTCDDE modules
    and the Numba-iterate cache.  At 64 bits the birthday-paradox
    collision probability reaches 50 % only around ``2^32 ≈ 4·10⁹``
    distinct parameter sets, which is well beyond any realistic
    parameter sweep.  The previous 32-bit width hit the same threshold
    at only ``2^16 ≈ 65 000`` sets, which a sufficiently large sweep
    could plausibly reach — and a collision there would silently
    return a compiled artifact built for a different parameter set.
    """
    import hashlib
    import json

    s = json.dumps(list(self._data.items()), default=str)
    return int(hashlib.md5(s.encode()).hexdigest()[:16], 16)

MetaStore

MetaStore()

Bases: MutableMapping

Append-with-history metadata store for computed results.

Behaves like a dict for everyday use (meta["lyapunov_spectrum"] reads/writes the latest value), but every write is appended rather than overwritten, so earlier results survive::

sys.meta.record("lyapunov_spectrum", spec, dt=0.1, final_time=200.0)
sys.meta["lyapunov_spectrum"]            # latest value
sys.meta.history("lyapunov_spectrum")    # every record, with context

Equality compares the latest values against a plain dict (or another MetaStore), preserving sys.meta == {} style assertions.

Source code in src/tsdynamics/families/base.py
def __init__(self) -> None:
    self._records: dict[str, list[dict]] = {}

record

record(key: str, value: Any, **context: Any) -> Any

Append value under key with optional context kwargs.

Source code in src/tsdynamics/families/base.py
def record(self, key: str, value: Any, **context: Any) -> Any:
    """Append ``value`` under ``key`` with optional context kwargs."""
    import time

    self._records.setdefault(key, []).append(
        {"value": value, "context": context, "timestamp": time.time()}
    )
    return value

history

history(key: str) -> list[dict]

Return every record for key (oldest first), with context.

Source code in src/tsdynamics/families/base.py
def history(self, key: str) -> list[dict]:
    """Return every record for ``key`` (oldest first), with context."""
    return list(self._records.get(key, []))

latest

latest() -> dict[str, Any]

Return a plain dict of the latest value per key.

Source code in src/tsdynamics/families/base.py
def latest(self) -> dict[str, Any]:
    """Return a plain dict of the latest value per key."""
    return {k: recs[-1]["value"] for k, recs in self._records.items()}

Trajectory

Trajectory(
    t: ndarray,
    y: ndarray,
    system: Any,
    meta: dict | None = None,
)

The result of integrating or iterating a dynamical system.

Supports tuple-unpacking for backward compatibility::

t, y = system.integrate(final_time=100)
ATTRIBUTE DESCRIPTION
t

Time points (or step indices for discrete maps).

TYPE: (ndarray, shape(T))

y

State at each time point.

TYPE: (ndarray, shape(T, dim))

system

Back-reference to the system that produced this trajectory.

TYPE: SystemBase

meta

Provenance: system name, params snapshot, solver, tolerances, ic.

TYPE: dict

Examples:

>>> traj = lor.integrate(final_time=100)
>>> traj.dim
3
>>> traj["x"]            # named component (via the class's ``variables``)
array([...])
>>> traj.after(20.0)     # drop transient
Trajectory(n_steps=..., dim=3, t=[20.0, 100.0])
>>> t, y = traj          # tuple-unpack still works
Source code in src/tsdynamics/data/trajectory.py
def __init__(
    self,
    t: np.ndarray,
    y: np.ndarray,
    system: Any,
    meta: dict | None = None,
) -> None:
    self.t = np.asarray(t)
    self.y = np.asarray(y)
    self.system = system
    self.meta = dict(meta) if meta else {}
    self._kdtree = None

variables property

variables: tuple[str, ...] | None

Component names declared by the system (instance attr or class ClassVar).

dim property

dim: int

State-space dimension.

n_steps property

n_steps: int

Number of time steps.

component

component(i: int | str) -> ndarray

Return a single state component.

PARAMETER DESCRIPTION
i

Component index, or component name when the system declares variables.

TYPE: int or str

RETURNS DESCRIPTION
(ndarray, shape(T))
Source code in src/tsdynamics/data/trajectory.py
def component(self, i: int | str) -> np.ndarray:
    """
    Return a single state component.

    Parameters
    ----------
    i : int or str
        Component index, or component name when the system declares
        ``variables``.

    Returns
    -------
    ndarray, shape (T,)
    """
    if isinstance(i, str):
        i = self._component_index(i)
    return self.y[:, i]

after

after(t0: float) -> Trajectory

Drop the initial transient.

PARAMETER DESCRIPTION
t0

Keep only time points t >= t0.

TYPE: float

RETURNS DESCRIPTION
Trajectory
Source code in src/tsdynamics/data/trajectory.py
def after(self, t0: float) -> Trajectory:
    """
    Drop the initial transient.

    Parameters
    ----------
    t0 : float
        Keep only time points ``t >= t0``.

    Returns
    -------
    Trajectory
    """
    mask = self.t >= t0
    return Trajectory(self.t[mask], self.y[mask], self.system, meta=self.meta)

minmax

minmax() -> tuple[ndarray, ndarray]

Return per-component (minima, maxima), each of shape (dim,).

Source code in src/tsdynamics/data/trajectory.py
def minmax(self) -> tuple[np.ndarray, np.ndarray]:
    """Return per-component ``(minima, maxima)``, each of shape ``(dim,)``."""
    return self.y.min(axis=0), self.y.max(axis=0)

standardize

standardize() -> Trajectory

Return a copy with zero mean and unit standard deviation per component.

The applied transform is recorded in meta["standardized"].

Source code in src/tsdynamics/data/trajectory.py
def standardize(self) -> Trajectory:
    """
    Return a copy with zero mean and unit standard deviation per component.

    The applied transform is recorded in ``meta["standardized"]``.
    """
    mean = self.y.mean(axis=0)
    std = self.y.std(axis=0)
    std = np.where(std < np.finfo(float).tiny, 1.0, std)
    return Trajectory(
        self.t,
        (self.y - mean) / std,
        self.system,
        meta={**self.meta, "standardized": {"mean": mean, "std": std}},
    )

neighbors

neighbors(q: Any, k: int = 1) -> tuple[ndarray, ndarray]

Nearest trajectory points to query point(s) q.

Builds a KD-tree lazily on first call and caches it; subsequent queries are O(log T).

PARAMETER DESCRIPTION
q

Query point(s).

TYPE: (array - like, shape(dim) or (m, dim))

k

Number of neighbours per query point.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
(distances, indices)

As returned by :meth:scipy.spatial.cKDTree.query.

Source code in src/tsdynamics/data/trajectory.py
def neighbors(self, q: Any, k: int = 1) -> tuple[np.ndarray, np.ndarray]:
    """
    Nearest trajectory points to query point(s) ``q``.

    Builds a KD-tree lazily on first call and caches it; subsequent
    queries are O(log T).

    Parameters
    ----------
    q : array-like, shape (dim,) or (m, dim)
        Query point(s).
    k : int
        Number of neighbours per query point.

    Returns
    -------
    (distances, indices)
        As returned by :meth:`scipy.spatial.cKDTree.query`.
    """
    from scipy.spatial import cKDTree

    if self._kdtree is None:
        self._kdtree = cKDTree(self.y)
    return self._kdtree.query(np.asarray(q, dtype=float), k=k)

set_distance

set_distance(
    other: Any, *, method: str = "centroid"
) -> float

Distance to another point set (Trajectory or array), as a set.

method is "centroid" (default), "hausdorff", or "minimum" — see :func:tsdynamics.data.set_distance. The matching primitive behind attractor deduplication and continuation.

Source code in src/tsdynamics/data/trajectory.py
def set_distance(self, other: Any, *, method: str = "centroid") -> float:
    """
    Distance to another point set (Trajectory or array), as a set.

    ``method`` is ``"centroid"`` (default), ``"hausdorff"``, or
    ``"minimum"`` — see :func:`tsdynamics.data.set_distance`.  The
    matching primitive behind attractor deduplication and continuation.
    """
    from tsdynamics.data import set_distance

    return set_distance(self, other, method=method)

SystemBase

SystemBase(
    params: dict | None = None,
    ic: Any | None = None,
    dim: int | None = None,
)

Abstract base class for all dynamical systems.

Provides: - params — a :class:ParamSet holding the system's parameter values. Attribute access on the system is transparently forwarded to params. - dim — integer state-space dimension. - ic — optional initial conditions array. - meta — dict for storing computed metadata (Lyapunov spectra, etc.). - copy() / with_params() for safe cloning. - resolve_ic() for uniform IC resolution across subclasses.

Class-level declarations

Subclasses should declare at class level::

class Lorenz(ContinuousSystem):
    params = {"sigma": 10.0, "rho": 28.0, "beta": 8/3}
    dim = 3
Constructor overrides

Individual instances can override params and/or ic::

lor = Lorenz(params={"rho": 30.0}, ic=[1.0, 0.0, 0.0])

Constructor will raise ValueError for unknown param keys.

Source code in src/tsdynamics/families/base.py
def __init__(
    self,
    params: dict | None = None,
    ic: Any | None = None,
    dim: int | None = None,
) -> None:
    # Build ParamSet from class defaults + constructor overrides
    defaults = dict(type(self).params)
    if params:
        unknown = set(params) - set(defaults)
        if unknown:
            raise ValueError(
                f"{type(self).__name__}: unknown parameter(s) "
                f"{sorted(unknown)}. Declared: {sorted(defaults)}"
            )
        defaults.update(params)
    object.__setattr__(self, "params", ParamSet(defaults))

    # dim: constructor arg > class attribute
    resolved_dim = dim if dim is not None else type(self).dim
    object.__setattr__(self, "dim", resolved_dim)

    # Initial conditions
    ic_arr = np.asarray(ic, dtype=float) if ic is not None else None
    object.__setattr__(self, "ic", ic_arr)

    # Metadata store: computed properties (Lyapunov, etc.) accumulate here
    # with history — repeated runs append instead of overwriting.
    object.__setattr__(self, "meta", MetaStore())

copy

copy() -> SystemBase

Return a deep copy with the same class, params, and ic.

The copy has its own independent params and meta stores.

Source code in src/tsdynamics/families/base.py
def copy(self) -> SystemBase:
    """
    Return a deep copy with the same class, params, and ic.

    The copy has its own independent ``params`` and ``meta`` stores.
    """
    return type(self)(
        params=self.params.as_dict(),
        ic=self.ic.copy() if self.ic is not None else None,
    )

with_params

with_params(**overrides) -> SystemBase

Return a new system with some parameters overridden.

Does not mutate self. Designed for parameter sweeps::

for rho in np.linspace(0, 50, 200):
    traj = base_system.with_params(rho=rho).integrate(final_time=50)
PARAMETER DESCRIPTION
**overrides

New parameter values. Keys must exist in params.

DEFAULT: {}

RETURNS DESCRIPTION
SystemBase

New instance of the same subclass.

Source code in src/tsdynamics/families/base.py
def with_params(self, **overrides) -> SystemBase:
    """
    Return a **new** system with some parameters overridden.

    Does not mutate ``self``.  Designed for parameter sweeps::

        for rho in np.linspace(0, 50, 200):
            traj = base_system.with_params(rho=rho).integrate(final_time=50)

    Parameters
    ----------
    **overrides
        New parameter values.  Keys must exist in ``params``.

    Returns
    -------
    SystemBase
        New instance of the same subclass.
    """
    new_p = {**self.params.as_dict(), **overrides}
    return type(self)(params=new_p, ic=self.ic)

resolve_ic

resolve_ic(ic: Any | None = None) -> ndarray

Resolve initial conditions consistently.

Priority:

  1. ic argument (if provided)
  2. self.ic (set by a previous integration / iteration)
  3. type(self).default_ic (class-level default, if declared)
  4. Random U[0, 1)^dim

The resolved IC is stored in self.ic so subsequent calls without an explicit ic reproduce the same initial state.

PARAMETER DESCRIPTION
ic

TYPE: array - like or None DEFAULT: None

RETURNS DESCRIPTION
(ndarray, shape(dim))
Source code in src/tsdynamics/families/base.py
def resolve_ic(self, ic: Any | None = None) -> np.ndarray:
    """
    Resolve initial conditions consistently.

    Priority:

    1. ``ic`` argument (if provided)
    2. ``self.ic`` (set by a previous integration / iteration)
    3. ``type(self).default_ic`` (class-level default, if declared)
    4. Random ``U[0, 1)^dim``

    The resolved IC is stored in ``self.ic`` so subsequent calls without
    an explicit ``ic`` reproduce the same initial state.

    Parameters
    ----------
    ic : array-like or None

    Returns
    -------
    ndarray, shape (dim,)
    """
    if ic is not None:
        arr = np.asarray(ic, dtype=float).reshape(self.dim)
    elif self.ic is not None:
        arr = np.asarray(self.ic, dtype=float).reshape(self.dim)
    elif type(self).default_ic is not None:
        arr = np.asarray(type(self).default_ic, dtype=float).reshape(self.dim)
    else:
        arr = np.random.rand(self.dim)
    object.__setattr__(self, "ic", arr.copy())
    return arr

ContinuousSystem

ContinuousSystem(
    params: dict | None = None,
    ic: Any | None = None,
    dim: int | None = None,
)

Bases: SystemBase, ABC

Base class for ODE-based dynamical systems, compiled via JiTCODE.

Subclass contract
  1. Declare params = {...} and dim = N at class level.
  2. Implement _equations as a @staticmethod returning a length-dim sequence of JiTCODE / SymEngine symbolic expressions.
  3. Optionally mark integer or loop-structural parameters in _structural_params — these are baked into the compiled C code rather than exposed as runtime control parameters.
Compilation

The first call to integrate or lyapunov_spectrum triggers JiTCODE compilation. Non-structural parameters become JiTCODE control_pars, meaning the resulting .so module is compiled once per class (or once per structural-param combination) and reused for all subsequent runs and parameter changes, even across process restarts.

Class-level attributes

_structural_params : frozenset[str] Parameter names that appear as integer loop bounds or affect the symbolic structure of _equations. These are baked in at compile time. For most systems this is empty (the default).

Example — Lorenz96 uses ``N`` to build the list comprehension::

    _structural_params = frozenset({"N"})

_default_method : str Default integrator name (default "RK45").

Examples:

>>> lor = Lorenz()
>>> traj = lor.integrate(final_time=100, dt=0.01)
>>> t, y = traj          # tuple-unpack
>>> lor.sigma = 15.0     # change param — zero recompile cost
>>> traj2 = lor.integrate(final_time=100)
Source code in src/tsdynamics/families/base.py
def __init__(
    self,
    params: dict | None = None,
    ic: Any | None = None,
    dim: int | None = None,
) -> None:
    # Build ParamSet from class defaults + constructor overrides
    defaults = dict(type(self).params)
    if params:
        unknown = set(params) - set(defaults)
        if unknown:
            raise ValueError(
                f"{type(self).__name__}: unknown parameter(s) "
                f"{sorted(unknown)}. Declared: {sorted(defaults)}"
            )
        defaults.update(params)
    object.__setattr__(self, "params", ParamSet(defaults))

    # dim: constructor arg > class attribute
    resolved_dim = dim if dim is not None else type(self).dim
    object.__setattr__(self, "dim", resolved_dim)

    # Initial conditions
    ic_arr = np.asarray(ic, dtype=float) if ic is not None else None
    object.__setattr__(self, "ic", ic_arr)

    # Metadata store: computed properties (Lyapunov, etc.) accumulate here
    # with history — repeated runs append instead of overwriting.
    object.__setattr__(self, "meta", MetaStore())

is_discrete property

is_discrete: bool

ODEs are continuous-time systems.

reinit

reinit(
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
    method: str | None = None,
    rtol: float = 1e-06,
    atol: float = 1e-09,
    **integrator_kwargs,
) -> None

(Re)start the incremental stepper from state u at time t.

PARAMETER DESCRIPTION
u

Initial state (falls back to self.ic, then random).

TYPE: array - like DEFAULT: None

t

Start time (default 0.0).

TYPE: float DEFAULT: None

params

Parameter overrides applied (in place) before restarting.

TYPE: dict DEFAULT: None

method

Stepper configuration, as in :meth:integrate.

TYPE: str | None DEFAULT: None

rtol

Stepper configuration, as in :meth:integrate.

TYPE: str | None DEFAULT: None

atol

Stepper configuration, as in :meth:integrate.

TYPE: str | None DEFAULT: None

**integrator_kwargs

Stepper configuration, as in :meth:integrate.

TYPE: str | None DEFAULT: None

Source code in src/tsdynamics/families/continuous.py
def reinit(
    self,
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
    method: str | None = None,
    rtol: float = 1e-6,
    atol: float = 1e-9,
    **integrator_kwargs,
) -> None:
    """
    (Re)start the incremental stepper from state ``u`` at time ``t``.

    Parameters
    ----------
    u : array-like, optional
        Initial state (falls back to ``self.ic``, then random).
    t : float, optional
        Start time (default 0.0).
    params : dict, optional
        Parameter overrides applied (in place) before restarting.
    method, rtol, atol, **integrator_kwargs
        Stepper configuration, as in :meth:`integrate`.
    """
    if params:
        for k, v in params.items():
            self.params[k] = v
    t0 = float(t) if t is not None else 0.0
    ic_arr = self.resolve_ic(u)

    stepper = self._fresh_stepper()
    m = method or self._default_method
    stepper.set_integrator(_INTEGRATOR_MAP.get(m, m), rtol=rtol, atol=atol, **integrator_kwargs)
    stepper.set_parameters(*self._control_params().values())
    stepper.set_initial_value(ic_arr, t0)

    self._stepper = stepper
    self._state_now = ic_arr.copy()
    self._t_now = t0

step

step(n_or_dt: float | None = None) -> ndarray

Advance the system by dt (default 0.01) and return the new state.

The first call performs an implicit :meth:reinit. Parameter changes made after reinit take effect on the next reinit, not on a live stepper.

Source code in src/tsdynamics/families/continuous.py
def step(self, n_or_dt: float | None = None) -> np.ndarray:
    """
    Advance the system by ``dt`` (default 0.01) and return the new state.

    The first call performs an implicit :meth:`reinit`.  Parameter changes
    made after ``reinit`` take effect on the next ``reinit``, not on a
    live stepper.
    """
    if self._stepper is None:
        self.reinit()
    dt = float(n_or_dt) if n_or_dt is not None else self._default_step_dt
    self._t_now = self._t_now + dt
    state = np.asarray(self._stepper.integrate(self._t_now), dtype=float)
    if not np.isfinite(state).all():
        raise RuntimeError(
            f"{type(self).__name__}: ODE diverged at t={self._t_now:.6g} during step()."
        )
    self._state_now = state.copy()
    return state

state

state() -> ndarray

Return a copy of the current state (implicit reinit if cold).

Source code in src/tsdynamics/families/continuous.py
def state(self) -> np.ndarray:
    """Return a copy of the current state (implicit ``reinit`` if cold)."""
    if self._state_now is None:
        self.reinit()
    return self._state_now.copy()

set_state

set_state(u: Any) -> None

Overwrite the current state without changing the current time.

Source code in src/tsdynamics/families/continuous.py
def set_state(self, u: Any) -> None:
    """Overwrite the current state without changing the current time."""
    u_arr = np.asarray(u, dtype=float).reshape(self.dim)
    if self._stepper is None:
        self.reinit(u_arr)
    else:
        self._stepper.set_initial_value(u_arr, self._t_now)
        self._state_now = u_arr.copy()

time

time() -> float

Return the current stepper time.

Source code in src/tsdynamics/families/continuous.py
def time(self) -> float:
    """Return the current stepper time."""
    return self._t_now

trajectory

trajectory(
    final_time: float = 100.0,
    *,
    dt: float = 0.02,
    transient: float = 0.0,
    **kwargs,
) -> Trajectory

Protocol-uniform trajectory: integrate plus optional transient drop.

Source code in src/tsdynamics/families/continuous.py
def trajectory(
    self,
    final_time: float = 100.0,
    *,
    dt: float = 0.02,
    transient: float = 0.0,
    **kwargs,
) -> Trajectory:
    """Protocol-uniform trajectory: ``integrate`` plus optional transient drop."""
    traj = self.integrate(final_time=transient + final_time, dt=dt, **kwargs)
    return traj.after(transient) if transient > 0 else traj

jacobian_sym

jacobian_sym() -> list[list[Any]]

Return the symbolic Jacobian of _equations, differentiated by SymEngine.

Rows are d f_i / d y(j) for the current structural parameters; non-structural parameters appear as symbols. Hand-written _jacobian methods on system classes are never used at runtime — this autogenerated form is the single source of truth (the test suite cross-checks hand-written ones against it).

RETURNS DESCRIPTION
list of ``dim`` rows, each a list of ``dim`` SymEngine expressions.
Source code in src/tsdynamics/families/continuous.py
def jacobian_sym(self) -> list[list[Any]]:
    """
    Return the symbolic Jacobian of ``_equations``, differentiated by SymEngine.

    Rows are ``d f_i / d y(j)`` for the *current* structural parameters;
    non-structural parameters appear as symbols.  Hand-written
    ``_jacobian`` methods on system classes are never used at runtime —
    this autogenerated form is the single source of truth (the test suite
    cross-checks hand-written ones against it).

    Returns
    -------
    list of ``dim`` rows, each a list of ``dim`` SymEngine expressions.
    """
    import symengine
    from jitcode import t as t_sym
    from jitcode import y

    struct_vals = self._structural_vals()
    control_syms = {k: symengine.Symbol(k) for k in self._control_params()}
    f_sym = list(type(self)._equations(y, t_sym, **{**struct_vals, **control_syms}))
    if len(f_sym) != self.dim:
        raise ValueError(f"_equations must return {self.dim} expressions, got {len(f_sym)}")
    return [
        [_resolve_derivative_nodes(symengine.sympify(fi).diff(y(j))) for j in range(self.dim)]
        for fi in f_sym
    ]

jacobian

jacobian(u: Any, t: float = 0.0) -> ndarray

Evaluate the (autogenerated) Jacobian numerically at state u.

PARAMETER DESCRIPTION
u

State at which to evaluate.

TYPE: (array - like, shape(dim))

t

Time (matters only for non-autonomous systems).

TYPE: float DEFAULT: 0.0

RETURNS DESCRIPTION
(ndarray, shape(dim, dim))
Source code in src/tsdynamics/families/continuous.py
def jacobian(self, u: Any, t: float = 0.0) -> np.ndarray:
    """
    Evaluate the (autogenerated) Jacobian numerically at state ``u``.

    Parameters
    ----------
    u : array-like, shape (dim,)
        State at which to evaluate.
    t : float
        Time (matters only for non-autonomous systems).

    Returns
    -------
    ndarray, shape (dim, dim)
    """
    _, jac_fn, control_names = self._build_lambdified()
    vals = [float(self.params[k]) for k in control_names]
    arg = np.concatenate([np.asarray(u, dtype=float).ravel(), [t], vals])
    return np.asarray(jac_fn(arg), dtype=float).reshape(self.dim, self.dim)

integrate

integrate(
    final_time: float = 100.0,
    dt: float = 0.02,
    *,
    t0: float = 0.0,
    ic: Any | None = None,
    method: str | None = None,
    rtol: float = 1e-06,
    atol: float = 1e-09,
    backend: str | None = None,
    **integrator_kwargs,
) -> Trajectory

Integrate the ODE and return a :class:~tsdynamics.families.Trajectory.

PARAMETER DESCRIPTION
final_time

End of integration window. Default 100.0.

TYPE: float DEFAULT: 100.0

dt

Output sampling interval. The internal stepper is adaptive.

TYPE: float DEFAULT: 0.02

t0

Start time. Default 0.0. Allows warm restarts from a non-zero time (the IC is interpreted as the state at t0).

TYPE: float DEFAULT: 0.0

ic

Initial state at t0. Falls back to self.ic, then U[0, 1)^dim.

TYPE: array - like DEFAULT: None

method

Integrator: "RK45" / "dopri5" (default), "DOP853", "LSODA", "VODE". The diffsol backend maps these onto tsit45 / bdf / tr_bdf2 / esdirk34.

TYPE: str DEFAULT: None

rtol

Solver tolerances (default 1e-6 / 1e-9).

TYPE: float DEFAULT: 1e-06

atol

Solver tolerances (default 1e-6 / 1e-9).

TYPE: float DEFAULT: 1e-06

backend

Where the ODE is integrated. Defaults to _default_backend ("jitcode").

  • "jitcode" — the v2 path: compile the RHS to C.
  • "diffsol" — the Rust solver suite via LLVM JIT — no C compiler, prebuilt wheels (pip install tsdynamics[diffsol]), ~10× faster on small chaotic systems, validated against JiTCODE across the whole ODE catalogue (see :mod:tsdynamics.engine.diffsol).
  • "auto""diffsol" when it is installed, else "jitcode".
  • "interp" / "jit" — the Rust sole engine (the SSA-tape interpreter or the Cranelift JIT) via the shared engine seam (:func:tsdynamics.engine.run.integrate). Requires the compiled extension (:mod:tsdynamics._rust); until it is built these raise :class:~tsdynamics.engine.run.EngineNotAvailableError.
  • "reference" — the dependency-light pure-Python oracle (the lowered tape integrated with SciPy); the engine path's validation backend, usable without the compiled wheel.

**integrator_kwargs apply to the "jitcode" path only.

TYPE: ('jitcode', 'diffsol', 'auto', 'interp', 'jit', 'reference') DEFAULT: "jitcode"

**integrator_kwargs

Forwarded to jitcode.set_integrator (e.g. max_step).

DEFAULT: {}

RETURNS DESCRIPTION
Trajectory

Supports tuple-unpacking: t, y = sys.integrate(...).

Source code in src/tsdynamics/families/continuous.py
def integrate(
    self,
    final_time: float = 100.0,
    dt: float = 0.02,
    *,
    t0: float = 0.0,
    ic: Any | None = None,
    method: str | None = None,
    rtol: float = 1e-6,
    atol: float = 1e-9,
    backend: str | None = None,
    **integrator_kwargs,
) -> Trajectory:
    """
    Integrate the ODE and return a :class:`~tsdynamics.families.Trajectory`.

    Parameters
    ----------
    final_time : float
        End of integration window. Default 100.0.
    dt : float
        Output sampling interval. The internal stepper is adaptive.
    t0 : float
        Start time. Default 0.0. Allows warm restarts from a non-zero
        time (the IC is interpreted as the state at ``t0``).
    ic : array-like, optional
        Initial state at ``t0``. Falls back to ``self.ic``, then
        ``U[0, 1)^dim``.
    method : str, optional
        Integrator: ``"RK45"`` / ``"dopri5"`` (default), ``"DOP853"``,
        ``"LSODA"``, ``"VODE"``.  The diffsol backend maps these onto
        ``tsit45`` / ``bdf`` / ``tr_bdf2`` / ``esdirk34``.
    rtol, atol : float
        Solver tolerances (default 1e-6 / 1e-9).
    backend : {"jitcode", "diffsol", "auto", "interp", "jit", "reference"}, optional
        Where the ODE is integrated.  Defaults to ``_default_backend``
        (``"jitcode"``).

        - ``"jitcode"`` — the v2 path: compile the RHS to C.
        - ``"diffsol"`` — the Rust solver suite via LLVM JIT — no C compiler,
          prebuilt wheels (``pip install tsdynamics[diffsol]``), ~10× faster on
          small chaotic systems, validated against JiTCODE across the whole
          ODE catalogue (see :mod:`tsdynamics.engine.diffsol`).
        - ``"auto"`` — ``"diffsol"`` when it is installed, else ``"jitcode"``.
        - ``"interp"`` / ``"jit"`` — the **Rust sole engine** (the SSA-tape
          interpreter or the Cranelift JIT) via the shared engine seam
          (:func:`tsdynamics.engine.run.integrate`).  Requires the compiled
          extension (:mod:`tsdynamics._rust`); until it is built these raise
          :class:`~tsdynamics.engine.run.EngineNotAvailableError`.
        - ``"reference"`` — the dependency-light pure-Python oracle (the
          lowered tape integrated with SciPy); the engine path's validation
          backend, usable without the compiled wheel.

        ``**integrator_kwargs`` apply to the ``"jitcode"`` path only.
    **integrator_kwargs
        Forwarded to ``jitcode.set_integrator`` (e.g. ``max_step``).

    Returns
    -------
    Trajectory
        Supports tuple-unpacking: ``t, y = sys.integrate(...)``.
    """
    backend = backend if backend is not None else self._default_backend

    if backend in ("interp", "jit", "reference"):
        # The Rust sole engine (or its pure-Python reference) via the one
        # shared engine-dispatch seam — the same path every family routes its
        # engine run through.
        return self._dispatch(
            backend=backend,
            final_time=final_time,
            dt=dt,
            t0=t0,
            ic=ic,
            method=method or self._default_method,
            rtol=rtol,
            atol=atol,
        )

    if backend == "auto":
        # Prefer the zero-compiler Rust path when its optional dependency
        # is installed; otherwise fall back to the always-available
        # JiTCODE path. Lets `tsdynamics[diffsol]` users get the fast
        # backend without naming it, with no surprise for everyone else.
        from tsdynamics.engine import diffsol as _diffsol

        backend = "diffsol" if _diffsol.available() else "jitcode"

    if backend == "diffsol":
        from tsdynamics.engine import diffsol as _diffsol

        ic_arr = self.resolve_ic(ic)
        t_eval, y_out = _diffsol.integrate(
            self, final_time, dt, t0=t0, ic=ic_arr, method=method, rtol=rtol, atol=atol
        )
        return Trajectory(
            t=t_eval,
            y=y_out,
            system=self,
            meta=self._provenance(
                family="ode",
                backend="diffsol",
                method=method or "tsit45",
                dt=dt,
                t0=t0,
                rtol=rtol,
                atol=atol,
                ic=ic_arr.copy(),
            ),
        )
    if backend != "jitcode":
        raise ValueError(
            f"Unknown backend {backend!r}; use 'jitcode', 'diffsol', 'auto', "
            f"'interp', 'jit', or 'reference'."
        )

    method = method or self._default_method
    integ_name = _INTEGRATOR_MAP.get(method, method)
    ic_arr = self.resolve_ic(ic)
    t_eval = make_output_grid(t0, final_time, dt)

    ode = self._ensure_compiled(for_lyap=False)
    ode.set_integrator(integ_name, rtol=rtol, atol=atol, **integrator_kwargs)
    ode.set_parameters(*self._control_params().values())
    ode.set_initial_value(ic_arr, t0)

    y_out = np.empty((t_eval.size, self.dim), dtype=float)
    for k, tk in enumerate(t_eval):
        state = ode.integrate(float(tk))
        if not np.isfinite(state).all():
            raise RuntimeError(
                f"{type(self).__name__}: ODE diverged at t={tk:.6g} — "
                f"state contains non-finite values: {state}"
            )
        y_out[k] = state

    return Trajectory(
        t=t_eval,
        y=y_out,
        system=self,
        meta=self._provenance(
            family="ode",
            method=integ_name,
            dt=dt,
            t0=t0,
            rtol=rtol,
            atol=atol,
            ic=ic_arr.copy(),
        ),
    )

lyapunov_spectrum

lyapunov_spectrum(
    final_time: float = 200.0,
    dt: float = 0.1,
    *,
    ic: Any | None = None,
    n_exp: int | None = None,
    burn_in: float = 50.0,
    method: str | None = None,
    rtol: float = 1e-06,
    atol: float = 1e-09,
    **integrator_kwargs,
) -> ndarray

Estimate the Lyapunov spectrum using :func:jitcode.jitcode_lyap.

Results are stored in self.meta['lyapunov_spectrum'].

PARAMETER DESCRIPTION
final_time

Averaging window length after burn-in. Default 200.0.

TYPE: float DEFAULT: 200.0

dt

Sampling interval for local exponent accumulation. Default 0.1.

TYPE: float DEFAULT: 0.1

ic

Initial state. Falls back to self.ic, then random.

TYPE: array - like DEFAULT: None

n_exp

Number of exponents. Defaults to dim.

TYPE: int DEFAULT: None

burn_in

Discard this much time before averaging. Default 50.0.

TYPE: float DEFAULT: 50.0

method

Integrator (default "RK45").

TYPE: str DEFAULT: None

rtol

Tolerances.

TYPE: float DEFAULT: 1e-06

atol

Tolerances.

TYPE: float DEFAULT: 1e-06

RETURNS DESCRIPTION
(ndarray, shape(n_exp))

Lyapunov exponents ordered from largest to smallest.

Source code in src/tsdynamics/families/continuous.py
def lyapunov_spectrum(
    self,
    final_time: float = 200.0,
    dt: float = 0.1,
    *,
    ic: Any | None = None,
    n_exp: int | None = None,
    burn_in: float = 50.0,
    method: str | None = None,
    rtol: float = 1e-6,
    atol: float = 1e-9,
    **integrator_kwargs,
) -> np.ndarray:
    """
    Estimate the Lyapunov spectrum using :func:`jitcode.jitcode_lyap`.

    Results are stored in ``self.meta['lyapunov_spectrum']``.

    Parameters
    ----------
    final_time : float
        Averaging window length after burn-in. Default 200.0.
    dt : float
        Sampling interval for local exponent accumulation. Default 0.1.
    ic : array-like, optional
        Initial state. Falls back to ``self.ic``, then random.
    n_exp : int, optional
        Number of exponents. Defaults to ``dim``.
    burn_in : float
        Discard this much time before averaging. Default 50.0.
    method : str, optional
        Integrator (default ``"RK45"``).
    rtol, atol : float
        Tolerances.

    Returns
    -------
    ndarray, shape (n_exp,)
        Lyapunov exponents ordered from largest to smallest.
    """
    if n_exp is not None and n_exp <= 0:
        raise ValueError(f"n_exp must be a positive integer, got {n_exp!r}")
    n_exp = n_exp if n_exp is not None else self.dim
    method = method or self._default_method
    integ_name = _INTEGRATOR_MAP.get(method, method)
    ic_arr = self.resolve_ic(ic)

    ode = self._ensure_compiled(for_lyap=True, n_lyap=n_exp)
    ode.set_integrator(integ_name, rtol=rtol, atol=atol, **integrator_kwargs)
    ode.set_parameters(*self._control_params().values())
    ode.set_initial_value(ic_arr, 0.0)

    # Burn-in (discard transient; no exponent accumulation)
    T = 0.0
    while burn_in > T:
        Tn = min(burn_in, T + dt)
        ode.integrate(float(Tn))
        T = Tn

    # Production: time-weighted average of local exponents
    T_end = float(ode.t) + final_time
    weights = []
    ly_steps = []
    T = float(ode.t)

    while T_end > T:
        Tn = min(T_end, T + dt)
        ret = ode.integrate(float(Tn))

        # jitcode_lyap.integrate returns (state, lyapunov_exponents).
        # The fallback "else ret" would silently use the state vector as
        # exponents — always enforce the tuple contract instead.
        if not (isinstance(ret, tuple) and len(ret) >= 2):
            raise RuntimeError(
                f"{type(self).__name__}: jitcode_lyap.integrate returned "
                f"unexpected type {type(ret)!r}; expected (state, lyap_exps) tuple."
            )
        local_lyaps = ret[1]
        v = np.asarray(local_lyaps, float).ravel()
        if v.size != n_exp:
            raise ValueError(f"Expected {n_exp} local LEs, got shape {v.shape}")

        ly_steps.append(v)
        weights.append(Tn - T)
        T = Tn

    W = np.asarray(weights, float)
    L = np.vstack(ly_steps) if ly_steps else np.empty((0, n_exp), float)
    exponents = (W[:, None] * L).sum(axis=0) / W.sum() if L.size else np.zeros(n_exp)

    self.meta.record(
        "lyapunov_spectrum",
        exponents,
        dt=dt,
        final_time=final_time,
        burn_in=burn_in,
        n_exp=n_exp,
        method=integ_name,
    )
    return exponents

DelaySystem

DelaySystem(
    params: dict | None = None,
    ic: Any | None = None,
    dim: int | None = None,
)

Bases: SystemBase, ABC

Base class for delay differential systems (DDEs), compiled via JiTCDDE.

Subclass contract
  1. Declare params = {...} and dim = N.
  2. Implement _equations as a @staticmethod returning a length-dim sequence of JiTCDDE symbolic expressions. Use y(i, t - tau) for delayed state access.
Compilation & caching

DDE systems cache compiled modules per (class, params_hash). Unlike ODEs, delay values directly affect the history-buffer structure and cannot easily be treated as runtime control parameters. The compiled .so is saved to disk so subsequent runs with the same parameters reload instantly.

DDEs typically need looser tolerances than ODEs (start with rtol=atol=1e-3).

History

Pass a history callable h(s) → sequence defining the past for s ≤ 0. If omitted, a constant past equal to ic is used.

.. note:: Provide a non-equilibrium history to avoid trivial Lyapunov exponents. For lyapunov_spectrum, past_from_function is incompatible with jitcdde_lyap; the workaround is to run integrate first with the desired history, then pass the end-state as ic to lyapunov_spectrum which uses constant_past internally.

Examples:

>>> mg = MackeyGlass()
>>> hist = lambda s: [1.0 + 0.1 * np.sin(0.2 * s)]
>>> traj = mg.integrate(final_time=500, history=hist)
>>> exps = mg.lyapunov_spectrum(n_exp=2, ic=traj.y[-1])
Source code in src/tsdynamics/families/base.py
def __init__(
    self,
    params: dict | None = None,
    ic: Any | None = None,
    dim: int | None = None,
) -> None:
    # Build ParamSet from class defaults + constructor overrides
    defaults = dict(type(self).params)
    if params:
        unknown = set(params) - set(defaults)
        if unknown:
            raise ValueError(
                f"{type(self).__name__}: unknown parameter(s) "
                f"{sorted(unknown)}. Declared: {sorted(defaults)}"
            )
        defaults.update(params)
    object.__setattr__(self, "params", ParamSet(defaults))

    # dim: constructor arg > class attribute
    resolved_dim = dim if dim is not None else type(self).dim
    object.__setattr__(self, "dim", resolved_dim)

    # Initial conditions
    ic_arr = np.asarray(ic, dtype=float) if ic is not None else None
    object.__setattr__(self, "ic", ic_arr)

    # Metadata store: computed properties (Lyapunov, etc.) accumulate here
    # with history — repeated runs append instead of overwriting.
    object.__setattr__(self, "meta", MetaStore())

is_discrete property

is_discrete: bool

DDEs are continuous-time systems.

reinit

reinit(
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
    rtol: float | None = None,
    atol: float | None = None,
    **kwargs,
) -> None

(Re)start the incremental stepper from a constant past equal to u.

DDE state is a history function; the protocol restart uses a constant past (the same convention as lyapunov_spectrum). For a custom history, use :meth:integrate with history= and continue from traj.y[-1].

Source code in src/tsdynamics/families/delay.py
def reinit(
    self,
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
    rtol: float | None = None,
    atol: float | None = None,
    **kwargs,
) -> None:
    """
    (Re)start the incremental stepper from a constant past equal to ``u``.

    DDE state is a history *function*; the protocol restart uses a
    constant past (the same convention as ``lyapunov_spectrum``).  For a
    custom history, use :meth:`integrate` with ``history=`` and continue
    from ``traj.y[-1]``.
    """
    from jitcdde import jitcdde as _jitcdde

    if params:
        for k, v in params.items():
            self.params[k] = v
    if t is not None and float(t) != 0.0:
        raise NotImplementedError(
            "DelaySystem.reinit only supports t=0 — JiTCDDE pasts start there."
        )
    ic_arr = self.resolve_ic(u)
    so_path = self._ensure_compiled(for_lyap=False)

    stepper = _jitcdde(
        module_location=str(so_path),
        n=self.dim,
        delays=self._delays(),
        max_delay=self._max_delay(),
        verbose=False,
    )
    stepper.constant_past(ic_arr)
    stepper.set_integration_parameters(
        rtol=rtol if rtol is not None else self._default_rtol,
        atol=atol if atol is not None else self._default_atol,
        **kwargs,
    )
    stepper.initial_discontinuities_handled = True

    self._stepper = stepper
    self._state_now = ic_arr.copy()
    self._t_now = float(stepper.t)

step

step(n_or_dt: float | None = None) -> ndarray

Advance by dt (default 0.1, forward-only) and return the new state.

Source code in src/tsdynamics/families/delay.py
def step(self, n_or_dt: float | None = None) -> np.ndarray:
    """Advance by ``dt`` (default 0.1, forward-only) and return the new state."""
    if self._stepper is None:
        self.reinit()
    dt = float(n_or_dt) if n_or_dt is not None else self._default_step_dt
    self._t_now = self._t_now + dt
    state = np.asarray(self._stepper.integrate(self._t_now), dtype=float)
    if not np.isfinite(state).all():
        raise RuntimeError(
            f"{type(self).__name__}: DDE diverged at t={self._t_now:.6g} during step()."
        )
    self._state_now = state.copy()
    return state

state

state() -> ndarray

Return a copy of the current state (implicit reinit if cold).

Source code in src/tsdynamics/families/delay.py
def state(self) -> np.ndarray:
    """Return a copy of the current state (implicit ``reinit`` if cold)."""
    if self._state_now is None:
        self.reinit()
    return self._state_now.copy()

set_state

set_state(u: Any) -> None

Not available for DDEs — their state is a whole history function.

Source code in src/tsdynamics/families/delay.py
def set_state(self, u: Any) -> None:
    """Not available for DDEs — their state is a whole history function."""
    raise NotImplementedError(
        f"{type(self).__name__}.set_state is impossible for delay systems: the "
        f"instantaneous state is a history function over [t - max_delay, t], not a "
        f"point.  Use reinit(u) to restart from a constant past, or integrate(...) "
        f"with a history callable."
    )

time

time() -> float

Return the current stepper time.

Source code in src/tsdynamics/families/delay.py
def time(self) -> float:
    """Return the current stepper time."""
    return self._t_now

trajectory

trajectory(
    final_time: float = 100.0,
    *,
    dt: float = 0.02,
    transient: float = 0.0,
    **kwargs,
) -> Trajectory

Protocol-uniform trajectory: integrate plus optional transient drop.

Source code in src/tsdynamics/families/delay.py
def trajectory(
    self,
    final_time: float = 100.0,
    *,
    dt: float = 0.02,
    transient: float = 0.0,
    **kwargs,
) -> Trajectory:
    """Protocol-uniform trajectory: ``integrate`` plus optional transient drop."""
    traj = self.integrate(final_time=transient + final_time, dt=dt, **kwargs)
    return traj.after(transient) if transient > 0 else traj

integrate

integrate(
    final_time: float = 100.0,
    dt: float = 0.02,
    *,
    ic: Any | None = None,
    history: History = None,
    rtol: float | None = None,
    atol: float | None = None,
    backend: str | None = None,
    method: str = "rk45",
    **kwargs,
) -> Trajectory

Integrate the DDE and return a :class:~tsdynamics.families.Trajectory.

PARAMETER DESCRIPTION
final_time

Integration end time. Default 100.0.

TYPE: float DEFAULT: 100.0

dt

Output sampling interval.

TYPE: float DEFAULT: 0.02

ic

Used for constant past when history is None. Falls back to self.ic, then random.

TYPE: array - like DEFAULT: None

history

h(s) → sequence of length dim for s ≤ 0. If None, a constant past equal to ic is used.

TYPE: callable DEFAULT: None

rtol

Integration tolerances. DDEs typically need 1e-3; very tight tolerances can stall the solver.

TYPE: float DEFAULT: None

atol

Integration tolerances. DDEs typically need 1e-3; very tight tolerances can stall the solver.

TYPE: float DEFAULT: None

backend

Which engine integrates the DDE. Defaults to _default_backend ("jitcdde", the v2 compiled backend). "interp" / "jit" route — through the shared engine seam (:func:tsdynamics.engine.run.integrate) — to the Rust method-of-steps engine (history ring buffer + cubic-Hermite dense interpolation; stream E-DDE), reusing the explicit solver kernels. The Rust path supports constant delays; a state-dependent delay raises (use "jitcdde" for those until the lowering grows dynamic delay slots). **kwargs are ignored by the engine backends.

TYPE: str DEFAULT: None

method

The explicit kernel for the engine backends ("rk45", "tsit5", "dop853", "rk4"); ignored when backend="jitcdde".

TYPE: str DEFAULT: "rk45"

**kwargs

Forwarded to jitcdde.set_integration_parameters (e.g. max_step, first_step) on the "jitcdde" backend.

DEFAULT: {}

RETURNS DESCRIPTION
Trajectory
Source code in src/tsdynamics/families/delay.py
def integrate(
    self,
    final_time: float = 100.0,
    dt: float = 0.02,
    *,
    ic: Any | None = None,
    history: History = None,
    rtol: float | None = None,
    atol: float | None = None,
    backend: str | None = None,
    method: str = "rk45",
    **kwargs,
) -> Trajectory:
    """
    Integrate the DDE and return a :class:`~tsdynamics.families.Trajectory`.

    Parameters
    ----------
    final_time : float
        Integration end time. Default 100.0.
    dt : float
        Output sampling interval.
    ic : array-like, optional
        Used for constant past when ``history`` is ``None``.
        Falls back to ``self.ic``, then random.
    history : callable, optional
        ``h(s) → sequence`` of length ``dim`` for ``s ≤ 0``.
        If ``None``, a constant past equal to ``ic`` is used.
    rtol, atol : float
        Integration tolerances.  DDEs typically need 1e-3; very tight
        tolerances can stall the solver.
    backend : str, optional
        Which engine integrates the DDE.  Defaults to ``_default_backend``
        (``"jitcdde"``, the v2 compiled backend).  ``"interp"`` / ``"jit"``
        route — through the shared engine seam
        (:func:`tsdynamics.engine.run.integrate`) — to the Rust method-of-steps
        engine (history ring buffer + cubic-Hermite dense interpolation;
        stream E-DDE), reusing the explicit solver kernels.  The Rust path
        supports **constant** delays; a state-dependent delay raises (use
        ``"jitcdde"`` for those until the lowering grows dynamic delay slots).
        ``**kwargs`` are ignored by the engine backends.
    method : str, default "rk45"
        The explicit kernel for the engine backends (``"rk45"``, ``"tsit5"``,
        ``"dop853"``, ``"rk4"``); ignored when ``backend="jitcdde"``.
    **kwargs
        Forwarded to ``jitcdde.set_integration_parameters``
        (e.g. ``max_step``, ``first_step``) on the ``"jitcdde"`` backend.

    Returns
    -------
    Trajectory
    """
    backend = backend if backend is not None else self._default_backend

    if backend not in ("jitcdde", "v2"):
        return self._integrate_engine(
            final_time,
            dt,
            ic=ic,
            history=history,
            rtol=rtol,
            atol=atol,
            backend=backend,
            method=method,
        )

    from jitcdde import jitcdde as _jitcdde

    rtol = rtol if rtol is not None else self._default_rtol
    atol = atol if atol is not None else self._default_atol

    ic_arr = self.resolve_ic(ic)
    t_eval = make_output_grid(0.0, final_time, dt)
    so_path = self._ensure_compiled(for_lyap=False)
    max_delay = self._max_delay()

    # Fresh jitcdde instance each call (DDE objects are stateful —
    # they own a past-buffer that must start clean every integration).
    dde = _jitcdde(
        module_location=str(so_path),
        n=self.dim,
        delays=self._delays(),
        max_delay=max_delay,
        verbose=False,
    )

    # Set past
    if history is None:
        dde.constant_past(ic_arr)
        y0 = ic_arr.copy()
    else:

        def hist_fn(s):
            return np.asarray(history(s), dtype=float).reshape(self.dim)

        dde.past_from_function(hist_fn)
        y0 = hist_fn(0.0)

    dde.set_integration_parameters(rtol=rtol, atol=atol, **kwargs)
    dde.initial_discontinuities_handled = True

    y_out = np.empty((t_eval.size, self.dim), dtype=float)
    y_out[0] = y0
    for k in range(1, t_eval.size):
        y_out[k] = dde.integrate(float(t_eval[k]))

    return Trajectory(
        t=t_eval,
        y=y_out,
        system=self,
        meta=self._provenance(
            family="dde",
            dt=dt,
            rtol=rtol,
            atol=atol,
            ic=ic_arr.copy(),
            history="callable" if history is not None else "constant",
        ),
    )

lyapunov_spectrum

lyapunov_spectrum(
    final_time: float = 200.0,
    dt: float = 0.1,
    *,
    ic: Any | None = None,
    n_exp: int = 1,
    burn_in: float = 50.0,
    rtol: float | None = None,
    atol: float | None = None,
    **kwargs,
) -> ndarray

Estimate the n_exp leading Lyapunov exponents via :class:jitcdde.jitcdde_lyap.

Results are stored in self.meta['lyapunov_spectrum'].

PARAMETER DESCRIPTION
final_time

Averaging window after burn-in. Default 200.0.

TYPE: float DEFAULT: 200.0

dt

Sampling interval. Default 0.1.

TYPE: float DEFAULT: 0.1

ic

Initial state. Provide the end-state of a prior integrate call so the trajectory starts on the attractor (recommended).

TYPE: array - like DEFAULT: None

n_exp

Number of leading exponents to estimate. DDEs have infinitely many; choose consciously. Default 1.

TYPE: int DEFAULT: 1

burn_in

Discard interval. Default 50.0.

TYPE: float DEFAULT: 50.0

rtol

Integration tolerances. Defaults to _default_rtol / _default_atol (both 1e-3). Do not use ODE-style tight values (e.g. 1e-6/1e-9) — DDE solvers are stiff and tight tolerances cause the variational equations to accumulate floating-point garbage before the first Lyapunov renormalisation, producing Inf / NaN exponents.

TYPE: float DEFAULT: None

atol

Integration tolerances. Defaults to _default_rtol / _default_atol (both 1e-3). Do not use ODE-style tight values (e.g. 1e-6/1e-9) — DDE solvers are stiff and tight tolerances cause the variational equations to accumulate floating-point garbage before the first Lyapunov renormalisation, producing Inf / NaN exponents.

TYPE: float DEFAULT: None

Notes

past_from_function is NOT used here because it is incompatible with jitcdde_lyap internally. A constant past from ic is used instead. For best results, pass ic=traj.y[-1] from a prior integrate run — this places the trajectory on the attractor and avoids trivial exponents from equilibrium pasts.

RETURNS DESCRIPTION
(ndarray, shape(n_exp))
Source code in src/tsdynamics/families/delay.py
def lyapunov_spectrum(
    self,
    final_time: float = 200.0,
    dt: float = 0.1,
    *,
    ic: Any | None = None,
    n_exp: int = 1,
    burn_in: float = 50.0,
    rtol: float | None = None,
    atol: float | None = None,
    **kwargs,
) -> np.ndarray:
    """
    Estimate the ``n_exp`` leading Lyapunov exponents via :class:`jitcdde.jitcdde_lyap`.

    Results are stored in ``self.meta['lyapunov_spectrum']``.

    Parameters
    ----------
    final_time : float
        Averaging window after burn-in. Default 200.0.
    dt : float
        Sampling interval. Default 0.1.
    ic : array-like, optional
        Initial state. Provide the end-state of a prior ``integrate``
        call so the trajectory starts on the attractor (recommended).
    n_exp : int
        Number of leading exponents to estimate. DDEs have infinitely
        many; choose consciously. Default 1.
    burn_in : float
        Discard interval. Default 50.0.
    rtol, atol : float, optional
        Integration tolerances.  Defaults to ``_default_rtol`` /
        ``_default_atol`` (both ``1e-3``).  Do **not** use ODE-style
        tight values (e.g. ``1e-6``/``1e-9``) — DDE solvers are stiff and
        tight tolerances cause the variational equations to accumulate
        floating-point garbage before the first Lyapunov renormalisation,
        producing ``Inf`` / ``NaN`` exponents.

    Notes
    -----
    ``past_from_function`` is NOT used here because it is incompatible
    with ``jitcdde_lyap`` internally.  A constant past from ``ic`` is
    used instead.  For best results, pass ``ic=traj.y[-1]`` from a
    prior ``integrate`` run — this places the trajectory on the
    attractor and avoids trivial exponents from equilibrium pasts.

    Returns
    -------
    ndarray, shape (n_exp,)
    """
    from jitcdde import jitcdde_lyap as _jitcdde_lyap

    rtol = rtol if rtol is not None else self._default_rtol
    atol = atol if atol is not None else self._default_atol

    ic_arr = self.resolve_ic(ic)
    so_path = self._ensure_compiled(for_lyap=True, n_lyap=n_exp)
    max_delay = self._max_delay()

    dde = _jitcdde_lyap(
        module_location=str(so_path),
        n=self.dim,
        delays=self._delays(),
        max_delay=max_delay,
        n_lyap=n_exp,
        verbose=False,
    )
    dde.set_integration_parameters(rtol=rtol, atol=atol, **kwargs)
    dde.constant_past(ic_arr)
    dde.step_on_discontinuities()

    # Burn-in
    T_burn = float(dde.t) + max(0.0, burn_in)
    while dde.t < T_burn:
        dde.integrate(min(T_burn, dde.t + dt))

    # Production: weight-averaged local LEs
    T_end = float(dde.t) + final_time
    weights = []
    ly_steps = []

    while dde.t < T_end:
        ret = dde.integrate(min(T_end, dde.t + dt))
        if not isinstance(ret, tuple):
            raise RuntimeError("jitcdde_lyap.integrate did not return a tuple")
        _, local_lyaps, w = ret
        v = np.asarray(local_lyaps, float).ravel()
        if v.size != n_exp:
            raise ValueError(f"Expected {n_exp} local LEs, got {v.shape}")
        weights.append(float(w))
        ly_steps.append(v)

    W = np.asarray(weights, float)
    mask = W > 0.0
    if not mask.any():
        warnings.warn(
            f"{type(self).__name__}.lyapunov_spectrum: every integration "
            f"step returned zero weight from jitcdde_lyap. This means the "
            f"sampling step `dt={dt}` is smaller than the internal step "
            f"size, so no Lyapunov estimates were accumulated. Increase "
            f"`dt` (typical: 0.1–1.0 for DDEs) and rerun.",
            RuntimeWarning,
            stacklevel=2,
        )
        return np.zeros(n_exp)

    L = np.vstack([ly_steps[i] for i, m in enumerate(mask) if m])
    exponents = (W[mask, None] * L).sum(0) / W[mask].sum()

    self.meta.record(
        "lyapunov_spectrum",
        exponents,
        dt=dt,
        final_time=final_time,
        burn_in=burn_in,
        n_exp=n_exp,
    )
    return exponents

DiscreteMap

DiscreteMap(
    params: dict | None = None,
    ic: Any | None = None,
    dim: int | None = None,
)

Bases: SystemBase

Base class for discrete maps with Numba-accelerated iteration.

Subclass contract
  1. Declare params = {...} and dim = N.
  2. Implement _step and _jacobian as @staticjit static methods. Parameters arrive as positional arguments in the order they appear in the class-level params dict.
Compilation

On the first call to iterate, a Numba-compiled loop is built that inlines _step with the current parameter values baked in. This eliminates per-step Python overhead. The compiled loop is cached in _iter_cache per (class, params_hash); changing a parameter triggers a fresh compile on the next call.

Lyapunov spectrum

Computed in a single forward pass via QR decomposition of the Jacobian product — no redundant second iteration over the trajectory.

Examples:

>>> h = Henon()
>>> traj = h.iterate(steps=10_000)
>>> t_idx, X = traj          # tuple-unpack
>>> exps = h.lyapunov_spectrum(steps=5_000)
>>> h_variant = h.with_params(a=1.2)
>>> traj2 = h_variant.iterate(steps=10_000)
Source code in src/tsdynamics/families/base.py
def __init__(
    self,
    params: dict | None = None,
    ic: Any | None = None,
    dim: int | None = None,
) -> None:
    # Build ParamSet from class defaults + constructor overrides
    defaults = dict(type(self).params)
    if params:
        unknown = set(params) - set(defaults)
        if unknown:
            raise ValueError(
                f"{type(self).__name__}: unknown parameter(s) "
                f"{sorted(unknown)}. Declared: {sorted(defaults)}"
            )
        defaults.update(params)
    object.__setattr__(self, "params", ParamSet(defaults))

    # dim: constructor arg > class attribute
    resolved_dim = dim if dim is not None else type(self).dim
    object.__setattr__(self, "dim", resolved_dim)

    # Initial conditions
    ic_arr = np.asarray(ic, dtype=float) if ic is not None else None
    object.__setattr__(self, "ic", ic_arr)

    # Metadata store: computed properties (Lyapunov, etc.) accumulate here
    # with history — repeated runs append instead of overwriting.
    object.__setattr__(self, "meta", MetaStore())

is_discrete property

is_discrete: bool

Maps are discrete-time systems.

reinit

reinit(
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
) -> None

(Re)start stepping from state u at iteration count t.

Source code in src/tsdynamics/families/discrete.py
def reinit(
    self,
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
) -> None:
    """(Re)start stepping from state ``u`` at iteration count ``t``."""
    if params:
        for k, v in params.items():
            self.params[k] = v
    self._state_now = self.resolve_ic(u)
    self._n_now = int(t) if t is not None else 0

step

step(n_or_dt: int | None = None) -> ndarray

Advance n iterations (default 1) and return the new state.

Source code in src/tsdynamics/families/discrete.py
def step(self, n_or_dt: int | None = None) -> np.ndarray:
    """Advance ``n`` iterations (default 1) and return the new state."""
    if self._state_now is None:
        self.reinit()
    if n_or_dt is None:
        n = 1
    else:
        nf = float(n_or_dt)
        if not nf.is_integer() or nf < 1:
            raise ValueError(
                f"{type(self).__name__}.step takes a positive whole number of "
                f"iterations, got {n_or_dt!r} (fractional time steps have no "
                f"meaning for discrete maps)."
            )
        n = int(nf)
    x = self._state_now
    params = self.params.as_tuple()

    iterate_fn = self._get_iterate_fn() if n > 16 else None
    if iterate_fn is not None:
        x = iterate_fn(x.copy(), n)[-1].copy()
    else:
        step_fn = type(self)._step
        for _ in range(n):
            x = np.asarray(step_fn(x, *params), dtype=float).ravel()
    if not np.isfinite(x).all():
        raise RuntimeError(
            f"{type(self).__name__}: map diverged at iteration {self._n_now + n}."
        )
    self._state_now = np.asarray(x, dtype=float).reshape(self.dim)
    self._n_now += n
    return self._state_now.copy()

state

state() -> ndarray

Return a copy of the current state (implicit reinit if cold).

Source code in src/tsdynamics/families/discrete.py
def state(self) -> np.ndarray:
    """Return a copy of the current state (implicit ``reinit`` if cold)."""
    if self._state_now is None:
        self.reinit()
    return self._state_now.copy()

set_state

set_state(u: Any) -> None

Overwrite the current state.

Source code in src/tsdynamics/families/discrete.py
def set_state(self, u: Any) -> None:
    """Overwrite the current state."""
    self._state_now = np.asarray(u, dtype=float).reshape(self.dim)

time

time() -> float

Return the current iteration count.

Source code in src/tsdynamics/families/discrete.py
def time(self) -> float:
    """Return the current iteration count."""
    return float(self._n_now)

trajectory

trajectory(
    steps: int = 1000, *, transient: int = 0, **kwargs
) -> Trajectory

Protocol-uniform trajectory: iterate plus optional transient drop.

Source code in src/tsdynamics/families/discrete.py
def trajectory(
    self,
    steps: int = 1000,
    *,
    transient: int = 0,
    **kwargs,
) -> Trajectory:
    """Protocol-uniform trajectory: ``iterate`` plus optional transient drop."""
    traj = self.iterate(steps=transient + steps, **kwargs)
    return traj[transient:] if transient > 0 else traj

iterate

iterate(
    steps: int = 1000,
    ic: Any | None = None,
    max_retries: int = 10,
    *,
    backend: str | None = None,
) -> Trajectory

Iterate the map for steps steps.

Uses a Numba-compiled loop when available (significant speedup for large steps or high-dim maps). Falls back to a Python loop when Numba is not installed.

PARAMETER DESCRIPTION
steps

Number of iterations. Default 1000.

TYPE: int DEFAULT: 1000

ic

Initial state. Falls back to self.ic, then random.

TYPE: array - like DEFAULT: None

max_retries

Retry with a new random IC if divergence is detected (Numba path only — the engine path raises on divergence instead of retrying).

TYPE: int DEFAULT: 10

backend

Where the iteration runs. Defaults to _default_backend ("numba").

  • "numba" (default) — the in-process Numba/Python loop described above.
  • "reference" — the lowered next-state tape, iterated in pure Python (the dependency-light oracle the engine is validated against).
  • "interp" / "jit" / "auto" — the Rust engine's native map loop (interpreter or Cranelift JIT). Requires the compiled extension (:mod:tsdynamics._rust, stream E7); until it is built these raise :class:~tsdynamics.engine.run.EngineNotAvailableError.

Any non-"numba" backend lowers _step to the engine IR, so it requires a map whose step traces symbolically (see :func:tsdynamics.engine.compile.lower_map); piecewise or numpy-ufunc steps raise :class:~tsdynamics.engine.compile.TapeCompileError.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
Trajectory

t is arange(steps) (integer step indices, not float times).

Source code in src/tsdynamics/families/discrete.py
def iterate(
    self,
    steps: int = 1000,
    ic: Any | None = None,
    max_retries: int = 10,
    *,
    backend: str | None = None,
) -> Trajectory:
    """
    Iterate the map for ``steps`` steps.

    Uses a Numba-compiled loop when available (significant speedup for
    large ``steps`` or high-dim maps).  Falls back to a Python loop
    when Numba is not installed.

    Parameters
    ----------
    steps : int
        Number of iterations. Default 1000.
    ic : array-like, optional
        Initial state. Falls back to ``self.ic``, then random.
    max_retries : int
        Retry with a new random IC if divergence is detected (Numba path
        only — the engine path raises on divergence instead of retrying).
    backend : str, optional
        Where the iteration runs.  Defaults to ``_default_backend``
        (``"numba"``).

        - ``"numba"`` (default) — the in-process Numba/Python loop described
          above.
        - ``"reference"`` — the lowered next-state tape, iterated in pure
          Python (the dependency-light oracle the engine is validated
          against).
        - ``"interp"`` / ``"jit"`` / ``"auto"`` — the Rust engine's native
          map loop (interpreter or Cranelift JIT).  Requires the compiled
          extension (:mod:`tsdynamics._rust`, stream E7); until it is built
          these raise :class:`~tsdynamics.engine.run.EngineNotAvailableError`.

        Any non-``"numba"`` backend lowers ``_step`` to the engine IR, so it
        requires a map whose step traces symbolically (see
        :func:`tsdynamics.engine.compile.lower_map`); piecewise or
        ``numpy``-ufunc steps raise
        :class:`~tsdynamics.engine.compile.TapeCompileError`.

    Returns
    -------
    Trajectory
        ``t`` is ``arange(steps)`` (integer step indices, not float times).
    """
    backend = backend if backend is not None else self._default_backend

    if backend != "numba":
        return self._iterate_engine(steps=steps, ic=ic, backend=backend)

    ic_arr = self.resolve_ic(ic)
    iterate_fn = self._get_iterate_fn()

    for attempt in range(max_retries):
        try:
            if iterate_fn is not None:
                out = iterate_fn(ic_arr.copy(), steps)
            else:
                out = self._iterate_python(ic_arr.copy(), steps)

            if not np.all(np.isfinite(out)):
                bad = np.argmax(~np.isfinite(out).all(axis=1))
                raise ValueError(f"Divergence detected at step {bad}")

            return Trajectory(
                t=np.arange(steps),
                y=out,
                system=self,
                meta=self._provenance(family="map", steps=steps, ic=ic_arr.copy()),
            )

        except ValueError as exc:
            if attempt == max_retries - 1:
                raise
            print(f"Warning: {exc}. Retrying with a new random IC.")
            ic_arr = np.random.rand(self.dim)
            object.__setattr__(self, "ic", ic_arr.copy())

lyapunov_spectrum

lyapunov_spectrum(
    steps: int = 5000,
    ic: Any | None = None,
    n_exp: int | None = None,
    reortho_interval: int = 1,
) -> ndarray

QR-based Lyapunov spectrum.

Computes the spectrum in a single forward pass — the Jacobian is evaluated at each step alongside the trajectory, avoiding the redundant double iteration of the previous implementation.

Results are stored in self.meta['lyapunov_spectrum'].

PARAMETER DESCRIPTION
steps

Number of iterations. Default 5000.

TYPE: int DEFAULT: 5000

ic

Initial state. Falls back to self.ic, then random.

TYPE: array - like DEFAULT: None

n_exp

Number of exponents. Defaults to dim.

TYPE: int DEFAULT: None

reortho_interval

Reorthonormalise every this many steps. Default 1.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
(ndarray, shape(n_exp))
Source code in src/tsdynamics/families/discrete.py
def lyapunov_spectrum(
    self,
    steps: int = 5000,
    ic: Any | None = None,
    n_exp: int | None = None,
    reortho_interval: int = 1,
) -> np.ndarray:
    """
    QR-based Lyapunov spectrum.

    Computes the spectrum in a **single forward pass** — the Jacobian is
    evaluated at each step alongside the trajectory, avoiding the
    redundant double iteration of the previous implementation.

    Results are stored in ``self.meta['lyapunov_spectrum']``.

    Parameters
    ----------
    steps : int
        Number of iterations. Default 5000.
    ic : array-like, optional
        Initial state. Falls back to ``self.ic``, then random.
    n_exp : int, optional
        Number of exponents. Defaults to ``dim``.
    reortho_interval : int
        Reorthonormalise every this many steps. Default 1.

    Returns
    -------
    ndarray, shape (n_exp,)
    """
    n_exp = n_exp or self.dim
    params = self.params.as_tuple()
    step = type(self)._step
    jac = type(self)._jacobian
    max_retries = 10

    # The QR loop can encounter overflow on transient huge iterates before
    # the divergence check triggers; silence the float warnings locally so
    # they don't elevate to errors under strict filterwarnings policies.
    # We track non-finiteness explicitly and treat it as a soft failure.
    for attempt in range(max_retries):
        x = self.resolve_ic(ic if attempt == 0 else None)
        Q = np.eye(self.dim)[:n_exp]
        lyap_sums = np.zeros(n_exp)
        intervals = 0
        failed = False

        with np.errstate(over="ignore", invalid="ignore", divide="ignore"):
            for i in range(steps):
                x = np.asarray(step(x, *params), dtype=float)
                if not np.all(np.isfinite(x)):
                    failed = True
                    break

                J = np.atleast_2d(np.asarray(jac(x, *params), dtype=float))
                if not np.all(np.isfinite(J)):
                    failed = True
                    break

                Q = (J @ Q.T).T
                if not np.all(np.isfinite(Q)):
                    failed = True
                    break

                if (i + 1) % reortho_interval == 0:
                    Q_new, R = np.linalg.qr(Q.T)
                    if not np.all(np.isfinite(Q_new)):
                        failed = True
                        break
                    Q = Q_new.T
                    diag = np.abs(np.diag(R))
                    diag = np.where(
                        diag < np.finfo(float).tiny,
                        np.finfo(float).tiny,
                        diag,
                    )
                    lyap_sums += np.log(diag)
                    intervals += 1

        if not failed and intervals > 0:
            exponents = lyap_sums / (intervals * reortho_interval)
            self.meta.record(
                "lyapunov_spectrum",
                exponents,
                steps=steps,
                n_exp=n_exp,
                reortho_interval=reortho_interval,
            )
            return exponents

        if attempt < max_retries - 1:
            # Clear stored IC so resolve_ic generates a fresh random one.
            object.__setattr__(self, "ic", None)

    raise ValueError(
        f"{type(self).__name__}.lyapunov_spectrum: failed after "
        f"{max_retries} retries — iterates diverge from every tried IC. "
        f"Try a larger `steps` budget or pass an `ic` from a known basin point."
    )

System

Bases: Protocol

Structural type for steppable dynamical systems.

is_discrete property

is_discrete: bool

True for iterated maps (and map-like wrappers such as Poincaré maps).

step

step(n_or_dt: float | int | None = None) -> ndarray

Advance by n iterations (discrete) or dt time (continuous).

Source code in src/tsdynamics/families/protocol.py
def step(self, n_or_dt: float | int | None = None) -> np.ndarray:
    """Advance by ``n`` iterations (discrete) or ``dt`` time (continuous)."""
    ...

state

state() -> ndarray

Return a copy of the current state vector.

Source code in src/tsdynamics/families/protocol.py
def state(self) -> np.ndarray:
    """Return a copy of the current state vector."""
    ...

set_state

set_state(u: Any) -> None

Overwrite the current state (not available for DDEs).

Source code in src/tsdynamics/families/protocol.py
def set_state(self, u: Any) -> None:
    """Overwrite the current state (not available for DDEs)."""
    ...

time

time() -> float

Return the current time (continuous) or iteration count (discrete).

Source code in src/tsdynamics/families/protocol.py
def time(self) -> float:
    """Return the current time (continuous) or iteration count (discrete)."""
    ...

reinit

reinit(
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
) -> None

Restart the stepper from state u at time t.

Source code in src/tsdynamics/families/protocol.py
def reinit(
    self,
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
) -> None:
    """Restart the stepper from state ``u`` at time ``t``."""
    ...

trajectory

trajectory(*args: Any, **kwargs: Any) -> Trajectory

Produce a trajectory on a uniform output grid.

Source code in src/tsdynamics/families/protocol.py
def trajectory(self, *args: Any, **kwargs: Any) -> Trajectory:
    """Produce a trajectory on a uniform output grid."""
    ...