Skip to content

Time and Amplitude Scaling [ex703.0]

Estimates both a time-scaling (dilation) factor and an amplitude-scaling factor between two locally polynomial signals. This example is published in [Wildhaber2020].

The polynomial coefficients of the reference and the candidate windows are related by the dilation and amplitude operators, enabling a closed-form least-squares estimator via the polynomial calculus.

Plot

Plot

Plot

Code

r"""
Time and Amplitude Scaling [ex703.0]
====================================

Estimates both a time-scaling (dilation) factor and an amplitude-scaling
factor between two locally polynomial signals.
This example is published in [\[Wildhaber2020\]](../../../bibliography.md#wildhaber2020).

The polynomial coefficients of the reference and the candidate windows are
related by the dilation and amplitude operators, enabling a closed-form
least-squares estimator via the polynomial calculus.
"""

import matplotlib.pyplot as plt
from matplotlib.lines import Line2D

import numpy as np
import lmlib as lm
from lmlib.utils import load_lib_csv_mc


def time_amplitude_scaling(Q, a, b):
    q = np.arange(Q)
    Mq = lm.poly_square_expo(q)
    Mqp1 = lm.poly_int_expo(Mq)
    L_int = lm.poly_int_coef_L(Mq)
    L_eta = lm.mpoly_dilate_ind_coef_L(q)  # matrix operation of vdiag
    R_QQ = lm.permutation_matrix_square(Q, Q)

    I_Q = np.eye(Q)
    I_QQ = np.eye(Q ** 2)

    c1 = L_int
    c2 = np.kron(I_Q, L_int).dot(np.kron(L_eta, I_Q))
    c3 = np.kron(I_QQ, L_int).dot(R_QQ).dot(np.kron(L_eta, L_eta))

    bmaT = (np.power(b, Mqp1) - np.power(a, Mqp1)).T
    A = bmaT @ c1
    B = np.kron(I_Q, bmaT) @ c2
    C = np.kron(I_QQ, bmaT) @ c3

    return A, B, C, q, Mq


y = load_lib_csv_mc('SECG3_FILT_HP51_3CH_20S_FS2400HZ.csv', k_start=5500, K=750)
y = np.column_stack([np.convolve(y0, 1 / 50 * np.ones(50), 'same') for y0 in y.T])

fs = 2000
K = len(y)
t = np.arange(K) / fs
M = 3
Q = 5
ENERGY_THD = 60

ab_half = int((80e-3 * fs) / 2)
ks_alpha = np.array([370])  # fixing template

alssm = lm.AlssmPoly(poly_degree=Q - 1)
segment_right = lm.Segment(a=-ab_half, b=ab_half, direction=lm.BW, g=50, delta=ab_half)
cost = lm.CostSegment(alssm, segment_right)
rls = lm.RLSAlssm(cost)
rls.filter(y)
xs = rls.minimize_x(solver='inv')

# generate template
dilate_factor = 1.456
amplitude_factor = 0.945
y_template = amplitude_factor * np.column_stack(
    [np.interp(np.linspace(0, K, int(K * dilate_factor)), np.arange(K), y0) for y0 in y.T])

rls_tmpl = lm.RLSAlssm(cost)
rls_tmpl.filter(y_template)
xs_tmpl = rls_tmpl.minimize_x(solver='inv')
alphas = xs_tmpl[(ks_alpha * dilate_factor).astype(int)]

A, B, C, q, Mq = time_amplitude_scaling(Q, a=-ab_half, b=ab_half)
etas = np.linspace(0.5, 2, 1000)

J = np.full(K, np.inf)
cost_ratio = np.full(K, np.inf)
model_energy_obs = np.full(K, np.inf)
time_scaling_hat = np.full(K, np.nan)
amplitude_hat = np.full(K, np.nan)

for k in range(K):

    beta_beta = np.zeros(Q ** 2)
    alpha_beta = np.zeros(Q ** 2)
    alpha_alpha = np.zeros(Q ** 2)

    for m in range(M):
        beta_beta += np.kron(xs[k, m], xs[k, m])
        alpha_beta += np.kron(alphas[0, m], xs[k, m])
        alpha_alpha += np.kron(alphas[0, m], alphas[0, m])

    a1 = A @ beta_beta
    for eta in etas:
        J_ = a1 - (((B @ alpha_beta).T @ np.power(eta, q)) ** 2) / ((C @ alpha_alpha).T @ np.power(eta, Mq))
        if J_ < J[k]:
            J[k] = J_
            time_scaling_hat[k] = eta
            amplitude_hat[k] = ((B @ alpha_beta).T @ np.power(eta, q)) / ((C @ alpha_alpha).T @ np.power(eta, Mq))
            cost_ratio[k] = J_ / a1
            model_energy_obs[k] = a1

mask = ENERGY_THD < model_energy_obs
k_range_mask = np.flatnonzero(mask)
k_min = k_range_mask[np.argmin(cost_ratio[mask])]

L_dilate = lm.poly_dilation_coef_L(np.arange(Q), time_scaling_hat[k_min])
alphas_hat = amplitude_hat[k_min] * np.einsum('mn, ksn->ksm', L_dilate, alphas)

# -- Plotting --


# Plot A: Trajectories
offset_channels = np.arange(M) + 1
fig, (ax1, ax2, ax3) = plt.subplots(1, 3, gridspec_kw={'width_ratios': [1, 2, 2]})
ab_half_ext = int(ab_half * 1.15)
segment_right = lm.Segment(a=-ab_half_ext, b=ab_half_ext, direction=lm.BW, g=50, delta=ab_half_ext)
cost_ext = lm.CostSegment(alssm, segment_right)

trajs_obs      = lm.Trajectory.eval_y(cost, xs[ks_alpha], ks_alpha, K)
trajs_tmpl_hat = lm.Trajectory.eval_y(cost, alphas_hat, k_min, K)
trajs_obs_ext  = lm.Trajectory.eval_y(cost_ext, xs[ks_alpha], ks_alpha, K)

k_tmpl, trajs_tmpl = zip(*lm.Trajectory.eval(cost, alphas, merged_ks=True,merged_seg=True))
k_tmpl = np.array(k_tmpl[0])   # all ranges identical, take first
trajs_tmpl = np.array(trajs_tmpl).T #have K in first dimension

ax1.plot(k_tmpl, trajs_tmpl + offset_channels, lw=1, c='r', label=r'$\alpha^T z^q$')

ax1.set_xlim([min(k_tmpl), max(k_tmpl)])
ax1.set_xticks([max(k_tmpl)])
ax1.set_xticklabels(['1'])
ax1.legend([Line2D([0], [0], color='r', lw=1)], [r'$\alpha^T z^q$'], loc=1, fontsize=8)

ax2.plot(t, y + offset_channels, lw=1, c='k')
ax2.plot(t, trajs_obs_ext +  offset_channels, lw=0.8, ls='--', c='b')
ax2.plot(t, trajs_obs +  offset_channels, lw=1, c='b')
ax2.legend([Line2D([0], [0], color='b', lw=1)], [r'$\beta_k^T z^q$'], loc=1, fontsize=8)

ax3.plot(t, y + offset_channels, lw=1, c='grey')
ax3.plot(t, trajs_obs_ext + offset_channels, lw=0.8, ls='--', c='b')
ax3.plot(t, trajs_obs  + offset_channels, lw=1, c='b')
ax3.plot(t, trajs_tmpl_hat  + offset_channels, lw=1, c='r', label=r'$\alpha^T z^q$')
ax3.legend([Line2D([0], [0], color='r', lw=1)], [r'$\lambda \alpha^T (\eta z)^q$'], loc=1, fontsize=8)

for ax in fig.axes:
    ax.set_yticks([1, 2, 3])

plt.show()

# Plot B: Show cost and estimate over time
if True:
    fig, axs = plt.subplots(5, 1, sharex='all')
    axs[0].plot(J, c='k', lw=0.6, ls=':', label='cost')
    axs[0].plot(k_range_mask, J[mask], c='b', lw=1)
    axs[1].plot(cost_ratio, c='k', lw=0.6, ls=':', label='cost_ratio')
    axs[1].plot(k_range_mask, cost_ratio[mask], c='b', lw=1)
    axs[2].plot(time_scaling_hat, c='k', lw=0.6, ls=':', label=r'time scaling $\eta$')
    axs[2].plot(k_range_mask, time_scaling_hat[mask], c='b', lw=1)
    axs[3].plot(amplitude_hat, c='k', lw=0.6, ls=':', label=r'amplitude scaling $\lambda$')
    axs[3].plot(k_range_mask, amplitude_hat[mask], c='b', lw=1)
    axs[4].plot(model_energy_obs, c='k', lw=0.6, ls=':', label=r'Model Energy Observation')
    axs[4].plot(k_range_mask, model_energy_obs[mask], c='b', lw=1)
    for ax in axs:
        ax.axvline(k_min, c='k', lw=0.5)
        ax.legend(loc=1, fontsize=8)
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
    axs[-1].set_xlabel('k')
    plt.show()