Derived systems

Wrappers that re-present an existing system through a new lens while keeping the System protocol intact, so analysis functions compose with them transparently. The prose introduction covers the mental model.

PoincareMap

PoincareMap(
    system: Any,
    plane: tuple,
    *,
    direction: int = +1,
    dt: float = 0.01,
    max_time: float = 10000.0,
)

Bases: DerivedSystem

Present a flow as the discrete map of its crossings through a hyperplane.

One step() advances the underlying system until the trajectory crosses the section plane in the chosen direction, refines the crossing point by cubic Hermite interpolation of the bracketing samples, and returns the full-dimensional crossing state. The interpolation takes its endpoint derivatives from the system's numeric RHS, which gives O(dt⁴) accuracy; systems without a numeric RHS (e.g. DDEs) fall back to linear interpolation, which is O(dt²).

Because a PoincareMap is a discrete system, everything written for maps applies to flows through it — e.g. an orbit diagram over a PoincareMap is a bifurcation diagram of the flow.
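The crossing refinement can be illustrated with a small NumPy sketch. This is not the library's code: the helper names `hermite` and `refine_crossing`, and the use of bisection to find the root of the interpolant, are assumptions made for the example. It takes two samples that bracket the section, builds the cubic Hermite interpolant from the states and their RHS values, and solves normal · y = offset on that interpolant.

```python
import numpy as np


def hermite(s, u0, u1, f0, f1, h):
    """Cubic Hermite interpolant on s in [0, 1]; f0 and f1 are du/dt at the ends."""
    h00 = 2 * s**3 - 3 * s**2 + 1
    h10 = s**3 - 2 * s**2 + s
    h01 = -2 * s**3 + 3 * s**2
    h11 = s**3 - s**2
    return h00 * u0 + h10 * h * f0 + h01 * u1 + h11 * h * f1


def refine_crossing(rhs, t0, u0, t1, u1, normal, offset, iters=60):
    """Find where normal . u = offset between two bracketing samples."""
    h = t1 - t0
    f0, f1 = rhs(t0, u0), rhs(t1, u1)

    def g(s):
        # Signed distance of the interpolated state from the section plane.
        return normal @ hermite(s, u0, u1, f0, f1, h) - offset

    lo, hi = 0.0, 1.0
    g_lo = g(lo)
    for _ in range(iters):  # plain bisection (an assumption, chosen for clarity)
        mid = 0.5 * (lo + hi)
        g_mid = g(mid)
        if g_lo * g_mid <= 0:
            hi = mid
        else:
            lo, g_lo = mid, g_mid
    s = 0.5 * (lo + hi)
    return t0 + s * h, hermite(s, u0, u1, f0, f1, h)


def oscillator(t, u):
    """Harmonic oscillator x' = v, v' = -x, with exact solution (sin t, cos t)."""
    return np.array([u[1], -u[0]])


# Two exact samples 0.01 apart that bracket the upward crossing of x = 0 at t = 2*pi.
t0, t1 = 2 * np.pi - 0.004, 2 * np.pi + 0.006
u0 = np.array([np.sin(t0), np.cos(t0)])
u1 = np.array([np.sin(t1), np.cos(t1)])
t_c, u_c = refine_crossing(oscillator, t0, u0, t1, u1, np.array([1.0, 0.0]), 0.0)
```

With a march step of 0.01, the refined crossing time in this toy case agrees with 2π far more closely than the step size, which is why dt only has to be small enough not to skip crossings.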

PARAMETER DESCRIPTION
system

A continuous-time system (ODE or DDE).

TYPE: System

plane

Either (i, c) — the section y_i = c — or (normal, offset) with an arbitrary normal vector, for the section normal · y = offset.

TYPE: tuple

direction

Count only crossings with d(normal·y)/dt > 0 (+1, default), < 0 (-1), or both (0).

TYPE: (+1, -1, 0) DEFAULT: +1

dt

March step used for crossing detection. The refinement makes the crossing itself far more accurate than dt; this only needs to be small enough not to skip crossings.

TYPE: float DEFAULT: 0.01

max_time

Raise if no crossing is found within this much time (e.g. the plane misses the attractor).

TYPE: float DEFAULT: 10000.0

Examples:

>>> pmap = PoincareMap(Rossler(), plane=(0, 0.0), direction=+1)
>>> section = pmap.trajectory(500)         # 500 crossings
>>> section.y.shape
(500, 3)
Source code in src/tsdynamics/derived/poincare.py
def __init__(
    self,
    system: Any,
    plane: tuple,
    *,
    direction: int = +1,
    dt: float = 0.01,
    max_time: float = 1e4,
) -> None:
    super().__init__(system)
    normal, offset = self._parse_plane(system.dim, plane)
    self.plane = plane
    self._normal = normal
    self._offset = offset
    self.direction = int(np.sign(direction))
    self.dt = float(dt)
    self.max_time = float(max_time)

    # Numeric RHS for Hermite endpoint derivatives; falls back to linear
    # interpolation (O(dt²)) for systems without one (e.g. DDEs).
    self._rhs = system._rhs_numeric() if hasattr(system, "_rhs_numeric") else None

    self._u_cross: np.ndarray | None = None
    self._t_cross: float | None = None
    self._n_cross = 0

is_discrete property

is_discrete: bool

A Poincaré map is a discrete view of the flow.

crossing_count property

crossing_count: int

Return the number of crossings recorded so far.

step

step(n_or_dt: int | None = None) -> ndarray

Advance to the n-th next crossing and return it (full-dim coords).

Source code in src/tsdynamics/derived/poincare.py
def step(self, n_or_dt: int | None = None) -> np.ndarray:
    """Advance to the ``n``-th next crossing and return it (full-dim coords)."""
    n = int(n_or_dt) if n_or_dt is not None else 1
    for _ in range(n):
        self._advance_to_crossing()
    return self._u_cross.copy()

state

state() -> ndarray

Return the last crossing point (or the inner state before any crossing).

Source code in src/tsdynamics/derived/poincare.py
def state(self) -> np.ndarray:
    """Return the last crossing point (or the inner state before any crossing)."""
    if self._u_cross is not None:
        return self._u_cross.copy()
    return self.system.state()

set_state

set_state(u: Any) -> None

Overwrite the inner flow state and reset crossing bookkeeping.

Source code in src/tsdynamics/derived/poincare.py
def set_state(self, u: Any) -> None:
    """Overwrite the inner flow state and reset crossing bookkeeping."""
    self.system.set_state(u)
    self._u_cross = None
    self._t_cross = None

time

time() -> float

Return the continuous time of the last crossing (inner time before any).

Source code in src/tsdynamics/derived/poincare.py
def time(self) -> float:
    """Return the continuous time of the last crossing (inner time before any)."""
    return self._t_cross if self._t_cross is not None else self.system.time()

reinit

reinit(u: Any | None = None, **kwargs: Any) -> None

Restart the inner flow and clear crossing bookkeeping.

Source code in src/tsdynamics/derived/poincare.py
def reinit(self, u: Any | None = None, **kwargs: Any) -> None:
    """Restart the inner flow and clear crossing bookkeeping."""
    self.system.reinit(u, **kwargs)
    # Parameter values are baked into the numeric RHS — rebuild it so the
    # Hermite refinement matches the (possibly re-parametrized) dynamics.
    if hasattr(self.system, "_rhs_numeric"):
        self._rhs = self.system._rhs_numeric()
    self._u_cross = None
    self._t_cross = None
    self._n_cross = 0

trajectory

trajectory(
    steps: int = 100, *, transient: int = 0, **kwargs: Any
) -> Trajectory

Collect crossings as a trajectory.

t holds the continuous crossing times and y the full-dimensional crossing states. The first transient crossings are discarded before collection starts.

Source code in src/tsdynamics/derived/poincare.py
def trajectory(self, steps: int = 100, *, transient: int = 0, **kwargs: Any) -> Trajectory:
    """
    Collect crossings as a trajectory.

    ``t`` holds the continuous crossing times; ``y`` the full-dimensional
    crossing states.  ``transient`` crossings are discarded first.
    """
    if kwargs:
        self.reinit(kwargs.pop("ic", None), **kwargs)
    for _ in range(transient):
        self._advance_to_crossing()
    times = np.empty(steps)
    points = np.empty((steps, self.system.dim))
    for k in range(steps):
        self._advance_to_crossing()
        times[k] = self._t_cross
        points[k] = self._u_cross
    meta = {
        "derived": "PoincareMap",
        "plane": self.plane,
        "direction": self.direction,
        "dt": self.dt,
        "system": type(self.system).__name__,
        "params": self.params.as_dict(),
    }
    return Trajectory(t=times, y=points, system=self.system, meta=meta)

StroboscopicMap

StroboscopicMap(system: Any, period: float)

Bases: DerivedSystem

Present a forced flow as the discrete map of once-per-period samples.

One step() advances the underlying continuous system by exactly one forcing period and returns the new state. Orbit diagrams over a StroboscopicMap are the standard way to study forced oscillators (Duffing, forced van der Pol, ...).
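The idea of once-per-period sampling can be sketched without the library. Everything below is an assumption for illustration: a fixed-step RK4 integrator, a damped linear oscillator with cosine forcing, and the helper names `rk4_step` and `strobe`. The sub-step size divides the period exactly, so every sample lands on a whole number of forcing periods.

```python
import numpy as np


def rk4_step(f, t, u, h):
    """One classical Runge-Kutta step of size h."""
    k1 = f(t, u)
    k2 = f(t + 0.5 * h, u + 0.5 * h * k1)
    k3 = f(t + 0.5 * h, u + 0.5 * h * k2)
    k4 = f(t + h, u + h * k3)
    return u + (h / 6.0) * (k1 + 2 * k2 + 2 * k3 + k4)


def strobe(f, u0, period, steps, transient=0, substeps=200):
    """Sample the flow once per period, after discarding `transient` periods."""
    h = period / substeps
    u = np.asarray(u0, dtype=float)
    k = 0  # global sub-step counter, so t = k * h carries no accumulated drift
    times = np.empty(steps)
    samples = np.empty((steps, u.size))
    for n in range(transient + steps):
        for _ in range(substeps):
            u = rk4_step(f, k * h, u, h)
            k += 1
        if n >= transient:
            times[n - transient] = k * h
            samples[n - transient] = u
    return times, samples


omega = 1.2  # forcing frequency of the toy oscillator


def forced(t, u):
    """Damped linear oscillator x'' + 0.5 x' + x = cos(omega t)."""
    x, v = u
    return np.array([v, -0.5 * v - x + np.cos(omega * t)])


times, samples = strobe(forced, [0.0, 0.0], 2 * np.pi / omega, steps=5, transient=40)
```

For this linear toy system the response locks to the forcing, so after the transient every stroboscopic sample is the same point: a period-1 orbit of the flow appears as a fixed point of the map.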

PARAMETER DESCRIPTION
system

A continuous-time system.

TYPE: System

period

Sampling period (the forcing period).

TYPE: float

Examples:

>>> smap = StroboscopicMap(ForcedVanDerPol(), period=2 * np.pi / 0.63)
>>> samples = smap.trajectory(300, transient=100)
Source code in src/tsdynamics/derived/stroboscopic.py
def __init__(self, system: Any, period: float) -> None:
    super().__init__(system)
    if period <= 0:
        raise ValueError(f"period must be positive, got {period}")
    self.period = float(period)

is_discrete property

is_discrete: bool

A stroboscopic map is a discrete view of the flow.

step

step(n_or_dt: int | None = None) -> ndarray

Advance n periods (default 1) and return the new state.

Source code in src/tsdynamics/derived/stroboscopic.py
def step(self, n_or_dt: int | None = None) -> np.ndarray:
    """Advance ``n`` periods (default 1) and return the new state."""
    n = int(n_or_dt) if n_or_dt is not None else 1
    return self.system.step(n * self.period)

time

time() -> float

Return the inner flow time.

Source code in src/tsdynamics/derived/stroboscopic.py
def time(self) -> float:
    """Return the inner flow time."""
    return self.system.time()

trajectory

trajectory(
    steps: int = 100, *, transient: int = 0, **kwargs: Any
) -> Trajectory

Collect steps once-per-period samples (after transient periods).

Source code in src/tsdynamics/derived/stroboscopic.py
def trajectory(self, steps: int = 100, *, transient: int = 0, **kwargs: Any) -> Trajectory:
    """Collect ``steps`` once-per-period samples (after ``transient`` periods)."""
    if kwargs:
        self.reinit(kwargs.pop("ic", None), **kwargs)
    if transient:
        self.system.step(transient * self.period)
    times = np.empty(steps)
    points = np.empty((steps, self.system.dim))
    for k in range(steps):
        points[k] = self.step()
        times[k] = self.system.time()
    meta = {
        "derived": "StroboscopicMap",
        "period": self.period,
        "system": type(self.system).__name__,
        "params": self.params.as_dict(),
    }
    return Trajectory(t=times, y=points, system=self.system, meta=meta)

TangentSystem

TangentSystem(system: Any, k: int | None = None)

Bases: DerivedSystem

Evolve a system together with k deviation (tangent) vectors.

Each step() advances the state and the deviation vectors, then QR-reorthonormalises; growths() exposes the per-step logarithmic stretch factors log |diag R| and exponents() their running time-average, which is the Lyapunov spectrum estimate.

Implementation per family
  • Maps: pure NumPy — W ← J(x)·W with the hand-written/compiled _jacobian, then QR.
  • ODEs: a thin wrapper over the compiled variational equations (jitcode_lyap), which JiTCODE builds by symbolic differentiation — no Python-side Jacobian evaluation in the hot loop.
  • DDEs: not supported — tangent dynamics of a DDE lives in an infinite-dimensional history space; use DelaySystem.lyapunov_spectrum which wraps jitcdde_lyap.
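The map branch described above (W ← J(x)·W followed by QR) can be written out in plain NumPy. The sketch below is an illustration under stated assumptions, not the library's implementation: the Hénon map in the form x' = 1 - a x² + y, y' = b x with a = 1.4, b = 0.3, and the helper name `lyapunov_qr`, are chosen for the example.

```python
import numpy as np

a, b = 1.4, 0.3  # classical Henon parameters (assumed for this sketch)


def henon(u):
    x, y = u
    return np.array([1.0 - a * x * x + y, b * x])


def henon_jac(u):
    x, _ = u
    return np.array([[-2.0 * a * x, 1.0], [b, 0.0]])


def lyapunov_qr(f, jac, u0, n_steps, k):
    """Running Lyapunov estimate from k QR-reorthonormalised deviation vectors."""
    u = np.asarray(u0, dtype=float)
    W = np.eye(u.size)[:, :k]       # initial orthonormal deviation vectors
    total = np.zeros(k)
    for _ in range(n_steps):
        W = jac(u) @ W              # push deviations through the Jacobian at the current point
        u = f(u)                    # advance the state
        W, R = np.linalg.qr(W)      # reorthonormalise
        total += np.log(np.abs(np.diag(R)))  # per-step log stretch factors
    return total / n_steps


exps = lyapunov_qr(henon, henon_jac, [0.1, 0.1], 5000, k=2)
```

A useful sanity check when k equals the dimension: the exponents must sum to the average of log |det J|, which for this map is log b at every step.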

Examples:

>>> tang = TangentSystem(Henon(), k=2)
>>> tang.reinit([0.1, 0.1])
>>> for _ in range(5000):
...     tang.step()
>>> tang.exponents()        # ≈ [0.42, -1.62]
Source code in src/tsdynamics/derived/tangent.py
def __init__(self, system: Any, k: int | None = None) -> None:
    if isinstance(system, DelaySystem):
        raise NotImplementedError(
            "TangentSystem does not support delay systems — their tangent space is the "
            "infinite-dimensional history space. Use DelaySystem.lyapunov_spectrum."
        )
    if isinstance(system, DiscreteMap):
        self._mode = "map"
    elif isinstance(system, ContinuousSystem):
        self._mode = "ode"
    else:
        raise TypeError(
            f"TangentSystem needs a DiscreteMap or ContinuousSystem, "
            f"got {type(system).__name__}."
        )
    super().__init__(system)
    self.k = int(k) if k is not None else system.dim
    if not 1 <= self.k <= system.dim:
        raise ValueError(f"k must be in [1, {system.dim}], got {self.k}")

    self._W: np.ndarray | None = None  # (dim, k) deviation vectors (map mode)
    self._ode_lyap: Any = None  # jitcode_lyap wrapper (ode mode)
    self._t = 0.0
    self._last_growths = np.zeros(self.k)
    self._sum_growths = np.zeros(self.k)
    self._elapsed = 0.0

is_discrete property

is_discrete: bool

Match the wrapped system's time semantics.

reinit

reinit(
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
    **kwargs: Any,
) -> None

Restart state, deviation vectors, and accumulated growth sums.

Source code in src/tsdynamics/derived/tangent.py
def reinit(
    self,
    u: Any | None = None,
    *,
    t: float | None = None,
    params: dict | None = None,
    **kwargs: Any,
) -> None:
    """Restart state, deviation vectors, and accumulated growth sums."""
    self._last_growths = np.zeros(self.k)
    self._sum_growths = np.zeros(self.k)
    self._elapsed = 0.0

    if self._mode == "map":
        self.system.reinit(u, t=t, params=params)
        self._W = np.eye(self.system.dim)[:, : self.k]
        return

    # ode mode — fresh jitcode_lyap from the cached compiled module
    if params:
        for key, value in params.items():
            self.params[key] = value
    ic_arr = self.system.resolve_ic(u)
    t0 = float(t) if t is not None else 0.0

    method = kwargs.pop("method", None) or self.system._default_method
    rtol = kwargs.pop("rtol", 1e-6)
    atol = kwargs.pop("atol", 1e-9)
    from tsdynamics.families.continuous import _INTEGRATOR_MAP

    ode = self.system._ensure_compiled(for_lyap=True, n_lyap=self.k)
    ode.set_integrator(_INTEGRATOR_MAP.get(method, method), rtol=rtol, atol=atol, **kwargs)
    ode.set_parameters(*self.system._control_params().values())
    ode.set_initial_value(ic_arr, t0)
    self._ode_lyap = ode
    self._t = t0

step

step(n_or_dt: float | None = None) -> ndarray

Advance state + deviation vectors and reorthonormalise.

For maps n_or_dt is the number of iterations (QR after each); for ODEs it is the time increment (default 0.1). Returns the new state.

Source code in src/tsdynamics/derived/tangent.py
def step(self, n_or_dt: float | None = None) -> np.ndarray:
    """
    Advance state + deviation vectors and reorthonormalise.

    For maps ``n_or_dt`` is the number of iterations (QR after each);
    for ODEs it is the time increment (default 0.1).
    Returns the new state.
    """
    if self._mode == "map":
        return self._step_map(int(n_or_dt) if n_or_dt is not None else 1)
    return self._step_ode(float(n_or_dt) if n_or_dt is not None else self._ODE_DEFAULT_DT)

state

state() -> ndarray

Return a copy of the current base-system state.

Source code in src/tsdynamics/derived/tangent.py
def state(self) -> np.ndarray:
    """Return a copy of the current base-system state."""
    if self._mode == "map":
        return self.system.state()
    if self._ode_lyap is None:
        self.reinit()
    return np.asarray(self._ode_lyap.y[: self.system.dim], dtype=float).copy()

set_state

set_state(u: Any) -> None

Overwrite the base state (map mode only — ODE tangent vectors would desync).

Source code in src/tsdynamics/derived/tangent.py
def set_state(self, u: Any) -> None:
    """Overwrite the base state (map mode only — ODE tangent vectors would desync)."""
    if self._mode == "map":
        self.system.set_state(u)
    else:
        raise NotImplementedError(
            "TangentSystem(ode).set_state is not supported — the tangent vectors "
            "would desynchronise. Use reinit(u)."
        )

time

time() -> float

Return the current time / iteration count.

Source code in src/tsdynamics/derived/tangent.py
def time(self) -> float:
    """Return the current time / iteration count."""
    return self.system.time() if self._mode == "map" else self._t

deviations

deviations() -> ndarray

Return the current orthonormal deviation vectors, shape (dim, k) (maps only).

Source code in src/tsdynamics/derived/tangent.py
def deviations(self) -> np.ndarray:
    """Return the current orthonormal deviation vectors, shape ``(dim, k)`` (maps only)."""
    if self._mode != "map" or self._W is None:
        raise RuntimeError("deviations() is available for map mode after reinit()")
    return self._W.copy()

growths

growths() -> ndarray

Return the log stretch factors log|diag R| from the most recent step.

Source code in src/tsdynamics/derived/tangent.py
def growths(self) -> np.ndarray:
    """Return the log stretch factors ``log|diag R|`` from the most recent step."""
    return self._last_growths.copy()

exponents

exponents() -> ndarray

Return the running Lyapunov-spectrum estimate (accumulated growths / elapsed).

Source code in src/tsdynamics/derived/tangent.py
def exponents(self) -> np.ndarray:
    """Return the running Lyapunov-spectrum estimate (accumulated growths / elapsed)."""
    if self._elapsed == 0.0:
        return np.zeros(self.k)
    return self._sum_growths / self._elapsed

EnsembleSystem

EnsembleSystem(system: Any, states: Any)

Many copies of one system, advanced synchronously from different states.

Used for two-trajectory Lyapunov estimates, basin sampling, and ensemble statistics. Members are independent copies — parameters are shared at construction, states are per-member.
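The copy-per-member pattern can be shown with a toy stand-in. `LogisticMap` below is a hypothetical minimal class that only mimics the reinit/state/step calls used on this page; it is not part of the library, and `copy.deepcopy` stands in for the system's own copy method.

```python
import copy

import numpy as np


class LogisticMap:
    """Hypothetical stand-in for a System with reinit(), state(), and step()."""

    dim = 1

    def __init__(self, r=3.7):
        self.r = r
        self._u = np.array([0.5])

    def reinit(self, u):
        self._u = np.asarray(u, dtype=float).copy()

    def state(self):
        return self._u.copy()

    def step(self, n=1):
        for _ in range(n):
            self._u = self.r * self._u * (1.0 - self._u)
        return self._u.copy()


template = LogisticMap()
starts = np.array([[0.2], [0.2 + 1e-9], [0.7]])

# One independent copy per initial state; the template itself is never stepped.
members = []
for s in starts:
    member = copy.deepcopy(template)
    member.reinit(s)
    members.append(member)

# Advance all members by the same number of iterations and stack the results.
out = np.array([m.step(50) for m in members])

# Separation of the two nearby members, the raw ingredient of a
# two-trajectory Lyapunov estimate.
gap = abs(out[0, 0] - out[1, 0])
```

Because every member owns its state, overwriting or stepping one member never affects another, and the template keeps the state it had before the ensemble was built.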

PARAMETER DESCRIPTION
system

The template system (copied per member; the original is untouched).

TYPE: System

states

One initial state per member.

TYPE: array-like, shape (m, dim)

Examples:

>>> ens = EnsembleSystem(Lorenz(), [[1, 1, 1], [1.001, 1, 1]])
>>> ens.step(0.01)
array([[...], [...]])
Source code in src/tsdynamics/derived/ensemble.py
def __init__(self, system: Any, states: Any) -> None:
    states_arr = np.atleast_2d(np.asarray(states, dtype=float))
    if states_arr.shape[1] != system.dim:
        raise ValueError(f"states must have shape (m, {system.dim}), got {states_arr.shape}")
    self.template = system
    self.members = []
    for s in states_arr:
        member = system.copy()
        member.reinit(s)
        self.members.append(member)

size property

size: int

Number of ensemble members.

dim property

dim: int

State-space dimension of each member.

is_discrete property

is_discrete: bool

Match the template system's time semantics.

step

step(n_or_dt: float | int | None = None) -> ndarray

Advance every member and return the stacked states, shape (m, dim).

Source code in src/tsdynamics/derived/ensemble.py
def step(self, n_or_dt: float | int | None = None) -> np.ndarray:
    """Advance every member and return the stacked states, shape (m, dim)."""
    return np.array([m.step(n_or_dt) for m in self.members])

states

states() -> ndarray

Return the current states, shape (m, dim).

Source code in src/tsdynamics/derived/ensemble.py
def states(self) -> np.ndarray:
    """Return the current states, shape (m, dim)."""
    return np.array([m.state() for m in self.members])

set_states

set_states(states: Any) -> None

Overwrite every member's state.

Source code in src/tsdynamics/derived/ensemble.py
def set_states(self, states: Any) -> None:
    """Overwrite every member's state."""
    states_arr = np.atleast_2d(np.asarray(states, dtype=float))
    if states_arr.shape != (self.size, self.dim):
        raise ValueError(f"expected shape {(self.size, self.dim)}, got {states_arr.shape}")
    for member, s in zip(self.members, states_arr, strict=True):
        member.set_state(s)

time

time() -> float

Return the common member time.

Source code in src/tsdynamics/derived/ensemble.py
def time(self) -> float:
    """Return the common member time."""
    return self.members[0].time() if self.members else 0.0

ProjectedSystem

ProjectedSystem(
    system: Any,
    components: Any,
    *,
    complete: Callable[[ndarray], Any] | None = None,
)

Bases: DerivedSystem

View a system through a subset of its components.

The full system is stepped underneath; only state()/step() outputs are projected. set_state needs the inverse direction and therefore requires a complete callable mapping a projected state back to a full state.
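The role of the complete callable is easiest to see in isolation. The sketch below uses plain NumPy and hypothetical helpers (`project`, `complete`, `to_full`); the rule of filling the missing component with a fixed value is only one possible choice and is an assumption of this example.

```python
import numpy as np

components = (0, 2)  # keep x and z of a 3-D state (x, y, z)


def project(u_full):
    """Forward direction: select the viewed components."""
    return np.asarray(u_full, dtype=float)[list(components)]


def complete(u_proj, y_fill=0.0):
    """Hypothetical inverse: place the missing y component at a fixed value."""
    x, z = u_proj
    return np.array([x, y_fill, z])


def to_full(u, dim=3):
    """Accept either a full state or a projected one, as set_state does."""
    u = np.asarray(u, dtype=float)
    if u.size == dim:
        return u
    return np.asarray(complete(u), dtype=float)


full = to_full([1.0, 3.0])               # projected input, completed to 3-D
unchanged = to_full([1.0, 2.0, 3.0])     # full input passes straight through
```

Projection discards information, so the inverse is not unique; the caller has to decide how the hidden components are reconstructed, which is why no default completion is provided.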

PARAMETER DESCRIPTION
system

The full system.

TYPE: System

components

Component indices (or names, when the system declares variables).

TYPE: sequence of int or str

complete

complete(u_projected) -> u_full used by set_state/reinit when given projected-dimensional inputs.

TYPE: callable DEFAULT: None

Examples:

>>> proj = ProjectedSystem(Lorenz(), ["x", "z"])
>>> proj.step(0.01).shape
(2,)
Source code in src/tsdynamics/derived/projected.py
def __init__(
    self,
    system: Any,
    components: Any,
    *,
    complete: Callable[[np.ndarray], Any] | None = None,
) -> None:
    super().__init__(system)
    names = getattr(type(system), "variables", None)
    idx = []
    for c in components:
        if isinstance(c, str):
            if names is None:
                raise ValueError(
                    f"{type(system).__name__} declares no `variables`; "
                    f"use integer component indices."
                )
            idx.append(names.index(c))
        else:
            idx.append(int(c))
    if not idx:
        raise ValueError("components must be non-empty")
    self.components = tuple(idx)
    self.complete = complete

dim property

dim: int

Dimension of the projected view.

variables property

variables: tuple[str, ...] | None

Component names of the projected view (the inner names, subset).

Overrides DerivedSystem's pass-through (which would return the inner system's full names and mislabel the projected columns). Returns None when the inner system declares no variables.

step

step(n_or_dt: float | int | None = None) -> ndarray

Advance the full system; return the projected new state.

Source code in src/tsdynamics/derived/projected.py
def step(self, n_or_dt: float | int | None = None) -> np.ndarray:
    """Advance the full system; return the projected new state."""
    return self.system.step(n_or_dt)[list(self.components)]

state

state() -> ndarray

Return the projected current state.

Source code in src/tsdynamics/derived/projected.py
def state(self) -> np.ndarray:
    """Return the projected current state."""
    return self.system.state()[list(self.components)]

set_state

set_state(u: Any) -> None

Overwrite the state (projected inputs need a complete callable).

Source code in src/tsdynamics/derived/projected.py
def set_state(self, u: Any) -> None:
    """Overwrite the state (projected inputs need a ``complete`` callable)."""
    u_arr = np.asarray(u, dtype=float)
    if u_arr.size == self.system.dim:
        self.system.set_state(u_arr)
        return
    if self.complete is None:
        raise NotImplementedError(
            "ProjectedSystem.set_state with a projected-dimensional state needs a "
            "`complete=` callable to reconstruct the full state."
        )
    self.system.set_state(np.asarray(self.complete(u_arr), dtype=float))

reinit

reinit(u: Any | None = None, **kwargs: Any) -> None

Restart the full system (projected inputs need a complete callable).

Source code in src/tsdynamics/derived/projected.py
def reinit(self, u: Any | None = None, **kwargs: Any) -> None:
    """Restart the full system (projected inputs need a ``complete`` callable)."""
    if u is not None:
        u_arr = np.asarray(u, dtype=float)
        if u_arr.size != self.system.dim:
            if self.complete is None:
                raise NotImplementedError(
                    "ProjectedSystem.reinit with a projected-dimensional state needs "
                    "a `complete=` callable."
                )
            u = np.asarray(self.complete(u_arr), dtype=float)
    self.system.reinit(u, **kwargs)

trajectory

trajectory(*args: Any, **kwargs: Any) -> Trajectory

Full-system trajectory with projected columns.

Source code in src/tsdynamics/derived/projected.py
def trajectory(self, *args: Any, **kwargs: Any) -> Trajectory:
    """Full-system trajectory with projected columns."""
    traj = self.system.trajectory(*args, **kwargs)
    meta = {**traj.meta, "projected": self.components}
    # Back-reference ``self`` (not the inner system): the returned ``y`` holds
    # only the projected columns, and ``self.variables`` names exactly those —
    # so ``traj["x"]`` resolves to the right column and an unknown name raises
    # KeyError rather than silently mislabelling or IndexError-ing.
    return Trajectory(traj.t, traj.y[:, list(self.components)], self, meta=meta)

DerivedSystem

DerivedSystem(system: Any)

Base for wrappers that present an existing system through a new lens.

A derived system implements the tsdynamics.families.System protocol by delegating to a wrapped system, transforming what "one step" or "the state" means (Poincaré crossings, stroboscopic samples, projections, ...).

Parameters and metadata are forwarded to the wrapped system, and with_params re-parametrizes the inner system and rebuilds the wrapper, so parameter sweeps compose: an orbit diagram over a PoincareMap is a bifurcation diagram of the underlying flow.
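How re-parametrizing and rebuilding lets sweeps compose can be sketched with two toy classes. `ToySystem` and `ToyWrapper` are hypothetical, and the body of `_rebuild` shown here is an assumption about what such a hook might do (re-create the wrapper with its own settings around a new inner system); the page itself only shows that with_params and copy call it.

```python
import copy


class ToySystem:
    """Hypothetical inner system that only carries parameters."""

    def __init__(self, **params):
        self.params = dict(params)

    def with_params(self, **overrides):
        # Return a new system; the original keeps its parameters.
        return ToySystem(**{**self.params, **overrides})

    def copy(self):
        return copy.deepcopy(self)


class ToyWrapper:
    """Hypothetical wrapper with one setting of its own (period)."""

    def __init__(self, system, period):
        self.system = system
        self.period = period

    def _rebuild(self, system):
        # Same wrapper type, same wrapper settings, new inner system.
        return type(self)(system, self.period)

    def with_params(self, **overrides):
        return self._rebuild(self.system.with_params(**overrides))

    def copy(self):
        return self._rebuild(self.system.copy())


base = ToyWrapper(ToySystem(a=1.0, b=2.0), period=6.28)

# A parameter sweep over the wrapper: each entry is a fresh wrapper
# around a re-parametrized inner system.
sweep = [base.with_params(a=value) for value in (0.5, 1.0, 1.5)]
```

Since each sweep entry is still a wrapper of the same kind, any analysis routine that iterates over parameter values sees a discrete system at every value without knowing a flow sits underneath.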

Source code in src/tsdynamics/derived/_base.py
def __init__(self, system: Any) -> None:
    self.system = system

with_params

with_params(**overrides: Any) -> DerivedSystem

Return a new wrapper of the same kind around a re-parametrized copy.

Source code in src/tsdynamics/derived/_base.py
def with_params(self, **overrides: Any) -> DerivedSystem:
    """Return a new wrapper of the same kind around a re-parametrized copy."""
    return self._rebuild(self.system.with_params(**overrides))

copy

copy() -> DerivedSystem

Return a new wrapper of the same kind around a copy of the inner system.

Source code in src/tsdynamics/derived/_base.py
def copy(self) -> DerivedSystem:
    """Return a new wrapper of the same kind around a copy of the inner system."""
    return self._rebuild(self.system.copy())