Refactored smoothn into SmoothNd and _Filter classes

master
Per A Brodtkorb 9 years ago
parent 8e411b384d
commit f207791b6e

@@ -286,6 +286,238 @@ def evar(y):
    return noisevar


+class _Filter(object):
+    def __init__(self, y, z0, weightstr, weights, s, robust, maxiter, tolz):
+        self.y = y
+        self.z0 = z0
+        self.weightstr = weightstr
+        self.s = s
+        self.robust = robust
+        self.maxiter = maxiter
+        self.tolz = tolz
+        self.auto_smooth = s is None
+        self.is_finite = np.isfinite(y)
+        self.nof = self.is_finite.sum()  # number of finite elements
+        self.W = self._normalized_weights(weights, self.is_finite)
+        self.gamma = self._gamma_fun(y)
+        self.N = self._tensor_rank(y)
+        self.s_min, self.s_max = self._smoothness_limits(self.N)
+        # Initialize before iterating
+        self.Wtot = self.W
+        self.is_weighted = (self.W < 1).any()  # Weighted or missing data?
+        self.z0 = self._get_start_condition(y, z0)
+        self.y[~self.is_finite] = 0  # arbitrary values for missing y-data
+        # Error on p. Smoothness parameter s = 10^p
+        self.errp = 0.1
+        # Relaxation factor RF: to speed up convergence
+        self.RF = 1.75 if self.is_weighted else 1.0
+
+    @staticmethod
+    def _tensor_rank(y):
+        """Return the tensor rank of the y-array."""
+        return (np.array(y.shape) != 1).sum()
+
+    @staticmethod
+    def _smoothness_limits(n):
+        """
+        Return upper and lower bound for the smoothness parameter.
+
+        The average leverage (h) is by definition in [0 1]. Weak smoothing
+        occurs if h is close to 1, while over-smoothing appears when h is
+        near 0. Upper and lower bounds for h are given to avoid under- or
+        over-smoothing. See equation relating h to the smoothness parameter
+        (Equation #12 in the referenced CSDA paper).
+        """
+        h_min = 1e-6 ** (2. / n)
+        h_max = 0.99 ** (2. / n)
+        s_min = (((1 + sqrt(1 + 8 * h_max)) / 4. / h_max) ** 2 - 1) / 16
+        s_max = (((1 + sqrt(1 + 8 * h_min)) / 4. / h_min) ** 2 - 1) / 16
+        return s_min, s_max
+
+    @staticmethod
+    def _lambda_tensor(y):
+        """
+        Return the Lambda tensor.
+
+        Lambda contains the eigenvalues of the difference matrix used in
+        this penalized least squares process.
+        """
+        d = y.ndim
+        Lambda = np.zeros(y.shape)
+        shape0 = [1, ] * d
+        for i in range(d):
+            shape0[i] = y.shape[i]
+            Lambda = Lambda + \
+                np.cos(pi * np.arange(y.shape[i]) / y.shape[i]).reshape(shape0)
+            shape0[i] = 1
+        Lambda = -2 * (d - Lambda)
+        return Lambda
+
+    def _gamma_fun(self, y):
+        Lambda = self._lambda_tensor(y)
+
+        def gamma(s):
+            return 1. / (1 + s * Lambda ** 2)
+        return gamma
+
+    @staticmethod
+    def _initial_guess(y, I):
+        # Initial guess with weighted/missing data:
+        # nearest neighbor interpolation (in case of missing values)
+        z = y
+        if (1 - I).any():
+            notI = ~I
+            z, L = distance_transform_edt(notI, return_indices=True)
+            z[notI] = y[L.flat[notI]]
+        # coarse fast smoothing using one-tenth of the DCT coefficients
+        shape = z.shape
+        d = z.ndim
+        z = dctn(z)
+        for k in range(d):
+            z[int((shape[k] + 0.5) / 10) + 1::, ...] = 0
+            z = z.reshape(np.roll(shape, -k))
+            z = z.transpose(np.roll(range(d), -1))
+        # z = shiftdim(z,1);
+        return idctn(z)
+
+    def _get_start_condition(self, y, z0):
+        # Initial conditions for z
+        if self.is_weighted:
+            # With weighted/missing data an initial guess is provided to
+            # ensure faster convergence. For that purpose, a nearest
+            # neighbor interpolation followed by a coarse smoothing are
+            # performed.
+            if z0 is None:
+                z = self._initial_guess(y, self.is_finite)
+            else:
+                z = z0  # an initial guess (z0) has been provided
+        else:
+            z = np.zeros(y.shape)
+        return z
+
+    @staticmethod
+    def _normalized_weights(weight, is_finite):
+        """Return normalized weights.
+
+        Zero weights are assigned to non-finite values (Inf or NaN),
+        (Inf/NaN values = missing data).
+        """
+        weights = weight * is_finite
+        if (weights < 0).any():
+            raise ValueError('Weights must all be >=0')
+        return weights / weights.max()
+
+    @staticmethod
+    def _studentized_residuals(r, I, h):
+        median_abs_deviation = np.median(abs(r[I] - np.median(r[I])))
+        return abs(r / (1.4826 * median_abs_deviation) / sqrt(1 - h))
+
+    def robust_weights(self, r, I, h):
+        """Return weights for robust smoothing."""
+        def bisquare(u):
+            c = 4.685
+            return (1 - (u / c) ** 2) ** 2 * ((u / c) < 1)
+
+        def talworth(u):
+            c = 2.795
+            return u < c
+
+        def cauchy(u):
+            c = 2.385
+            return 1. / (1 + (u / c) ** 2)
+
+        u = self._studentized_residuals(r, I, h)
+        wfun = {'cauchy': cauchy, 'talworth': talworth}.get(self.weightstr,
+                                                            bisquare)
+        weights = wfun(u)
+        weights[np.isnan(weights)] = 0
+        return weights
+
+    @staticmethod
+    def _average_leverage(s, N):
+        h = sqrt(1 + 16 * s)
+        h = sqrt(1 + h) / sqrt(2) / h
+        return h ** N
+
+    def check_smooth_parameter(self, s):
+        if self.auto_smooth:
+            if abs(np.log10(s) - np.log10(self.s_min)) < self.errp:
+                warnings.warn('''s = %g: the lower bound for s has been reached.
+            Put s as an input variable if required.''' % s)
+            elif abs(np.log10(s) - np.log10(self.s_max)) < self.errp:
+                warnings.warn('''s = %g: the upper bound for s has been reached.
+            Put s as an input variable if required.''' % s)
+
+    def gcv(self, p, aow, DCTy, y, Wtot):
+        # Search the smoothing parameter s that minimizes the GCV score.
+        s = 10.0 ** p
+        Gamma = self.gamma(s)
+        if aow > 0.9:  # aow = 1 means that all data are equally weighted
+            # very much faster: does not require any inverse DCT
+            residual = DCTy.ravel() * (Gamma.ravel() - 1)
+        else:
+            # take account of the weights to calculate RSS:
+            is_finite = self.is_finite
+            yhat = idctn(Gamma * DCTy)
+            residual = sqrt(Wtot[is_finite]) * (y[is_finite] -
+                                                yhat[is_finite])
+        TrH = Gamma.sum()
+        RSS = linalg.norm(residual) ** 2  # Residual sum-of-squares
+        GCVscore = RSS / self.nof / (1.0 - TrH / y.size) ** 2
+        return GCVscore
+
+    def __call__(self, z, s):
+        auto_smooth = self.auto_smooth
+        norm = linalg.norm
+        y = self.y
+        Wtot = self.Wtot
+        Gamma = 1
+        if s is not None:
+            Gamma = self.gamma(s)
+        # "amount" of weights (see the function GCVscore)
+        aow = Wtot.sum() / y.size  # 0 < aow <= 1
+        for nit in range(self.maxiter):
+            DCTy = dctn(Wtot * (y - z) + z)
+            if auto_smooth and not np.remainder(np.log2(nit + 1), 1):
+                # The generalized cross-validation (GCV) method is used.
+                # We seek the smoothing parameter s that minimizes the GCV
+                # score, i.e. s = Argmin(GCVscore). Because this process is
+                # time-consuming, it is performed only from time to time
+                # (when nit + 1 is a power of 2).
+                log10s = optimize.fminbound(
+                    self.gcv, np.log10(self.s_min), np.log10(self.s_max),
+                    args=(aow, DCTy, y, Wtot),
+                    xtol=self.errp, full_output=False, disp=False)
+                s = 10 ** log10s
+                Gamma = self.gamma(s)
+            z0 = z
+            z = self.RF * idctn(Gamma * DCTy) + (1 - self.RF) * z
+            # if no weighted/missing data => tol=0 (no iteration)
+            tol = norm(z0.ravel() - z.ravel()) / norm(z.ravel())
+            converged = tol <= self.tolz or not self.is_weighted
+            if converged:
+                break
+        if self.robust:
+            # -- Robust smoothing: iteratively re-weighted process
+            h = self._average_leverage(s, self.N)
+            self.Wtot = self.W * self.robust_weights(y - z, self.is_finite, h)
+            # re-initialize for another iterative weighted process
+            self.is_weighted = True
+        return z, s, converged
+
+
def smoothn(data, s=None, weight=None, robust=False, z0=None, tolz=1e-3,
            maxiter=100, fulloutput=False):
    '''
@@ -303,7 +535,7 @@ def smoothn(data, s=None, weight=None, robust=False, z0=None, tolz=1e-3,
    weight : string or array weights
        weighting array of real positive values, that must have the same size
        as DATA. Note that a zero weight corresponds to a missing value.
    robust : bool
        If true carry out a robust smoothing that minimizes the influence of
        outlying data.
    tolz : real positive scalar
@@ -414,222 +646,68 @@ def smoothn(data, s=None, weight=None, robust=False, z0=None, tolz=1e-3,
    http://www.biomecardio.com/matlab/smoothn.html
    for more details about SMOOTHN
    '''
-    y = np.atleast_1d(data)
-    sizy = y.shape
-    noe = y.size
-    if noe < 2:
-        return data
-
-    weightstr = 'bisquare'
-    W = np.ones(sizy)
-    # Smoothness parameter and weights
-    if weight is None:
-        pass
-    elif isinstance(weight, str):
-        weightstr = weight.lower()
-    else:
-        W = weight
-
-    # Weights. Zero weights are assigned to non-finite values (Inf or NaN),
-    # (Inf/NaN values = missing data).
-    IsFinite = np.isfinite(y)
-    nof = IsFinite.sum()  # number of finite elements
-    W = W * IsFinite
-    if (W < 0).any():
-        raise ValueError('Weights must all be >=0')
-
-    W = W / W.max()
-
-    isweighted = (W < 1).any()  # Weighted or missing data?
-    isauto = s is None  # Automatic smoothing?
-
-    # Creation of the Lambda tensor
-    # Lambda contains the eigenvalues of the difference matrix used in this
-    # penalized least squares process.
-    d = y.ndim
-    Lambda = np.zeros(sizy)
-    siz0 = [1, ] * d
-    for i in range(d):
-        siz0[i] = sizy[i]
-        Lambda = Lambda + \
-            np.cos(pi * np.arange(sizy[i]) / sizy[i]).reshape(siz0)
-        siz0[i] = 1
-    Lambda = -2 * (d - Lambda)
-    if not isauto:
-        Gamma = 1. / (1 + s * Lambda ** 2)
-
-    # Upper and lower bound for the smoothness parameter
-    # The average leverage (h) is by definition in [0 1]. Weak smoothing
-    # occurs if h is close to 1, while over-smoothing appears when h is
-    # near 0. Upper and lower bounds for h are given to avoid under- or
-    # over-smoothing. See equation relating h to the smoothness parameter
-    # (Equation #12 in the referenced CSDA paper).
-    N = (np.array(sizy) != 1).sum()  # tensor rank of the y-array
-    hMin = 1e-6
-    hMax = 0.99
-    sMinBnd = (((1 + sqrt(1 + 8 * hMax ** (2. / N))) / 4. /
-                hMax ** (2. / N)) ** 2 - 1) / 16
-    sMaxBnd = (((1 + sqrt(1 + 8 * hMin ** (2. / N))) / 4. /
-                hMin ** (2. / N)) ** 2 - 1) / 16
-
-    # Initialize before iterating
-    Wtot = W
-    # Initial conditions for z
-    if isweighted:
-        # With weighted/missing data
-        # An initial guess is provided to ensure faster convergence. For
-        # that purpose, a nearest neighbor interpolation followed by a
-        # coarse smoothing are performed.
-        if z0 is None:
-            z = InitialGuess(y, IsFinite)
-        else:
-            z = z0  # an initial guess (z0) has been provided
-    else:
-        z = np.zeros(sizy)
-    z0 = z
-    y[~IsFinite] = 0  # arbitrary values for missing y-data
-
-    tol = 1
-    RobustIterativeProcess = True
-    RobustStep = 1
-
-    # Error on p. Smoothness parameter s = 10^p
-    errp = 0.1
-
-    # Relaxation factor RF: to speed up convergence
-    RF = 1.75 if isweighted else 1.0
-    norm = linalg.norm
-    # Main iterative process
-    while RobustIterativeProcess:
-        # "amount" of weights (see the function GCVscore)
-        aow = Wtot.sum() / noe  # 0 < aow <= 1
-        exitflag = True
-        for nit in range(1, maxiter + 1):
-            DCTy = dctn(Wtot * (y - z) + z)
-            if isauto and not np.remainder(np.log2(nit), 1):
-                # The generalized cross-validation (GCV) method is used.
-                # We seek the smoothing parameter s that minimizes the GCV
-                # score, i.e. s = Argmin(GCVscore). Because this process is
-                # time-consuming, it is performed only from time to time
-                # (when nit is a power of 2).
-                log10s = optimize.fminbound(
-                    gcv, np.log10(sMinBnd), np.log10(sMaxBnd),
-                    args=(aow, Lambda, DCTy, y, Wtot, IsFinite, nof, noe),
-                    xtol=errp, full_output=False, disp=False)
-                s = 10 ** log10s
-                Gamma = 1.0 / (1 + s * Lambda ** 2)
-            z = RF * idctn(Gamma * DCTy) + (1 - RF) * z
-            # if no weighted/missing data => tol=0 (no iteration)
-            tol = norm(z0.ravel() - z.ravel()) / norm(
-                z.ravel()) if isweighted else 0.0
-            if tol <= tolz:
-                break
-            z0 = z  # re-initialization
-        else:
-            exitflag = False  # nit < MaxIter
-
-        if robust:
-            # -- Robust smoothing: iteratively re-weighted process
-            # --- average leverage
-            h = sqrt(1 + 16 * s)
-            h = sqrt(1 + h) / sqrt(2) / h
-            h = h ** N
-            # take robust weights into account
-            Wtot = W * RobustWeights(y - z, IsFinite, h, weightstr)
-            # re-initialize for another iterative weighted process
-            isweighted = True
-            tol = 1
-            RobustStep = RobustStep + 1
-            # 3 robust steps are enough.
-            RobustIterativeProcess = RobustStep < 4
-        else:
-            RobustIterativeProcess = False  # stop the whole process
-
-    # Warning messages
-    if isauto:
-        if abs(np.log10(s) - np.log10(sMinBnd)) < errp:
-            warnings.warn('''s = %g: the lower bound for s has been reached.
-            Put s as an input variable if required.''' % s)
-        elif abs(np.log10(s) - np.log10(sMaxBnd)) < errp:
-            warnings.warn('''s = %g: the upper bound for s has been reached.
-            Put s as an input variable if required.''' % s)
-    if not exitflag:
-        warnings.warn('''Maximum number of iterations (%d) has been exceeded.
-        Increase MaxIter option or decrease TolZ value.''' % (maxiter))
-    if fulloutput:
-        return z, s
-    else:
-        return z
-
-
-def gcv(p, aow, Lambda, DCTy, y, Wtot, IsFinite, nof, noe):
-    # Search the smoothing parameter s that minimizes the GCV score.
-    s = 10.0 ** p
-    Gamma = 1.0 / (1 + s * Lambda ** 2)
-    # RSS = Residual sum-of-squares
-    if aow > 0.9:  # aow = 1 means that all of the data are equally weighted
-        # very much faster: does not require any inverse DCT
-        RSS = linalg.norm(DCTy.ravel() * (Gamma.ravel() - 1)) ** 2
-    else:
-        # take account of the weights to calculate RSS:
-        yhat = idctn(Gamma * DCTy)
-        RSS = linalg.norm(sqrt(Wtot[IsFinite]) *
-                          (y[IsFinite] - yhat[IsFinite])) ** 2
-    TrH = Gamma.sum()
-    GCVscore = RSS / nof / (1.0 - TrH / noe) ** 2
-    return GCVscore
-
-
-def RobustWeights(r, I, h, wstr):
-    # weights for robust smoothing.
-    MAD = np.median(abs(r[I] - np.median(r[I])))  # median absolute deviation
-    u = abs(r / (1.4826 * MAD) / sqrt(1 - h))  # studentized residuals
-    if wstr == 'cauchy':
-        c = 2.385
-        W = 1. / (1 + (u / c) ** 2)  # Cauchy weights
-    elif wstr == 'talworth':
-        c = 2.795
-        W = u < c  # Talworth weights
-    else:  # bisquare weights
-        c = 4.685
-        W = (1 - (u / c) ** 2) ** 2 * ((u / c) < 1)
-    W[np.isnan(W)] = 0
-    return W
-
-
-def InitialGuess(y, I):
-    # Initial guess with weighted/missing data:
-    # nearest neighbor interpolation (in case of missing values)
-    z = y
-    if (1 - I).any():
-        notI = ~I
-        z, L = distance_transform_edt(notI, return_indices=True)
-        z[notI] = y[L.flat[notI]]
-    # coarse fast smoothing using one-tenth of the DCT coefficients
-    siz = z.shape
-    d = z.ndim
-    z = dctn(z)
-    for k in range(d):
-        z[int((siz[k] + 0.5) / 10) + 1::, ...] = 0
-        z = z.reshape(np.roll(siz, -k))
-        z = z.transpose(np.roll(range(z.ndim), -1))
-    # z = shiftdim(z,1);
-    z = idctn(z)
-    return z
+    return SmoothNd(s, weight, robust, z0, tolz, maxiter, fulloutput)(data)
+
+
+class SmoothNd(object):
+    def __init__(self, s=None, weight=None, robust=False, z0=None, tolz=1e-3,
+                 maxiter=100, fulloutput=False):
+        self.s = s
+        self.weight = weight
+        self.robust = robust
+        self.z0 = z0
+        self.tolz = tolz
+        self.maxiter = maxiter
+        self.fulloutput = fulloutput
+
+    @property
+    def weightstr(self):
+        if isinstance(self._weight, str):
+            return self._weight.lower()
+        return 'bisquare'
+
+    @property
+    def weight(self):
+        if self._weight is None or isinstance(self._weight, str):
+            return 1.0
+        return self._weight
+
+    @weight.setter
+    def weight(self, weight):
+        self._weight = weight
+
+    def _init_filter(self, y):
+        return _Filter(y, self.z0, self.weightstr, self.weight, self.s,
+                       self.robust, self.maxiter, self.tolz)
+
+    def __call__(self, data):
+        y = np.atleast_1d(data)
+        if y.size < 2:
+            return data
+
+        _filter = self._init_filter(y)
+        z = _filter.z0
+        s = _filter.s
+        num_steps = 3 if self.robust else 1
+        converged = False
+        for i in range(num_steps):
+            z, s, converged = _filter(z, s)
+            if converged and num_steps <= i + 1:
+                break
+        else:
+            msg = '''Maximum number of iterations (%d) has been exceeded.
+            Increase MaxIter option or decrease TolZ value.''' % (self.maxiter)
+            warnings.warn(msg)
+
+        _filter.check_smooth_parameter(s)
+        if self.fulloutput:
+            return z, s
+        return z
+
+
def test_smoothn_1d():
    x = np.linspace(0, 100, 2 ** 8)
    y = np.cos(x / 10) + (x / 50) ** 2 + np.random.randn(x.size) / 10
@@ -1476,11 +1554,11 @@ def test_docstrings():

if __name__ == '__main__':
    # test_docstrings()
-    test_kalman_sine()
+    # test_kalman_sine()
    # test_tide_filter()
    # demo_hampel()
    # test_kalman()
    # test_smooth()
    # test_hodrick_cardioid()
-    # test_smoothn_1d()
+    test_smoothn_1d()
    # test_smoothn_cardioid()
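
For context, a minimal usage sketch of the API this commit introduces. It assumes the module shown here is wafo's sg_filter; the import path and the sample data are illustrative, not part of the commit:

import numpy as np

from wafo.sg_filter import SmoothNd, smoothn  # assumed import path

x = np.linspace(0, 100, 2 ** 8)
y = np.cos(x / 10) + (x / 50) ** 2 + np.random.randn(x.size) / 10
y[70:75] = np.nan  # non-finite entries get zero weight (missing data)

# Functional interface: s=None (the default) picks the smoothing
# parameter by minimizing the GCV score; fulloutput also returns s.
z, s = smoothn(y, robust=True, fulloutput=True)

# Class-based interface added by this commit: configure once, then
# apply to any number of arrays.
smooth = SmoothNd(robust=True)
z2 = smooth(y)

Both paths run the same _Filter iteration; after this commit smoothn is a thin wrapper that builds a SmoothNd and applies it to data.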
