Moved code from sg_filter.py to demo_sg.py

master
Per A Brodtkorb 8 years ago
parent a105a45af1
commit 24b8784c21

@ -1,13 +1,18 @@
import matplotlib.pyplot as plt
import numpy as np
from wafo.sg_filter import SavitzkyGolay, smoothn # calc_coeff, smooth
from scipy.sparse.linalg import expm
from scipy.signal import medfilt
from wafo.plotbackend import plotbackend as plt
from wafo.sg_filter import (SavitzkyGolay, smoothn, Kalman, HodrickPrescott,
HampelFilter)
def example_reconstruct_noisy_chirp():
def demo_savitzky_on_noisy_chirp():
"""
Example
-------
>>> example_reconstruct_noisy_chirp()
>>> demo_savitzky_on_noisy_chirp()
>>> plt.close()
"""
plt.figure(figsize=(7, 12))
@ -47,8 +52,437 @@ def example_reconstruct_noisy_chirp():
plt.title('smoothed derivative of signal')
def demo_kalman_voltimeter():
"""
Example
-------
>>> demo_kalman_voltimeter()
>>> plt.close()
"""
V0 = 12
h = np.atleast_2d(1) # voltimeter measure the voltage itself
q = 1e-9 # variance of process noise as the car operates
r = 0.05 ** 2 # variance of measurement error
b = 0 # no system input
u = 0 # no system input
filt = Kalman(R=r, A=1, Q=q, H=h, B=b)
# Generate random voltages and watch the filter operate.
n = 50
truth = np.random.randn(n) * np.sqrt(q) + V0
z = truth + np.random.randn(n) * np.sqrt(r) # measurement
x = np.zeros(n)
for i, zi in enumerate(z):
x[i] = filt(zi, u) # perform a Kalman filter iteration
_hz = plt.plot(z, 'r.', label='observations')
# a-posteriori state estimates:
_hx = plt.plot(x, 'b-', label='Kalman output')
_ht = plt.plot(truth, 'g-', label='true voltage')
plt.legend()
plt.title('Automobile Voltimeter Example')
def lti_disc(F, L=None, Q=None, dt=1):
"""LTI_DISC Discretize LTI ODE with Gaussian Noise.
Syntax:
[A,Q] = lti_disc(F,L,Qc,dt)
In:
F - NxN Feedback matrix
L - NxL Noise effect matrix (optional, default identity)
Qc - LxL Diagonal Spectral Density (optional, default zeros)
dt - Time Step (optional, default 1)
Out:
A - Transition matrix
Q - Discrete Process Covariance
Description:
Discretize LTI ODE with Gaussian Noise. The original
ODE model is in form
dx/dt = F x + L w, w ~ N(0,Qc)
Result of discretization is the model
x[k] = A x[k-1] + q, q ~ N(0,Q)
Which can be used for integrating the model
exactly over time steps, which are multiples
of dt.
"""
n = np.shape(F)[0]
if L is None:
L = np.eye(n)
if Q is None:
Q = np.zeros((n, n))
# Closed form integration of transition matrix
A = expm(F * dt)
# Closed form integration of covariance
# by matrix fraction decomposition
Phi = np.vstack((np.hstack((F, np.dot(np.dot(L, Q), L.T))),
np.hstack((np.zeros((n, n)), -F.T))))
AB = np.dot(expm(Phi * dt), np.vstack((np.zeros((n, n)), np.eye(n))))
# Q = AB[:n, :] / AB[n:(2 * n), :]
Q = np.linalg.solve(AB[n:(2 * n), :].T, AB[:n, :].T)
return A, Q
def demo_kalman_sine():
"""Kalman Filter demonstration with sine signal.
Example
-------
>>> demo_kalman_sine()
>>> plt.close()
"""
sd = 0.5
dt = 0.1
w = 1
T = np.arange(0, 30 + dt / 2, dt)
n = len(T)
X = 3*np.sin(w * T)
Y = X + sd * np.random.randn(n)
''' Initialize KF to values
x = 0
dx/dt = 0
with great uncertainty in derivative
'''
M = np.zeros((2, 1))
P = np.diag([0.1, 2])
R = sd ** 2
H = np.atleast_2d([1, 0])
q = 0.1
F = np.atleast_2d([[0, 1],
[0, 0]])
A, Q = lti_disc(F, L=None, Q=np.diag([0, q]), dt=dt)
# Track and animate
m = M.shape[0]
_MM = np.zeros((m, n))
_PP = np.zeros((m, m, n))
'''In this demonstration we estimate a stationary sine signal from noisy
measurements by using the classical Kalman filter.'
'''
filt = Kalman(R=R, x=M, P=P, A=A, Q=Q, H=H, B=0)
# Generate random voltages and watch the filter operate.
# n = 50
# truth = np.random.randn(n) * np.sqrt(q) + V0
# z = truth + np.random.randn(n) * np.sqrt(r) # measurement
truth = X
z = Y
x = np.zeros((n, m))
for i, zi in enumerate(z):
x[i] = np.ravel(filt(zi, u=0))
_hz = plt.plot(z, 'r.', label='observations')
# a-posteriori state estimates:
_hx = plt.plot(x[:, 0], 'b-', label='Kalman output')
_ht = plt.plot(truth, 'g-', label='true voltage')
plt.legend()
plt.title('Automobile Voltimeter Example')
plt.show('hold')
# for k in range(m):
# [M,P] = kf_predict(M,P,A,Q);
# [M,P] = kf_update(M,P,Y(k),H,R);
#
# MM(:,k) = M;
# PP(:,:,k) = P;
#
# %
# % Animate
# %
# if rem(k,10)==1
# plot(T,X,'b--',...
# T,Y,'ro',...
# T(k),M(1),'k*',...
# T(1:k),MM(1,1:k),'k-');
# legend('Real signal','Measurements','Latest estimate',
# 'Filtered estimate')
# title('Estimating a noisy sine signal with Kalman filter.');
# drawnow;
#
# pause;
# end
# end
#
# clc;
# disp('In this demonstration we estimate a stationary sine signal '
# 'from noisy measurements by using the classical Kalman filter.');
# disp(' ');
# disp('The filtering results are now displayed sequantially for 10 time '
# 'step at a time.');
# disp(' ');
# disp('<push any key to see the filtered and smoothed results together>')
# pause;
# %
# % Apply Kalman smoother
# %
# SM = rts_smooth(MM,PP,A,Q);
# plot(T,X,'b--',...
# T,MM(1,:),'k-',...
# T,SM(1,:),'r-');
# legend('Real signal','Filtered estimate','Smoothed estimate')
# title('Filtered and smoothed estimate of the original signal');
#
# clc;
# disp('The filtered and smoothed estimates of the signal are now '
# 'displayed.')
# disp(' ');
# disp('RMS errors:');
# %
# % Errors
# %
# fprintf('KF = %.3f\nRTS = %.3f\n',...
# sqrt(mean((MM(1,:)-X(1,:)).^2)),...
# sqrt(mean((SM(1,:)-X(1,:)).^2)));
def demo_hampel():
"""
Example
-------
>>> demo_hampel()
>>> plt.close()
"""
randint = np.random.randint
Y = 5000 + np.random.randn(1000)
outliers = randint(0, 1000, size=(10,))
Y[outliers] = Y[outliers] + randint(1000, size=(10,))
YY, res = HampelFilter(dx=3, t=3, fulloutput=True)(Y)
YY1, res1 = HampelFilter(dx=1, t=3, adaptive=0.1, fulloutput=True)(Y)
YY2, res2 = HampelFilter(dx=3, t=0, fulloutput=True)(Y) # median
plt.figure(1)
plot_hampel(Y, YY, res)
plt.title('Standard HampelFilter')
plt.figure(2)
plot_hampel(Y, YY1, res1)
plt.title('Adaptive HampelFilter')
plt.figure(3)
plot_hampel(Y, YY2, res2)
plt.title('Median filter')
def plot_hampel(Y, YY, res):
X = np.arange(len(YY))
plt.plot(X, Y, 'b.') # Original Data
plt.plot(X, YY, 'r') # Hampel Filtered Data
plt.plot(X, res['Y0'], 'b--') # Nominal Data
plt.plot(X, res['LB'], 'r--') # Lower Bounds on Hampel Filter
plt.plot(X, res['UB'], 'r--') # Upper Bounds on Hampel Filter
i = res['outliers']
plt.plot(X[i], Y[i], 'ks') # Identified Outliers
def demo_tide_filter():
"""
Example
-------
>>> demo_tide_filter()
>>> plt.close()
"""
# import statsmodels.api as sa
import wafo.spectrum.models as sm
sd = 10
Sj = sm.Jonswap(Hm0=4.*sd)
S = Sj.tospecdata()
q = (0.1 * sd) ** 2 # variance of process noise s the car operates
r = (100 * sd) ** 2 # variance of measurement error
b = 0 # no system input
u = 0 # no system input
from scipy.signal import butter, filtfilt, lfilter_zi # lfilter,
freq_tide = 1. / (12 * 60 * 60)
freq_wave = 1. / 10
freq_filt = freq_wave / 10
dt = 1.
freq = 1. / dt
fn = (freq / 2)
P = 10 * np.diag([1, 0.01])
R = r
H = np.atleast_2d([1, 0])
F = np.atleast_2d([[0, 1],
[0, 0]])
A, Q = lti_disc(F, L=None, Q=np.diag([0, q]), dt=dt)
t = np.arange(0, 60 * 12, 1. / freq)
w = 2 * np.pi * freq # 1 Hz
tide = 100 * np.sin(freq_tide * w * t + 2 * np.pi / 4) + 100
y = tide + S.sim(len(t), dt=1. / freq)[:, 1].ravel()
# lowess = sa.nonparametric.lowess
# y2 = lowess(y, t, frac=0.5)[:,1]
filt = Kalman(R=R, x=np.array([[tide[0]], [0]]), P=P, A=A, Q=Q, H=H, B=b)
filt2 = Kalman(R=R, x=np.array([[tide[0]], [0]]), P=P, A=A, Q=Q, H=H, B=b)
# y = tide + 0.5 * np.sin(freq_wave * w * t)
# Butterworth filter
b, a = butter(9, (freq_filt / fn), btype='low')
# y2 = [lowess(y[max(i-60,0):i + 1], t[max(i-60,0):i + 1], frac=.3)[-1,1]
# for i in range(len(y))]
# y2 = [lfilter(b, a, y[:i + 1])[i] for i in range(len(y))]
# y3 = filtfilt(b, a, y[:16]).tolist() + [filtfilt(b, a, y[:i + 1])[i]
# for i in range(16, len(y))]
# y0 = medfilt(y, 41)
_zi = lfilter_zi(b, a)
# y2 = lfilter(b, a, y)#, zi=y[0]*zi) # standard filter
y3 = filtfilt(b, a, y) # filter with phase shift correction
y4 = []
y5 = []
for _i, j in enumerate(y):
tmp = np.ravel(filt(j, u=u))
tmp = np.ravel(filt2(tmp[0], u=u))
# if i==0:
# print(filt.x)
# print(filt2.x)
y4.append(tmp[0])
y5.append(tmp[1])
_y0 = medfilt(y4, 41)
print(filt.P)
# plot
plt.plot(t, y, 'r.-', linewidth=2, label='raw data')
# plt.plot(t, y2, 'b.-', linewidth=2, label='lowess @ %g Hz' % freq_filt)
# plt.plot(t, y2, 'b.-', linewidth=2, label='filter @ %g Hz' % freq_filt)
plt.plot(t, y3, 'g.-', linewidth=2, label='filtfilt @ %g Hz' % freq_filt)
plt.plot(t, y4, 'k.-', linewidth=2, label='kalman')
# plt.plot(t, y5, 'k.', linewidth=2, label='kalman2')
plt.plot(t, tide, 'y-', linewidth=2, label='True tide')
plt.legend(frameon=False, fontsize=14)
plt.xlabel("Time [s]")
plt.ylabel("Amplitude")
def demo_savitzky_on_exponential():
"""
Example
-------
>>> demo_savitzky_on_exponential()
>>> plt.close()
"""
t = np.linspace(-4, 4, 500)
y = np.exp(-t ** 2) + np.random.normal(0, 0.05, np.shape(t))
n = 11
ysg = SavitzkyGolay(n, degree=1, diff_order=0)(y)
plt.plot(t, y, t, ysg, '--')
def demo_smoothn_on_1d_cos():
"""
Example
-------
>>> demo_smoothn_on_1d_cos()
>>> plt.close()
"""
x = np.linspace(0, 100, 2 ** 8)
y = np.cos(x / 10) + (x / 50) ** 2 + np.random.randn(np.size(x)) / 10
y[np.r_[70, 75, 80]] = np.array([5.5, 5, 6])
z = smoothn(y) # Regular smoothing
zr = smoothn(y, robust=True) # Robust smoothing
_h0 = plt.subplot(121),
_h = plt.plot(x, y, 'r.', x, z, 'k', linewidth=2)
plt.title('Regular smoothing')
plt.subplot(122)
plt.plot(x, y, 'r.', x, zr, 'k', linewidth=2)
plt.title('Robust smoothing')
def demo_smoothn_on_2d_exp_sin():
"""
Example
-------
>>> demo_smoothn_on_2d_exp_sin()
>>> plt.close()
"""
xp = np.arange(0, 1, 0.02) # np.r_[0:1:0.02]
[x, y] = np.meshgrid(xp, xp)
f = np.exp(x + y) + np.sin((x - 2 * y) * 3)
fn = f + np.random.randn(*f.shape) * 0.5
_fs, s = smoothn(fn, fulloutput=True)
fs2 = smoothn(fn, s=2 * s)
_h = plt.subplot(131),
_h = plt.contourf(xp, xp, fn)
_h = plt.subplot(132),
_h = plt.contourf(xp, xp, fs2)
_h = plt.subplot(133),
_h = plt.contourf(xp, xp, f)
def _cardioid(n=1000):
t = np.linspace(0, 2 * np.pi, n)
x0 = 2 * np.cos(t) * (1 - np.cos(t))
y0 = 2 * np.sin(t) * (1 - np.cos(t))
x = x0 + np.random.randn(x0.size) * 0.1
y = y0 + np.random.randn(y0.size) * 0.1
return x, y, x0, y0
def demo_smoothn_on_cardioid():
"""
Example
-------
>>> demo_smoothn_cardoid()
>>> plt.close()
"""
x, y, x0, y0 = _cardioid()
z = smoothn(x + 1j * y, robust=False)
plt.plot(x0, y0, 'y',
x, y, 'r.',
np.real(z), np.imag(z), 'k', linewidth=2)
def demo_hodrick_on_cardioid():
"""
Example
-------
>>> demo_hodrick_on_cardioid()
>>> plt.close()
"""
x, y, x0, y0 = _cardioid()
smooth = HodrickPrescott(w=20000)
# smooth = HampelFilter(adaptive=50)
xs, ys = smooth(x), smooth(y)
plt.plot(x0, y0, 'y',
x, y, 'r.',
xs, ys, 'k', linewidth=2)
if __name__ == '__main__':
from wafo.testing import test_docstrings
test_docstrings(__file__)
# example_reconstruct_noisy_chirp()
# demo_savitzky_on_noisy_chirp()
# plt.show('hold') # show plot
# demo_kalman_sine()
# demo_tide_filter()
# demo_hampel()
# demo_kalman_voltimeter()
# demo_savitzky_on_exponential()
# plt.figure(1)
# demo_hodrick_on_cardioid()
# plt.figure(2)
# # demo_smoothn_on_1d_cos()
# demo_smoothn_on_cardioid()
# plt.show('hold')

@ -9,17 +9,17 @@ import warnings
import numpy as np
try:
from . import mvn # @UnresolvedImport
from wafo import mvn # @UnresolvedImport
except ImportError:
warnings.warn('mvn not found. Check its compilation.')
mvn = None
try:
from . import mvnprdmod # @UnresolvedImport
from wafo import mvnprdmod # @UnresolvedImport
except ImportError:
warnings.warn('mvnprdmod not found. Check its compilation.')
mvnprdmod = None
try:
from . import rindmod # @UnresolvedImport
from wafo import rindmod # @UnresolvedImport
except ImportError:
warnings.warn('rindmod not found. Check its compilation.')
rindmod = None
@ -27,7 +27,7 @@ except ImportError:
__all__ = ['Rind', 'rindmod', 'mvnprdmod', 'mvn', 'cdflomax', 'prbnormtndpc',
'prbnormndpc', 'prbnormnd', 'cdfnorm2d', 'prbnorm2d', 'cdfnorm',
'invnorm', 'test_docstring']
'invnorm']
class Rind(object):
@ -1028,12 +1028,9 @@ def bvd(lo, up, r):
return cdfnorm2d(-lo, -up, r)
def test_docstrings():
import doctest
doctest.testmod()
if __name__ == '__main__':
test_docstrings()
from wafo.testing import test_docstrings
test_docstrings(__file__)
# if __name__ == '__main__':
# if False: #True: #
# test_rind()

@ -1,21 +1,23 @@
from __future__ import absolute_import, division
import numpy as np
# from math import pow
# from numpy import zeros,dot
from numpy import (pi, convolve, linalg, concatenate, sqrt)
from numpy import (pi, linalg, concatenate, sqrt)
from scipy.sparse import spdiags
from scipy.sparse.linalg import spsolve, expm
from scipy.signal import medfilt
from .dctpack import dctn, idctn
from .plotbackend import plotbackend as plt
from scipy.sparse.linalg import spsolve
import scipy.optimize as optimize
from scipy.signal import _savitzky_golay
from scipy.ndimage import convolve1d
from scipy.ndimage.morphology import distance_transform_edt
import warnings
from wafo.dctpack import dctn, idctn
__all__ = ['SavitzkyGolay', 'Kalman', 'HodrickPrescott', 'smoothn']
__all__ = ['SavitzkyGolay', 'Kalman', 'HodrickPrescott', 'smoothn',
'HampelFilter', 'SmoothNd']
def _assert(cond, msg):
if not cond:
raise ValueError(msg)
class SavitzkyGolay(object):
@ -162,43 +164,6 @@ class SavitzkyGolay(object):
y = convolve1d(x, coeffs, axis=axis, mode=mode, cval=self.cval)
return y
def _smooth(self, signal, pad=True):
"""
Returns smoothed signal (or it's n-th derivative).
Parameters
----------
y : array_like, shape (N,)
the values of the time history of the signal.
pad : bool
pad first and last values to lessen the end effects.
Returns
-------
ys : ndarray, shape (N)
the smoothed signal (or it's n-th derivative).
"""
coeff = self._coeff
n = np.size(coeff - 1) // 2
y = np.squeeze(signal)
if n == 0:
return y
if pad:
first_vals = y[0] - np.abs(y[n:0:-1] - y[0])
last_vals = y[-1] + np.abs(y[-2:-n - 2:-1] - y[-1])
y = concatenate((first_vals, y, last_vals))
n *= 2
d = y.ndim
if d > 1:
y1 = y.reshape(y.shape[0], -1)
res = []
for i in range(y1.shape[1]):
res.append(convolve(y1[:, i], coeff)[n:-n])
res = np.asarray(res).T
else:
res = convolve(y, coeff)[n:-n]
return res
def evar(y):
"""Noise variance estimation. Assuming that the deterministic function Y
@ -417,8 +382,7 @@ class _Filter(object):
(Inf/NaN values = missing data).
"""
weights = weight * is_finite
if (weights < 0).any():
raise ValueError('Weights must all be >=0')
_assert(np.all(0 <= weights), 'Weights must all be >=0')
return weights / weights.max()
@staticmethod
@ -523,6 +487,66 @@ class _Filter(object):
return z, s, converged
class SmoothNd(object):
def __init__(self, s=None, weight=None, robust=False, z0=None, tolz=1e-3,
maxiter=100, fulloutput=False):
self.s = s
self.weight = weight
self.robust = robust
self.z0 = z0
self.tolz = tolz
self.maxiter = maxiter
self.fulloutput = fulloutput
@property
def weightstr(self):
if isinstance(self._weight, str):
return self._weight.lower()
return 'bisquare'
@property
def weight(self):
if self._weight is None or isinstance(self._weight, str):
return 1.0
return self._weight
@weight.setter
def weight(self, weight):
self._weight = weight
def _init_filter(self, y):
return _Filter(y, self.z0, self.weightstr, self.weight, self.s,
self.robust, self.maxiter, self.tolz)
@property
def num_steps(self):
return 3 if self.robust else 1
def __call__(self, data):
y = np.atleast_1d(data)
if y.size < 2:
return data
_filter = self._init_filter(y)
z = _filter.z0
s = _filter.s
converged = False
for _i in range(self.num_steps):
z, s, converged = _filter(z, s)
if not converged:
msg = '''Maximum number of iterations (%d) has been exceeded.
Increase MaxIter option or decrease TolZ value.''' % (self.maxiter)
warnings.warn(msg)
_filter.check_smooth_parameter(s)
if self.fulloutput:
return z, s
return z
def smoothn(data, s=None, weight=None, robust=False, z0=None, tolz=1e-3,
maxiter=100, fulloutput=False):
'''
@ -656,115 +680,6 @@ def smoothn(data, s=None, weight=None, robust=False, z0=None, tolz=1e-3,
return SmoothNd(s, weight, robust, z0, tolz, maxiter, fulloutput)(data)
class SmoothNd(object):
def __init__(self, s=None, weight=None, robust=False, z0=None, tolz=1e-3,
maxiter=100, fulloutput=False):
self.s = s
self.weight = weight
self.robust = robust
self.z0 = z0
self.tolz = tolz
self.maxiter = maxiter
self.fulloutput = fulloutput
@property
def weightstr(self):
if isinstance(self._weight, str):
return self._weight.lower()
return 'bisquare'
@property
def weight(self):
if self._weight is None or isinstance(self._weight, str):
return 1.0
return self._weight
@weight.setter
def weight(self, weight):
self._weight = weight
def _init_filter(self, y):
return _Filter(y, self.z0, self.weightstr, self.weight, self.s,
self.robust, self.maxiter, self.tolz)
@property
def num_steps(self):
return 3 if self.robust else 1
def __call__(self, data):
y = np.atleast_1d(data)
if y.size < 2:
return data
_filter = self._init_filter(y)
z = _filter.z0
s = _filter.s
converged = False
for _i in range(self.num_steps):
z, s, converged = _filter(z, s)
if not converged:
msg = '''Maximum number of iterations (%d) has been exceeded.
Increase MaxIter option or decrease TolZ value.''' % (self.maxiter)
warnings.warn(msg)
_filter.check_smooth_parameter(s)
if self.fulloutput:
return z, s
return z
def test_smoothn_1d():
x = np.linspace(0, 100, 2 ** 8)
y = np.cos(x / 10) + (x / 50) ** 2 + np.random.randn(x.size) / 10
y[np.r_[70, 75, 80]] = np.array([5.5, 5, 6])
z = smoothn(y) # Regular smoothing
zr = smoothn(y, robust=True) # Robust smoothing
_h0 = plt.subplot(121),
_h = plt.plot(x, y, 'r.', x, z, 'k', linewidth=2)
plt.title('Regular smoothing')
plt.subplot(122)
plt.plot(x, y, 'r.', x, zr, 'k', linewidth=2)
plt.title('Robust smoothing')
plt.show('hold')
def test_smoothn_2d():
# import mayavi.mlab as plt
xp = np.r_[0:1:0.02]
[x, y] = np.meshgrid(xp, xp)
f = np.exp(x + y) + np.sin((x - 2 * y) * 3)
fn = f + np.random.randn(*f.shape) * 0.5
_fs, s = smoothn(fn, fulloutput=True)
fs2 = smoothn(fn, s=2 * s)
_h = plt.subplot(131),
_h = plt.contourf(xp, xp, fn)
_h = plt.subplot(132),
_h = plt.contourf(xp, xp, fs2)
_h = plt.subplot(133),
_h = plt.contourf(xp, xp, f)
plt.show('hold')
def test_smoothn_cardioid():
t = np.linspace(0, 2 * pi, 1000)
cos = np.cos
sin = np.sin
randn = np.random.randn
x0 = 2 * cos(t) * (1 - cos(t))
x = x0 + randn(t.size) * 0.1
y0 = 2 * sin(t) * (1 - cos(t))
y = y0 + randn(t.size) * 0.1
z = smoothn(x + 1j * y, robust=False)
plt.plot(x0, y0, 'y',
x, y, 'r.',
z.real, z.imag, 'k', linewidth=2)
plt.show('hold')
class HodrickPrescott(object):
'''Smooth data with a Hodrick-Prescott filter.
@ -1066,192 +981,6 @@ class Kalman(object):
return self._filter(z, u)
def test_kalman():
V0 = 12
h = np.atleast_2d(1) # voltimeter measure the voltage itself
q = 1e-9 # variance of process noise as the car operates
r = 0.05 ** 2 # variance of measurement error
b = 0 # no system input
u = 0 # no system input
filt = Kalman(R=r, A=1, Q=q, H=h, B=b)
# Generate random voltages and watch the filter operate.
n = 50
truth = np.random.randn(n) * np.sqrt(q) + V0
z = truth + np.random.randn(n) * np.sqrt(r) # measurement
x = np.zeros(n)
for i, zi in enumerate(z):
x[i] = filt(zi, u) # perform a Kalman filter iteration
_hz = plt.plot(z, 'r.', label='observations')
# a-posteriori state estimates:
_hx = plt.plot(x, 'b-', label='Kalman output')
_ht = plt.plot(truth, 'g-', label='true voltage')
plt.legend()
plt.title('Automobile Voltimeter Example')
plt.show('hold')
def lti_disc(F, L=None, Q=None, dt=1):
"""LTI_DISC Discretize LTI ODE with Gaussian Noise.
Syntax:
[A,Q] = lti_disc(F,L,Qc,dt)
In:
F - NxN Feedback matrix
L - NxL Noise effect matrix (optional, default identity)
Qc - LxL Diagonal Spectral Density (optional, default zeros)
dt - Time Step (optional, default 1)
Out:
A - Transition matrix
Q - Discrete Process Covariance
Description:
Discretize LTI ODE with Gaussian Noise. The original
ODE model is in form
dx/dt = F x + L w, w ~ N(0,Qc)
Result of discretization is the model
x[k] = A x[k-1] + q, q ~ N(0,Q)
Which can be used for integrating the model
exactly over time steps, which are multiples
of dt.
"""
n = np.shape(F)[0]
if L is None:
L = np.eye(n)
if Q is None:
Q = np.zeros((n, n))
# Closed form integration of transition matrix
A = expm(F * dt)
# Closed form integration of covariance
# by matrix fraction decomposition
Phi = np.vstack((np.hstack((F, np.dot(np.dot(L, Q), L.T))),
np.hstack((np.zeros((n, n)), -F.T))))
AB = np.dot(expm(Phi * dt), np.vstack((np.zeros((n, n)), np.eye(n))))
# Q = AB[:n, :] / AB[n:(2 * n), :]
Q = np.linalg.solve(AB[n:(2 * n), :].T, AB[:n, :].T)
return A, Q
def test_kalman_sine():
"""Kalman Filter demonstration with sine signal."""
sd = 0.5
dt = 0.1
w = 1
T = np.arange(0, 30 + dt / 2, dt)
n = len(T)
X = 3*np.sin(w * T)
Y = X + sd * np.random.randn(n)
''' Initialize KF to values
x = 0
dx/dt = 0
with great uncertainty in derivative
'''
M = np.zeros((2, 1))
P = np.diag([0.1, 2])
R = sd ** 2
H = np.atleast_2d([1, 0])
q = 0.1
F = np.atleast_2d([[0, 1],
[0, 0]])
A, Q = lti_disc(F, L=None, Q=np.diag([0, q]), dt=dt)
# Track and animate
m = M.shape[0]
_MM = np.zeros((m, n))
_PP = np.zeros((m, m, n))
'''In this demonstration we estimate a stationary sine signal from noisy
measurements by using the classical Kalman filter.'
'''
filt = Kalman(R=R, x=M, P=P, A=A, Q=Q, H=H, B=0)
# Generate random voltages and watch the filter operate.
# n = 50
# truth = np.random.randn(n) * np.sqrt(q) + V0
# z = truth + np.random.randn(n) * np.sqrt(r) # measurement
truth = X
z = Y
x = np.zeros((n, m))
for i, zi in enumerate(z):
x[i] = np.ravel(filt(zi, u=0))
_hz = plt.plot(z, 'r.', label='observations')
# a-posteriori state estimates:
_hx = plt.plot(x[:, 0], 'b-', label='Kalman output')
_ht = plt.plot(truth, 'g-', label='true voltage')
plt.legend()
plt.title('Automobile Voltimeter Example')
plt.show('hold')
# for k in range(m):
# [M,P] = kf_predict(M,P,A,Q);
# [M,P] = kf_update(M,P,Y(k),H,R);
#
# MM(:,k) = M;
# PP(:,:,k) = P;
#
# %
# % Animate
# %
# if rem(k,10)==1
# plot(T,X,'b--',...
# T,Y,'ro',...
# T(k),M(1),'k*',...
# T(1:k),MM(1,1:k),'k-');
# legend('Real signal','Measurements','Latest estimate',
# 'Filtered estimate')
# title('Estimating a noisy sine signal with Kalman filter.');
# drawnow;
#
# pause;
# end
# end
#
# clc;
# disp('In this demonstration we estimate a stationary sine signal '
# 'from noisy measurements by using the classical Kalman filter.');
# disp(' ');
# disp('The filtering results are now displayed sequantially for 10 time '
# 'step at a time.');
# disp(' ');
# disp('<push any key to see the filtered and smoothed results together>')
# pause;
# %
# % Apply Kalman smoother
# %
# SM = rts_smooth(MM,PP,A,Q);
# plot(T,X,'b--',...
# T,MM(1,:),'k-',...
# T,SM(1,:),'r-');
# legend('Real signal','Filtered estimate','Smoothed estimate')
# title('Filtered and smoothed estimate of the original signal');
#
# clc;
# disp('The filtered and smoothed estimates of the signal are now '
# 'displayed.')
# disp(' ');
# disp('RMS errors:');
# %
# % Errors
# %
# fprintf('KF = %.3f\nRTS = %.3f\n',...
# sqrt(mean((MM(1,:)-X(1,:)).^2)),...
# sqrt(mean((SM(1,:)-X(1,:)).^2)));
class HampelFilter(object):
"""Hampel Filter.
@ -1322,10 +1051,8 @@ class HampelFilter(object):
@staticmethod
def _check(dx):
if not np.isscalar(dx):
raise ValueError('DX must be a scalar.')
if dx < 0:
raise ValueError('DX must be larger than zero.')
_assert(np.isscalar(dx), 'DX must be a scalar.')
_assert(0 < dx, 'DX must be larger than zero.')
@staticmethod
def localwindow(X, Y, DX, i):
@ -1421,154 +1148,6 @@ class HampelFilter(object):
return YY
def demo_hampel():
randint = np.random.randint
Y = 5000 + np.random.randn(1000)
outliers = randint(0, 1000, size=(10,))
Y[outliers] = Y[outliers] + randint(1000, size=(10,))
YY, res = HampelFilter(dx=3, t=3, fulloutput=True)(Y)
YY1, res1 = HampelFilter(dx=1, t=3, adaptive=0.1, fulloutput=True)(Y)
YY2, res2 = HampelFilter(dx=3, t=0, fulloutput=True)(Y) # median
plt.figure(1)
plot_hampel(Y, YY, res)
plt.title('Standard HampelFilter')
plt.figure(2)
plot_hampel(Y, YY1, res1)
plt.title('Adaptive HampelFilter')
plt.figure(3)
plot_hampel(Y, YY2, res2)
plt.title('Median filter')
plt.show('hold')
def plot_hampel(Y, YY, res):
X = np.arange(len(YY))
plt.plot(X, Y, 'b.') # Original Data
plt.plot(X, YY, 'r') # Hampel Filtered Data
plt.plot(X, res['Y0'], 'b--') # Nominal Data
plt.plot(X, res['LB'], 'r--') # Lower Bounds on Hampel Filter
plt.plot(X, res['UB'], 'r--') # Upper Bounds on Hampel Filter
i = res['outliers']
plt.plot(X[i], Y[i], 'ks') # Identified Outliers
# plt.show('hold')
def test_tide_filter():
# import statsmodels.api as sa
import wafo.spectrum.models as sm
sd = 10
Sj = sm.Jonswap(Hm0=4.*sd)
S = Sj.tospecdata()
q = (0.1 * sd) ** 2 # variance of process noise s the car operates
r = (100 * sd) ** 2 # variance of measurement error
b = 0 # no system input
u = 0 # no system input
from scipy.signal import butter, filtfilt, lfilter_zi # lfilter,
freq_tide = 1. / (12 * 60 * 60)
freq_wave = 1. / 10
freq_filt = freq_wave / 10
dt = 1.
freq = 1. / dt
fn = (freq / 2)
P = 10 * np.diag([1, 0.01])
R = r
H = np.atleast_2d([1, 0])
F = np.atleast_2d([[0, 1],
[0, 0]])
A, Q = lti_disc(F, L=None, Q=np.diag([0, q]), dt=dt)
t = np.arange(0, 60 * 12, 1. / freq)
w = 2 * np.pi * freq # 1 Hz
tide = 100 * np.sin(freq_tide * w * t + 2 * np.pi / 4) + 100
y = tide + S.sim(len(t), dt=1. / freq)[:, 1].ravel()
# lowess = sa.nonparametric.lowess
# y2 = lowess(y, t, frac=0.5)[:,1]
filt = Kalman(R=R, x=np.array([[tide[0]], [0]]), P=P, A=A, Q=Q, H=H, B=b)
filt2 = Kalman(R=R, x=np.array([[tide[0]], [0]]), P=P, A=A, Q=Q, H=H, B=b)
# y = tide + 0.5 * np.sin(freq_wave * w * t)
# Butterworth filter
b, a = butter(9, (freq_filt / fn), btype='low')
# y2 = [lowess(y[max(i-60,0):i + 1], t[max(i-60,0):i + 1], frac=.3)[-1,1]
# for i in range(len(y))]
# y2 = [lfilter(b, a, y[:i + 1])[i] for i in range(len(y))]
# y3 = filtfilt(b, a, y[:16]).tolist() + [filtfilt(b, a, y[:i + 1])[i]
# for i in range(16, len(y))]
# y0 = medfilt(y, 41)
_zi = lfilter_zi(b, a)
# y2 = lfilter(b, a, y)#, zi=y[0]*zi) # standard filter
y3 = filtfilt(b, a, y) # filter with phase shift correction
y4 = []
y5 = []
for _i, j in enumerate(y):
tmp = filt(j, u=u).ravel()
tmp = filt2(tmp[0], u=u).ravel()
# if i==0:
# print(filt.x)
# print(filt2.x)
y4.append(tmp[0])
y5.append(tmp[1])
_y0 = medfilt(y4, 41)
print(filt.P)
# plot
plt.plot(t, y, 'r.-', linewidth=2, label='raw data')
# plt.plot(t, y2, 'b.-', linewidth=2, label='lowess @ %g Hz' % freq_filt)
# plt.plot(t, y2, 'b.-', linewidth=2, label='filter @ %g Hz' % freq_filt)
plt.plot(t, y3, 'g.-', linewidth=2, label='filtfilt @ %g Hz' % freq_filt)
plt.plot(t, y4, 'k.-', linewidth=2, label='kalman')
# plt.plot(t, y5, 'k.', linewidth=2, label='kalman2')
plt.plot(t, tide, 'y-', linewidth=2, label='True tide')
plt.legend(frameon=False, fontsize=14)
plt.xlabel("Time [s]")
plt.ylabel("Amplitude")
plt.show('hold')
def test_smooth():
t = np.linspace(-4, 4, 500)
y = np.exp(-t ** 2) + np.random.normal(0, 0.05, t.shape)
n = 11
ysg = SavitzkyGolay(n, degree=1, diff_order=0)(y)
plt.plot(t, y, t, ysg, '--')
plt.show('hold')
def test_hodrick_cardioid():
t = np.linspace(0, 2 * np.pi, 1000)
cos = np.cos
sin = np.sin
randn = np.random.randn
x0 = 2 * cos(t) * (1 - cos(t))
x = x0 + randn(t.size) * 0.1
y0 = 2 * sin(t) * (1 - cos(t))
y = y0 + randn(t.size) * 0.1
smooth = HodrickPrescott(w=20000)
# smooth = HampelFilter(adaptive=50)
z = smooth(x) + 1j * smooth(y)
plt.plot(x0, y0, 'y',
x, y, 'r.',
z.real, z.imag, 'k', linewidth=2)
plt.show('hold')
def test_docstrings():
import doctest
print('Testing docstrings in %s' % __file__)
doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)
if __name__ == '__main__':
test_docstrings()
# test_kalman_sine()
# test_tide_filter()
# demo_hampel()
# test_kalman()
# test_smooth()
# test_hodrick_cardioid()
# test_smoothn_1d()
# test_smoothn_cardioid()
from wafo.testing import test_docstrings
test_docstrings(__file__)

File diff suppressed because one or more lines are too long
Loading…
Cancel
Save