Refactored to reduce cyclomatic complexity

master
Per A Brodtkorb 8 years ago
parent 8178eaa720
commit 5e6cfbe5ee

@ -6,7 +6,7 @@ import scipy.signal
import scipy.sparse as sparse
from numpy import ones, zeros, prod, sin, diff, pi, inf, vstack, linspace
from scipy.interpolate import BPoly, interp1d
from scipy.signal import fftconvolve
from wafo import polynomial as pl
@ -16,6 +16,13 @@ __all__ = [
'StinemanInterp', 'CubicHermiteSpline']
def _check_window_and_order(window_size, order):
if window_size % 2 != 1 or window_size < 1:
raise TypeError("window_size size must be a positive odd number")
if window_size < order + 2:
raise TypeError("window_size is too small for the polynomials order")
def savitzky_golay(y, window_size, order, deriv=0):
"""Smooth (and optionally differentiate) data with a Savitzky-Golay filter.
The Savitzky-Golay filter removes high frequency noise from data.
@ -76,15 +83,11 @@ def savitzky_golay(y, window_size, order, deriv=0):
W.H. Press, S.A. Teukolsky, W.T. Vetterling, B.P. Flannery
Cambridge University Press ISBN-13: 9780521880688
"""
try:
window_size = np.abs(np.int(window_size))
order = np.abs(np.int(order))
except ValueError:
raise ValueError("window_size and order have to be of type int")
if window_size % 2 != 1 or window_size < 1:
raise TypeError("window_size size must be a positive odd number")
if window_size < order + 2:
raise TypeError("window_size is too small for the polynomials order")
_check_window_and_order(window_size, order)
order_range = range(order + 1)
half_window = (window_size - 1) // 2
# precompute coefficients
@ -99,6 +102,22 @@ def savitzky_golay(y, window_size, order, deriv=0):
return np.convolve(m, y, mode='valid')
def _get_turnpoint(xvals):
turnpoint = 0
last = len(xvals)
if xvals[0] < xvals[1]: # x is increasing?
compare = lambda a, b: a < b
else: # no, x is decreasing
compare = lambda a, b: a > b
for i in range(1, last): # yes
if compare(xvals[i], xvals[i - 1]): # search where x starts to fall or rise
turnpoint = i
break
return turnpoint
def savitzky_golay_piecewise(xvals, data, kernel=11, order=4):
'''
One of the most popular applications of S-G filter, apart from smoothing
@ -132,21 +151,11 @@ def savitzky_golay_piecewise(xvals, data, kernel=11, order=4):
h=plt.plot(x, yn, 'r', x, y, 'k', x, yr, 'b.')
'''
turnpoint = 0
last = len(xvals)
if xvals[1] > xvals[0]: # x is increasing?
for i in range(1, last): # yes
if xvals[i] < xvals[i - 1]: # search where x starts to fall
turnpoint = i
break
else: # no, x is decreasing
for i in range(1, last): # search where it starts to rise
if xvals[i] > xvals[i - 1]:
turnpoint = i
break
turnpoint = _get_turnpoint(xvals)
if turnpoint == 0: # no change in direction of x
return savitzky_golay(data, kernel, order)
else:
# smooth the first piece
firstpart = savitzky_golay(data[0:turnpoint], kernel, order)
# recursively smooth the rest
@ -236,60 +245,46 @@ def sgolay2d(z, window_size, order, derivative=None):
Z = np.zeros((new_shape))
# top band
band = z[0, :]
Z[:half_size, half_size:-half_size] = band - \
np.abs(np.flipud(z[1:half_size + 1, :]) - band)
Z[:half_size, half_size:-half_size] = band - np.abs(np.flipud(z[1:half_size + 1, :]) - band)
# bottom band
band = z[-1, :]
Z[-half_size:, half_size:-half_size] = band + \
np.abs(np.flipud(z[-half_size - 1:-1, :]) - band)
Z[-half_size:, half_size:-half_size] = band + np.abs(np.flipud(z[-half_size - 1:-1, :]) - band)
# left band
band = np.tile(z[:, 0].reshape(-1, 1), [1, half_size])
Z[half_size:-half_size, :half_size] = band - \
np.abs(np.fliplr(z[:, 1:half_size + 1]) - band)
Z[half_size:-half_size, :half_size] = band - np.abs(np.fliplr(z[:, 1:half_size + 1]) - band)
# right band
band = np.tile(z[:, -1].reshape(-1, 1), [1, half_size])
Z[half_size:-half_size, -half_size:] = band + \
np.abs(np.fliplr(z[:, -half_size - 1:-1]) - band)
Z[half_size:-half_size, -half_size:] = band + np.abs(np.fliplr(z[:, -half_size - 1:-1]) - band)
# central band
Z[half_size:-half_size, half_size:-half_size] = z
# top left corner
band = z[0, 0]
Z[:half_size, :half_size] = band - \
np.abs(
np.flipud(np.fliplr(z[1:half_size + 1, 1:half_size + 1])) - band)
np.abs(np.flipud(np.fliplr(z[1:half_size + 1, 1:half_size + 1])) - band)
# bottom right corner
band = z[-1, -1]
Z[-half_size:, -half_size:] = band + \
np.abs(np.flipud(np.fliplr(z[-half_size - 1:-1, -half_size - 1:-1])) -
band)
np.abs(np.flipud(np.fliplr(z[-half_size - 1:-1, -half_size - 1:-1])) - band)
# top right corner
band = Z[half_size, -half_size:]
Z[:half_size, -half_size:] = band - \
np.abs(
np.flipud(Z[half_size + 1:2 * half_size + 1, -half_size:]) - band)
np.abs(np.flipud(Z[half_size + 1:2 * half_size + 1, -half_size:]) - band)
# bottom left corner
band = Z[-half_size:, half_size].reshape(-1, 1)
Z[-half_size:, :half_size] = band - \
np.abs(
np.fliplr(Z[-half_size:, half_size + 1:2 * half_size + 1]) - band)
np.abs(np.fliplr(Z[-half_size:, half_size + 1:2 * half_size + 1]) - band)
# solve system and convolve
if derivative is None:
m = np.linalg.pinv(A)[0].reshape((window_size, -1))
return scipy.signal.fftconvolve(Z, m, mode='valid')
elif derivative == 'col':
c = np.linalg.pinv(A)[1].reshape((window_size, -1))
return scipy.signal.fftconvolve(Z, -c, mode='valid')
elif derivative == 'row':
r = np.linalg.pinv(A)[2].reshape((window_size, -1))
return scipy.signal.fftconvolve(Z, -r, mode='valid')
elif derivative == 'both':
c = np.linalg.pinv(A)[1].reshape((window_size, -1))
r = np.linalg.pinv(A)[2].reshape((window_size, -1))
return (scipy.signal.fftconvolve(Z, -r, mode='valid'),
scipy.signal.fftconvolve(Z, -c, mode='valid'))
sgn = {None:1}.get(derivative , -1)
dims = {None: (0,), 'col': (1,), 'row': (2,), 'both':(1, 2)}[derivative]
res = tuple(fftconvolve(Z, sgn * np.linalg.pinv(A)[i].reshape((window_size, -1)), mode='valid')
for i in dims)
if len(dims)>1:
return res
return res[0]
class PPform(object):
@ -545,7 +540,17 @@ class SmoothSpline(PPform):
if lin_extrap:
self.linear_extrapolate(output=False)
def _compute_coefs(self, xx, yy, p=None, var=1):
@staticmethod
def _check(dx, n, ny):
if n < 2:
raise ValueError('There must be >=2 data points.')
elif (dx <= 0).any():
raise ValueError('Two consecutive values in x can not be equal.')
elif n != ny:
raise ValueError('x and y must have the same length.')
@staticmethod
def _spacing(xx, yy, var):
x, y, var = np.atleast_1d(xx, yy, var)
x = x.ravel()
dx = np.diff(x)
@ -555,32 +560,17 @@ class SmoothSpline(PPform):
x = x[ind]
y = y[..., ind]
dx = np.diff(x)
return x, y, dx
n = len(x)
# ndy = y.ndim
szy = y.shape
nd = np.int(prod(szy[:-1]))
ny = szy[-1]
if n < 2:
raise ValueError('There must be >=2 data points.')
elif (dx <= 0).any():
raise ValueError('Two consecutive values in x can not be equal.')
elif n != ny:
raise ValueError('x and y must have the same length.')
dydx = np.diff(y) / dx
if (n == 2): # straight line
coefs = np.vstack([dydx.ravel(), y[0, :]])
else:
def _poly_coefs(self, y, dx, dydx, n, nd, p, var):
dx1 = 1. / dx
D = sparse.spdiags(var * ones(n), 0, n, n) # The variance
u, p = self._compute_u(p, D, dydx, dx, dx1, n)
R = self._compute_r(dx, n)
qdq = self._compute_qdq(D, dx1, n)
if p is None or p < 0 or 1 < p:
p = self._estimate_p(qdq, R)
qq = self._compute_qq(p, qdq, R)
u = self._compute_u(qq, p, dydx, n)
dx1.shape = (n - 1, -1)
dx.shape = (n - 1, -1)
zrs = zeros(nd)
@ -607,46 +597,58 @@ class SmoothSpline(PPform):
di = di.T
ci = ci.T
ai = ai.T
if not any(di):
if not any(ci):
coefs = vstack([bi.ravel(), ai.ravel()])
else:
coefs = vstack([ci.ravel(), bi.ravel(), ai.ravel()])
else:
coefs = vstack(
[di.ravel(), ci.ravel(), bi.ravel(), ai.ravel()])
coefs = vstack([val.ravel() for val in [di, ci, bi, ai] if val.size>0])
return coefs
def _compute_coefs(self, xx, yy, p=None, var=1):
x, y, dx = self._spacing(xx, yy, var)
n = len(x)
szy = y.shape
nd = np.int(prod(szy[:-1]))
ny = szy[-1]
self._check(dx, n, ny)
dydx = np.diff(y) / dx
if (n == 2): # straight line
coefs = np.vstack([dydx.ravel(), y[0, :]])
return coefs, x
coefs = self._poly_coefs(y, dx, dydx, n, nd, p, var)
return coefs, x
@staticmethod
def _compute_u(p, D, dydx, dx, dx1, n):
if p is None or p != 0:
data = [dx[1:n - 1], 2 * (dx[:n - 2] + dx[1:n - 1]), dx[:n - 2]]
R = sparse.spdiags(data, [-1, 0, 1], n - 2, n - 2)
if p is None or p < 1:
def _compute_qdq(D, dx1, n):
Q = sparse.spdiags(
[dx1[:n - 2], -(dx1[:n - 2] + dx1[1:n - 1]), dx1[1:n - 1]],
[0, -1, -2], n, n - 2)
QDQ = (Q.T * D * Q)
if p is None or p < 0:
# Estimate p
p = 1. / \
(1. + QDQ.diagonal().sum() /
(100. * R.diagonal().sum() ** 2))
if p == 0:
QQ = 6 * QDQ
else:
QDQ = Q.T * D * Q
return QDQ
@staticmethod
def _compute_r(dx, n):
data = [dx[1:n - 1], 2 * (dx[:n - 2] + dx[1:n - 1]), dx[:n - 2]]
R = sparse.spdiags(data, [-1, 0, 1], n - 2, n - 2)
return R
@staticmethod
def _estimate_p(QDQ, R):
p = 1. / (1. + QDQ.diagonal().sum() / (100. * R.diagonal().sum() ** 2))
return np.clip(p, 0, 1)
@staticmethod
def _compute_qq(p, QDQ, R):
QQ = (6 * (1 - p)) * (QDQ) + p * R
else:
QQ = R
return QQ
def _compute_u(self,QQ, p, dydx, n):
# Make sure it uses symmetric matrix solver
ddydx = diff(dydx, axis=0)
# sp.linalg.use_solver(useUmfpack=True)
u = 2 * sparse.linalg.spsolve((QQ + QQ.T), ddydx) # @UndefinedVariable
return u.reshape(n - 2, -1), p
return u.reshape(n - 2, -1)
def _edge_case(m0, d1):
@ -685,6 +687,38 @@ def pchip_slopes(x, y):
return dk
def _parabola_slope(x, y, dx, dydx, *args):
yp = np.zeros(y.shape, np.float_)
yp[1:-1] = (dydx[:-1] * dx[1:] + dydx[1:] * dx[:-1]) / (dx[1:] + dx[:-1])
yp[0] = 2.0 * dydx[0] - yp[1]
yp[-1] = 2.0 * dydx[-1] - yp[-2]
return yp
def _secant_slope(x, y, dx, dydx, *args):
yp = np.zeros(y.shape, np.float_)
# At the endpoints - use one-sided differences
yp[0] = dydx[0]
yp[-1] = dydx[-1]
# In the middle - use the average of the secants
yp[1:-1] = (dydx[:-1] + dydx[1:]) / 2.0
return yp
def _catmull_rom_slope(x, y, dx, dydx, *args):
yp = np.zeros(y.shape, np.float_)
# At the endpoints - use one-sided differences
yp[0] = dydx[0]
yp[-1] = dydx[-1]
yp[1:-1] = (y[2:] - y[:-2]) / (x[2:] - x[:-2])
return yp
def _cardinal_slope(x, y, dx, dydx, tension):
yp = (1-tension) * _catmull_rom_slope(x, y, dx, dydx)
return yp
def slopes(x, y, method='parabola', tension=0, monotone=False):
'''
Return estimated slopes y'(x)
@ -720,29 +754,15 @@ def slopes(x, y, method='parabola', tension=0, monotone=False):
'''
x = np.asarray(x, np.float_)
y = np.asarray(y, np.float_)
yp = np.zeros(y.shape, np.float_)
dx = x[1:] - x[:-1]
# Compute the slopes of the secant lines between successive points
dydx = (y[1:] - y[:-1]) / dx
method = method.lower()
if method.startswith('p'): # parabola'):
yp[1:-1] = (dydx[:-1] * dx[1:] + dydx[1:] * dx[:-1]) / \
(dx[1:] + dx[:-1])
yp[0] = 2.0 * dydx[0] - yp[1]
yp[-1] = 2.0 * dydx[-1] - yp[-2]
else:
# At the endpoints - use one-sided differences
yp[0] = dydx[0]
yp[-1] = dydx[-1]
if method.startswith('s'): # secant'):
# In the middle - use the average of the secants
yp[1:-1] = (dydx[:-1] + dydx[1:]) / 2.0
else: # Cardinal or Catmull-Rom method
yp[1:-1] = (y[2:] - y[:-2]) / (x[2:] - x[:-2])
if method.startswith('car'): # cardinal'):
yp = (1 - tension) * yp
slope_fun = dict(par=_parabola_slope, sec=_secant_slope, car=_cardinal_slope,
cat=_catmull_rom_slope)[method[:3]]
yp = slope_fun(x, y, dx, dydx, tension)
if monotone:
# Special case: intervals where y[k] == y[k+1]

Loading…
Cancel
Save