Skip to content

CompositeCost

CompositeCost(alssms, segments, F=None, betas=None, label='n/a')

Bases: BaseCost, BaseCost1d


              flowchart TD
              lmlib.statespace.cost.CompositeCost[CompositeCost]
              lmlib.statespace.cost.BaseCost[BaseCost]
              lmlib.statespace.cost.BaseCost1d[BaseCost1d]

                              lmlib.statespace.cost.BaseCost --> lmlib.statespace.cost.CompositeCost
                
                lmlib.statespace.cost.BaseCost1d --> lmlib.statespace.cost.CompositeCost
                


              click lmlib.statespace.cost.CompositeCost href "" "lmlib.statespace.cost.CompositeCost"
              click lmlib.statespace.cost.BaseCost href "" "lmlib.statespace.cost.BaseCost"
              click lmlib.statespace.cost.BaseCost1d href "" "lmlib.statespace.cost.BaseCost1d"
            

Quadratic cost function defined by one or more ALSSMs and one or more Segments.

A CompositeCost combines multiple ALSSM models and multiple Segments in a grid, where each row corresponds to one ALSSM and each column to one Segment. The mapping matrix F enables or disables each ALSSM/Segment pair at each grid node; multiple active ALSSMs in one column are superimposed.

A CompositeCost is an implementation of [Wildhaber2019] PDF (Composite Cost)

====================
Class: CompositeCost (= M ALSSM's + P Segment's)
====================


M : number of ALSSM models
^
|                Segment[0]   Segment[1]    ...   Segment[P-1]
|              +------------+------------+--//--+--------------+
|  alssm[0]    |  F[0,0]    |            |      | F[0,P-1]     |
|              +------------+------------+--//--+--------------+
|    .         |     .      |     ...    |      |    .         |
|    .         |     .      |     ...    |      |    .         |
|              +------------+------------+--//--+--------------+
|  alssm[M-1]  |  F[M-1,0]  |            |      | F[M-1, P-1]  |
|              +------------+------------+--//--+--------------+``

               |------------|-------|----|------|--------------|--> k (time)
                                    0
                        (0: common relative reference
                         index for all segments)

P : number of segments

F[m,p] in R+, scalar weight on alssm m's output in segment p.
(0 for an inactive grid node; any positive scalar for an active node.)

This figure shows the internal relationships between Segments, ALSSMs, and the mapping matrix F.

For more details, see Chapter 9 in [Wildhaber2019]. The cost function of a composite cost is defined as

\[ J(k, x, \Theta) = \sum_{p = 1}^{P}\beta_p J_{a_p}^{b_p}(k, x, \theta_p) \ , \]

where, \(\Theta = (\theta_1, \theta_2,\dots, \theta_P)\) and the segment scalars \(\beta_p \in \mathbb{R}_+\).

Parameters:

  • alssms (iterable of ModelBase (length M), or a single ModelBase) –

    Set of M ALSSM models. A single ALSSM may be passed directly instead of wrapping it in an iterable (it is treated as M = 1).

  • segments (iterable of Segment (length P), or a single Segment) –

    Set of P Segments. A single Segment may be passed directly instead of wrapping it in an iterable (it is treated as P = 1).

  • F (array_like of shape (M, P), default: None ) –

    Mapping matrix. F[m, p] is the scalar weight of ALSSM m in Segment p. Set to 0 to disable a grid node. If omitted, F defaults to np.ones((M, P)); this default is only allowed when there is a single ALSSM (M = 1) or a single Segment (P = 1). With at least two ALSSMs and two Segments, F is mandatory.

  • betas (array_like of shape (P,), default: None ) –

    Per-segment scaling factors \(\beta_p\). Default: all ones.

  • label (str, default: 'n/a' ) –

    Label of this CompositeCost. Default: 'n/a'.

Examples:

>>> import lmlib as lm
>>>
>>> alssm_spike = lm.AlssmPoly(poly_degree=3, label='spike')
>>> alssm_baseline = lm.AlssmPoly(poly_degree=2, label='baseline')
>>>
>>> segment_left = lm.Segment(a=-50, b=-1, direction=lm.FORWARD, g=20, label="finite left")
>>> segment_middle = lm.Segment(a=0, b=10, direction=lm.FORWARD, g=100, label="finite middle")
>>> segment_right = lm.Segment(a=10, b=50, direction=lm.FORWARD, g=20, delta=10, label="finite right")
>>>
>>> F = [[0, 1, 0], [1, 1, 1]]
>>> cost = lm.CompositeCost((alssm_spike, alssm_baseline), (segment_left, segment_middle, segment_right), F, label='spike_baseline')
>>> print(cost)
CompositeCost(label=spike_baseline)
  └- ['AlssmPoly(A=..., C=..., label=spike)', 'AlssmPoly(A=..., C=..., label=baseline)'],
  └- [Segment(a=-50, ..., label=finite left), Segment(a=0, ..., label=finite middle), Segment(a=10, ..., label=finite right)]

A single ALSSM and a single Segment may be passed directly; F then defaults to np.ones((1, 1)).

>>> cost = lm.CompositeCost(lm.AlssmPoly(poly_degree=3), lm.Segment(0, 200, lm.BW, 100))

Methods:

Attributes:

  • label

    str : Label of the CompositeCost.

  • alssms

    list of ModelBase : ALSSM signal models.

  • segments

    list of Segment : Window segments.

  • M

    int : Number of ALSSMs \(M\).

  • P

    int : Number of Segments \(P\).

  • F

    ndarray : Mapping matrix \(F\), maps models to segments

  • betas

    ndarray of shape (P,) : Per-segment scaling factors \(\beta_p \geq 0\).

Source code in lmlib/statespace/cost.py
def __init__(self, alssms, segments, F=None, betas=None, label='n/a'):
    # set alssms (accept a single ALSSM in place of an iterable)
    if isinstance(alssms, ModelBase):
        alssms = [alssms]
    assert isinstance(alssms, Iterable), 'alssms is not iterable'
    for alssm in alssms:
        assert isinstance(alssm, ModelBase), 'element in alssms is not of instance ModelBase'
    self._alssms = list(alssms)

    # set segments (accept a single Segment in place of an iterable)
    if isinstance(segments, Segment):
        segments = [segments]
    assert isinstance(segments, Iterable), 'segments is not iterable'
    for segment in segments:
        assert isinstance(segment, Segment), 'element in segments is not instance of Segment'
    self._segments = list(segments)

    # set F (default to all-ones when there is a single ALSSM or a single
    # Segment; F is mandatory once there are at least two of each)
    if F is None:
        assert not (self.M >= 2 and self.P >= 2), \
            'F is mandatory when there are at least two ALSSMs and two Segments'
        F = np.ones((self.M, self.P))
    self.F = F

    # set betas
    if betas is not None:
        betas=np.array(betas)
        assert is_array_like(betas), 'betas is not array_like'
        assert betas.shape == (self.P,), f'betas has wrong shape, {info_str_found_shape(betas)}'
        for beta in betas:
            assert np.isscalar(beta), 'beta is not scalar'
            assert beta >= 0.0, 'beta is negative'
        self._betas = betas
    else:
        self._betas = np.ones(self.P)
    self.label = label
    r"""str : Label of the CompositeCost."""

Methods

get_alssm_order

get_alssm_order()

Return the combined state-space order N of all ALSSMs.

Source code in lmlib/statespace/cost.py
def get_alssm_order(self):
    """Return the combined state-space order N of all ALSSMs."""
    return self._get_sub_cost_term(seg=0).get_alssm_order()

get_alssm_output_dimension

get_alssm_output_dimension()

Return the output dimension Q shared by all ALSSMs.

Source code in lmlib/statespace/cost.py
def get_alssm_output_dimension(self):
    """Return the output dimension Q shared by all ALSSMs."""
    return self._get_sub_cost_term(seg=0).get_alssm_output_dimension()

get_number_of_dimensions

get_number_of_dimensions()

Return 1 — a CompositeCost always represents a single signal dimension.

Source code in lmlib/statespace/cost.py
def get_number_of_dimensions(self):
    """Return 1 — a CompositeCost always represents a single signal dimension."""
    return 1

get_steady_state_W

get_steady_state_W(dim_order=None, method='schur')

Compute the steady-state Gram matrix W as a beta-weighted sum over segments.

Parameters:

  • dim_order (ignored, default: None ) –

    Accepted for interface compatibility; unused for 1-D costs.

  • method (str, default: 'schur' ) –

    Forwarded to each get_steady_state_W.

Returns:

  • W ( ndarray of shape (N, N) ) –

    Aggregate steady-state Gram matrix.

Source code in lmlib/statespace/cost.py
def get_steady_state_W(self, dim_order=None, method='schur'):
    r"""
    Compute the steady-state Gram matrix W as a beta-weighted sum over segments.

    Parameters
    ----------
    dim_order : ignored
        Accepted for interface compatibility; unused for 1-D costs.
    method : str, optional
        Forwarded to each [`get_steady_state_W`][lmlib.statespace.cost.CostSegment.get_steady_state_W].

    Returns
    -------
    W : ndarray of shape (N, N)
        Aggregate steady-state Gram matrix.
    """
    N = self.get_alssm_order()
    W = np.zeros((N, N))

    for cost_segment in self:
        W += cost_segment.get_steady_state_W(method=method) * cost_segment.beta
    return W

eval_alssm_output

eval_alssm_output(xs, alssm_weights=None)

Evaluate the summed ALSSM output for a set of state vectors.

Parameters:

  • xs (array_like of shape (..., N)) –

    State vectors.

  • alssm_weights (None, scalar, or array_like, default: None ) –

    Per-ALSSM output weights forwarded to AlssmSum.

Returns:

  • out ( ndarray ) –

    Summed ALSSM output for each input state vector.

Source code in lmlib/statespace/cost.py
def eval_alssm_output(self, xs, alssm_weights=None):
    r"""
    Evaluate the summed ALSSM output for a set of state vectors.

    Parameters
    ----------
    xs : array_like of shape (..., N)
        State vectors.
    alssm_weights : None, scalar, or array_like, optional
        Per-ALSSM output weights forwarded to [`AlssmSum`][lmlib.statespace.model.AlssmSum].

    Returns
    -------
    out : ndarray
        Summed ALSSM output for each input state vector.
    """
    return AlssmSum(self.alssms, alssm_weights).eval_output(xs)

get_state_var_indices

get_state_var_indices(label)

Return the state indices for a named state variable in the stacked ALSSM.

Parameters:

  • label (str) –

    State variable label.

Returns:

  • indices ( list of int ) –
Source code in lmlib/statespace/cost.py
def get_state_var_indices(self, label):
    """
    Return the state indices for a named state variable in the stacked ALSSM.

    Parameters
    ----------
    label : str
        State variable label.

    Returns
    -------
    indices : list of int
    """
    return AlssmSum(self.alssms, label='cost').get_state_var_indices('cost.' + label)

get_alssms

get_alssms()

Return the list of ALSSMs comprising this CompositeCost.

Source code in lmlib/statespace/cost.py
def get_alssms(self):
    """Return the list of ALSSMs comprising this CompositeCost."""
    return self.alssms

spline_H

spline_H(max_continuity: int, alssm_index: int = 0) -> ndarray

Build the spline continuity H-matrix for a two-ALSSM cost.

For a CompositeCost with exactly two ALSSMs and mixing matrix F = [[1,0],[0,1]] (one ALSSM per segment, independent states), the full state vector is \(x = [x_L\;(N),\; x_R\;(N)]\). The H-matrix returned here reduces this to a lower-dimensional parameter vector \(v\) by imposing continuity constraints at the knot:

\[ x = H\,v, \qquad H \in \mathbb{R}^{2N \times (2N - m_c - 1)}, \]

where \(m_c =\) max_continuity and \(N\) is the model order.

Continuity constraints are derived from the k-th forward difference operator evaluated at the knot lag \(j = 0\):

\[ e_k = C\,(A - I)^k, \qquad k = 0, 1, \ldots \]

Constraint of order k:

\[ e_k\,(x_R - (-1)^k\,x_L) = 0 \]

which encodes:

  • \(k=0\) — value continuity: \(C x_R = C x_L\).
  • \(k=1\) — first-derivative continuity: \(e_1(x_R + x_L) = 0\).
  • etc.

The result is the unique minimum-rank H satisfying all constraints \(k = 0, \ldots, m_c\), computed via a stable linear solve.

Compatibility of the two ALSSMs

For constraints of order \(k \geq 1\), both ALSSMs must share the same \(A\) and \(C\) matrices (same basis functions). If they differ and max_continuity >= 1, a UserWarning is emitted because the H-matrix will use the basis of alssm_index and impose it on both sides, which is physically wrong for the other side.

For AlssmSin pairs, in particular:

  • max_continuity = 0 (value match only) always works.
  • max_continuity >= 1 requires the same omega and rho = 1 (undamped oscillation) on both sides; otherwise a warning is issued.

Parameters:

  • max_continuity (int) –

    Highest continuity order to impose. -1 returns the identity (free, no constraint); 0 imposes \(C^0\) (value match); D (= poly_degree) is the maximum possible (fully smooth).

  • alssm_index (int, default: 0 ) –

    Index (0-based) into self.alssms of the reference ALSSM whose \(A\) and \(C\) are used to build \(e_k\). Relevant only when the two ALSSMs differ. Default: 0.

Returns:

  • H ( ndarray of shape (2N, 2N - max_continuity - 1) ) –

    Linear constraint matrix. Pass to minimize_x as H=.

Raises:

  • ValueError

    If the cost does not have exactly two ALSSMs.

Warns:

  • UserWarning

    When max_continuity >= 1 and the two ALSSMs have different \(A\) or \(C\) matrices.

Examples:

Edge detection: test C^0 (offset-only) vs C^0+C^1 (slope join)::

alssm_L = lm.AlssmPolyMeixner(D, segment=segL)
alssm_R = lm.AlssmPolyMeixner(D, segment=segR)
cost    = lm.CompositeCost((alssm_L, alssm_R), (segL, segR), [[1,0],[0,1]])

rls.filter(y)
xs_free  = rls.minimize_x()                       # unconstrained
xs_cont  = rls.minimize_x(H=cost.spline_H(0))    # C^0 join
xs_peak  = rls.minimize_x(H=cost.spline_H(1))    # C^0+C^1 join
xs_full  = rls.minimize_x(H=cost.spline_H(D))    # fully smooth

Works equally for AlssmPoly, AlssmPolyJordan, AlssmPolyLegendre, AlssmSin. For AlssmPoly(1): spline_H(0) reproduces H_Continuous and spline_H(1) reproduces H_Peak from the literature.

Source code in lmlib/statespace/cost.py
def spline_H(self, max_continuity: int, alssm_index: int = 0) -> np.ndarray:
    r"""
    Build the spline continuity H-matrix for a two-ALSSM cost.

    For a [`CompositeCost`][lmlib.statespace.cost.CompositeCost] with **exactly two ALSSMs** and mixing matrix
    ``F = [[1,0],[0,1]]`` (one ALSSM per segment, independent states), the full
    state vector is $x = [x_L\;(N),\; x_R\;(N)]$.  The H-matrix returned
    here reduces this to a lower-dimensional parameter vector $v$ by
    imposing continuity constraints at the knot:

    $$
    x = H\,v, \qquad H \in \mathbb{R}^{2N \times (2N - m_c - 1)},
    $$

    where $m_c =$ ``max_continuity`` and $N$ is the model order.

    **Continuity constraints** are derived from the *k*-th forward difference
    operator evaluated at the knot lag $j = 0$:

    $$
    e_k = C\,(A - I)^k, \qquad k = 0, 1, \ldots
    $$

    Constraint of order *k*:

    $$
    e_k\,(x_R - (-1)^k\,x_L) = 0
    $$

    which encodes:

    * $k=0$ — value continuity: $C x_R = C x_L$.
    * $k=1$ — first-derivative continuity: $e_1(x_R + x_L) = 0$.
    * etc.

    The result is the unique minimum-rank H satisfying all constraints
    $k = 0, \ldots, m_c$, computed via a stable linear solve.

    **Compatibility of the two ALSSMs**

    For constraints of order $k \geq 1$, both ALSSMs must share the same
    $A$ and $C$ matrices (same basis functions).  If they differ and
    ``max_continuity >= 1``, a [`UserWarning`][UserWarning] is emitted because the
    H-matrix will use the basis of ``alssm_index`` and impose it on both sides,
    which is physically wrong for the other side.

    For [`AlssmSin`][lmlib.statespace.model.AlssmSin] pairs, in particular:

    * ``max_continuity = 0`` (value match only) always works.
    * ``max_continuity >= 1`` requires the same ``omega`` **and** ``rho = 1``
      (undamped oscillation) on both sides; otherwise a warning is issued.

    Parameters
    ----------
    max_continuity : int
        Highest continuity order to impose.  ``-1`` returns the identity
        (free, no constraint); ``0`` imposes $C^0$ (value match);
        ``D`` (= ``poly_degree``) is the maximum possible (fully smooth).
    alssm_index : int, optional
        Index (0-based) into ``self.alssms`` of the reference ALSSM whose
        $A$ and $C$ are used to build $e_k$.  Relevant
        only when the two ALSSMs differ.  Default: ``0``.

    Returns
    -------
    H : ndarray of shape (2N, 2N - max_continuity - 1)
        Linear constraint matrix.  Pass to
        [`minimize_x`][lmlib.statespace.rls.RLSAlssm.minimize_x] as ``H=``.

    Raises
    ------
    ValueError
        If the cost does not have exactly two ALSSMs.

    Warns
    -----
    UserWarning
        When ``max_continuity >= 1`` and the two ALSSMs have different
        $A$ or $C$ matrices.

    Examples
    --------
    Edge detection: test C^0 (offset-only) vs C^0+C^1 (slope join)::

        alssm_L = lm.AlssmPolyMeixner(D, segment=segL)
        alssm_R = lm.AlssmPolyMeixner(D, segment=segR)
        cost    = lm.CompositeCost((alssm_L, alssm_R), (segL, segR), [[1,0],[0,1]])

        rls.filter(y)
        xs_free  = rls.minimize_x()                       # unconstrained
        xs_cont  = rls.minimize_x(H=cost.spline_H(0))    # C^0 join
        xs_peak  = rls.minimize_x(H=cost.spline_H(1))    # C^0+C^1 join
        xs_full  = rls.minimize_x(H=cost.spline_H(D))    # fully smooth

    Works equally for AlssmPoly, AlssmPolyJordan, AlssmPolyLegendre, AlssmSin.
    For AlssmPoly(1):
    ``spline_H(0)`` reproduces ``H_Continuous`` and
    ``spline_H(1)`` reproduces ``H_Peak`` from the literature.
    """
    import warnings
    from numpy.linalg import matrix_power, solve

    if self.M != 2:
        raise ValueError(
            f"spline_H requires exactly 2 ALSSMs in the CompositeCost "
            f"(got {self.M}).  Use alssm_index to select the reference ALSSM.")

    # ── free case ────────────────────────────────────────────────────────────
    if max_continuity < 0:
        alssm_ref = self.alssms[alssm_index]
        N = alssm_ref.N
        return np.eye(2 * N)

    # ── select reference ALSSM ───────────────────────────────────────────────
    # For AlssmPolyMeixner TSLM the convention is (alssm_L[FW], alssm_R[BW]).
    # The spline e_k vectors must use A_bw (the backward matrix), so if
    # alssm_index points to a FW Meixner alssm, auto-select the BW one instead.
    _ref_idx = alssm_index
    from lmlib.statespace.model import AlssmPolyMeixner
    if (isinstance(self.alssms[_ref_idx], AlssmPolyMeixner)
            and getattr(self.alssms[_ref_idx], 'direction', 'bw') == 'fw'):
        other_idx = 1 - _ref_idx
        if (isinstance(self.alssms[other_idx], AlssmPolyMeixner)
                and getattr(self.alssms[other_idx], 'direction', 'bw') == 'bw'):
            _ref_idx = other_idx

    alssm_ref   = self.alssms[_ref_idx]
    alssm_other = self.alssms[1 - _ref_idx]
    A = np.asarray(alssm_ref.A)
    C = np.asarray(alssm_ref.C).ravel()
    N = A.shape[0]

    # ── compatibility check ──────────────────────────────────────────────────
    if max_continuity >= 1 and isinstance(alssm_ref, AlssmSin) and isinstance(alssm_other, AlssmSin):
        same_A = np.allclose(alssm_ref.A, alssm_other.A, rtol=1e-6, atol=1e-10)
        same_C = np.allclose(
            np.asarray(alssm_ref.C).ravel(),
            np.asarray(alssm_other.C).ravel(),
            rtol=1e-6, atol=1e-10)
        if not (same_A and same_C):
            warnings.warn(
                f"spline_H(max_continuity={max_continuity}): the two ALSSMs have "
                f"different A or C matrices.  Constraints of order >= 1 use the "
                f"basis of alssm[{alssm_index}] and will be physically wrong for "
                f"alssm[{1-alssm_index}].  For C^0 (value match only), set "
                f"max_continuity=0.",
                UserWarning, stacklevel=2)

    # ── build constraint vectors e_k = C (A-I)^k ────────────────────────────
    nc = min(max_continuity + 1, N)
    e = [C @ matrix_power(A - np.eye(N), k) for k in range(nc)]

    # ── assemble and solve for constrained block AR_top (nc x N) ────────────
    #
    # Full state: x_full = [x_L (N), x_R (N)], free params v = [v_L (N), v_free (N-nc)]
    # x_L = v_L
    # x_R[0:nc]  = AR_top @ v_L + BR_top @ v_free   (constrained)
    # x_R[nc:]   = v_free                             (free)
    #
    # Constraint k:  e_k @ x_R = (-1)^k * e_k @ x_L  for all (v_L, v_free)
    # => (1) E_top @ AR_top = RHS          (coefficient of v_L)
    # => (2) E_top @ BR_top + E_bot = 0    (coefficient of v_free)
    #
    # where E_top = E[:, :nc], E_bot = E[:, nc:], RHS[k,:] = (-1)^k * e_k.

    E     = np.array([e[k]       for k in range(nc)])   # nc x N
    RHS   = np.array([(-1)**k * e[k] for k in range(nc)])  # nc x N
    E_top = E[:, :nc]   # nc x nc
    E_bot = E[:, nc:]   # nc x (N-nc)

    AR_top = solve(E_top, RHS)           # nc x N
    BR_top = solve(E_top, -E_bot)        # nc x (N-nc)

    # ── assemble H ───────────────────────────────────────────────────────────
    nv = N + (N - nc)
    H  = np.zeros((2 * N, nv))
    H[:N, :N]         = np.eye(N)   # x_L = v_L
    H[N:N+nc, :N]     = AR_top      # x_R[0:nc] from v_L
    H[N:N+nc, N:]     = BR_top      # x_R[0:nc] from v_free
    for j in range(N - nc):         # x_R[nc:] = v_free
        H[N + nc + j, N + j] = 1.0
    return H