updated Kalman and HampelFilter

master
Per A Brodtkorb 9 years ago
parent 6257bc6a96
commit 8e411b384d

@ -572,7 +572,7 @@ def smoothn(data, s=None, weight=None, robust=False, z0=None, tolz=1e-3,
def gcv(p, aow, Lambda, DCTy, y, Wtot, IsFinite, nof, noe):
# Search the smoothing parameter s that minimizes the GCV score
s = 10 ** p
s = 10.0 ** p
Gamma = 1.0 / (1 + s * Lambda ** 2)
# RSS = Residual sum-of-squares
if aow > 0.9: # aow = 1 means that all of the data are equally weighted
@ -589,7 +589,6 @@ def gcv(p, aow, Lambda, DCTy, y, Wtot, IsFinite, nof, noe):
return GCVscore
# Robust weights
def RobustWeights(r, I, h, wstr):
# weights for robust smoothing.
MAD = np.median(abs(r[I] - np.median(r[I]))) # median absolute deviation
@ -882,7 +881,7 @@ class Kalman(object):
self.x = x # Initial state estimate.
self.P = P # Initial covariance estimate.
self.A = A # State transition matrix.
self.B = B # Control matrix.
self.B = B # Control matrix.
self.Q = Q # Estimated error in process.
self.H = H # Observation matrix.
self.reset()
@ -890,38 +889,55 @@ class Kalman(object):
def reset(self):
self._filter = self._filter_first
def _filter_first(self, z, u):
self._filter = self._filter_main
auto_init = self.x is None
if auto_init:
n = np.size(z)
else:
n = np.size(self.x)
def _set_A(self, n):
if self.A is None:
self.A = np.eye(n)
self.A = np.atleast_2d(self.A)
def _set_Q(self, n):
if self.Q is None:
self.Q = np.zeros((n, n))
self.Q = np.atleast_2d(self.Q)
def _set_H(self, n):
if self.H is None:
self.H = np.eye(n)
self.H = np.atleast_2d(self.H)
def _set_P(self, HI):
if self.P is None:
self.P = np.dot(np.dot(HI, self.R), HI.T)
self.P = np.atleast_2d(self.P)
def _init_first(self, n):
self._set_A(n)
self._set_Q(n)
self._set_H(n)
try:
HI = np.linalg.inv(self.H)
except:
HI = np.eye(n)
if self.P is None:
self.P = np.dot(np.dot(HI, self.R), HI.T)
self._set_P(HI)
return HI
self.P = np.atleast_2d(self.P)
if auto_init:
# initialize state estimate from first observation
self.x = np.dot(HI, z)
def _first_state(self, z):
n = np.size(z)
HI = self._init_first(n)
# initialize state estimate from first observation
x = np.dot(HI, z)
return x
def _filter_first(self, z, u):
self._filter = self._filter_main
if self.x is None:
self.x = self._first_state(z)
return self.x
else:
return self._filter_main(z, u)
n = np.size(self.x)
self._init_first(n)
return self._filter_main(z, u)
def _predict_state(self, x, u):
return np.dot(self.A, x) + np.dot(self.B, u)
@ -1043,12 +1059,12 @@ def lti_disc(F, L=None, Q=None, dt=1):
def test_kalman_sine():
"""Kalman Filter demonstration with sine signal."""
sd = 1.
sd = 0.5
dt = 0.1
w = 1
T = np.arange(0, 30 + dt / 2, dt)
n = len(T)
X = np.sin(w * T)
X = 3*np.sin(w * T)
Y = X + sd * np.random.randn(n)
''' Initialize KF to values
@ -1091,7 +1107,7 @@ def test_kalman_sine():
_ht = plt.plot(truth, 'g-', label='true voltage')
plt.legend()
plt.title('Automobile Voltimeter Example')
plt.show()
plt.show('hold')
# for k in range(m):
# [M,P] = kf_predict(M,P,A,Q);
@ -1274,6 +1290,22 @@ class HampelFilter(object):
ADX = dx * np.ones(Y.shape)
return Y0, S0, ADX
def _fixed(self, Y, X, dx):
localwindow = self.localwindow
Y0, S0, ADX = self._init(Y, dx)
for i in range(len(Y)):
Y0[i], S0[i] = localwindow(X, Y, dx, i)
return Y0, S0, ADX
def _filter(self, Y, X, dx):
if len(X) <= 1:
Y0, S0, ADX = self._init(Y, dx)
elif self.adaptive is None:
Y0, S0, ADX = self._fixed(Y, X, dx)
else:
Y0, S0, ADX = self._adaptive(Y, X, dx) # 'adaptive'
return Y0, S0, ADX
def __call__(self, y, x=None):
Y = np.atleast_1d(y).ravel()
if x is None:
@ -1283,21 +1315,14 @@ class HampelFilter(object):
dx = 3 * np.median(np.diff(X)) if self.dx is None else self.dx
self._check(dx)
if len(X) > 1:
if self.adaptive is None:
localwindow = self.localwindow
Y0, S0, ADX = self._init(Y, dx)
for i in range(len(Y)):
Y0[i], S0[i] = localwindow(X, Y, dx, i)
else: # 'adaptive'
Y0, S0, ADX = self._adaptive(Y, X, dx)
Y0, S0, ADX = self._filter(Y, X, dx)
YY = Y.copy()
T = self.t
# Prepare Output
self.UB = Y0 + T * S0
self.LB = Y0 - T * S0
outliers = np.abs(Y - Y0) > T * S0 # possible outliers
YY[outliers] = Y0[outliers]
np.putmask(YY, outliers, Y0) # YY[outliers] = Y0[outliers]
self.outliers = outliers
self.num_outliers = outliers.sum()
self.ADX = ADX
@ -1308,7 +1333,7 @@ class HampelFilter(object):
return YY
def test_hampel():
def demo_hampel():
randint = np.random.randint
Y = 5000 + np.random.randn(1000)
outliers = randint(0, 1000, size=(10,))
@ -1450,12 +1475,12 @@ def test_docstrings():
doctest.testmod(optionflags=doctest.NORMALIZE_WHITESPACE)
if __name__ == '__main__':
test_docstrings()
# test_kalman_sine()
# test_docstrings()
test_kalman_sine()
# test_tide_filter()
# test_hampel()
# demo_hampel()
# test_kalman()
# test_smooth()
# test_hodrick_cardioid()
test_smoothn_1d()
# test_smoothn_1d()
# test_smoothn_cardioid()

Loading…
Cancel
Save