From 5e6cfbe5ee6b977c9ced3ea7625b0f1d82db86ed Mon Sep 17 00:00:00 2001 From: Per A Brodtkorb Date: Sun, 4 Jun 2017 02:58:19 +0200 Subject: [PATCH] Refactored to reduce cyclomatic complexity --- wafo/interpolate.py | 314 +++++++++++++++++++++++--------------------- 1 file changed, 167 insertions(+), 147 deletions(-) diff --git a/wafo/interpolate.py b/wafo/interpolate.py index 498631b..7265287 100644 --- a/wafo/interpolate.py +++ b/wafo/interpolate.py @@ -6,7 +6,7 @@ import scipy.signal import scipy.sparse as sparse from numpy import ones, zeros, prod, sin, diff, pi, inf, vstack, linspace from scipy.interpolate import BPoly, interp1d - +from scipy.signal import fftconvolve from wafo import polynomial as pl @@ -16,6 +16,13 @@ __all__ = [ 'StinemanInterp', 'CubicHermiteSpline'] +def _check_window_and_order(window_size, order): + if window_size % 2 != 1 or window_size < 1: + raise TypeError("window_size size must be a positive odd number") + if window_size < order + 2: + raise TypeError("window_size is too small for the polynomials order") + + def savitzky_golay(y, window_size, order, deriv=0): """Smooth (and optionally differentiate) data with a Savitzky-Golay filter. The Savitzky-Golay filter removes high frequency noise from data. @@ -76,15 +83,11 @@ def savitzky_golay(y, window_size, order, deriv=0): W.H. Press, S.A. Teukolsky, W.T. Vetterling, B.P. Flannery Cambridge University Press ISBN-13: 9780521880688 """ - try: - window_size = np.abs(np.int(window_size)) - order = np.abs(np.int(order)) - except ValueError: - raise ValueError("window_size and order have to be of type int") - if window_size % 2 != 1 or window_size < 1: - raise TypeError("window_size size must be a positive odd number") - if window_size < order + 2: - raise TypeError("window_size is too small for the polynomials order") + + window_size = np.abs(np.int(window_size)) + order = np.abs(np.int(order)) + + _check_window_and_order(window_size, order) order_range = range(order + 1) half_window = (window_size - 1) // 2 # precompute coefficients @@ -99,6 +102,22 @@ def savitzky_golay(y, window_size, order, deriv=0): return np.convolve(m, y, mode='valid') +def _get_turnpoint(xvals): + turnpoint = 0 + last = len(xvals) + if xvals[0] < xvals[1]: # x is increasing? + compare = lambda a, b: a < b + else: # no, x is decreasing + compare = lambda a, b: a > b + + for i in range(1, last): # yes + if compare(xvals[i], xvals[i - 1]): # search where x starts to fall or rise + turnpoint = i + break + + return turnpoint + + def savitzky_golay_piecewise(xvals, data, kernel=11, order=4): ''' One of the most popular applications of S-G filter, apart from smoothing @@ -132,27 +151,17 @@ def savitzky_golay_piecewise(xvals, data, kernel=11, order=4): h=plt.plot(x, yn, 'r', x, y, 'k', x, yr, 'b.') ''' - turnpoint = 0 - last = len(xvals) - if xvals[1] > xvals[0]: # x is increasing? - for i in range(1, last): # yes - if xvals[i] < xvals[i - 1]: # search where x starts to fall - turnpoint = i - break - else: # no, x is decreasing - for i in range(1, last): # search where it starts to rise - if xvals[i] > xvals[i - 1]: - turnpoint = i - break + turnpoint = _get_turnpoint(xvals) + if turnpoint == 0: # no change in direction of x return savitzky_golay(data, kernel, order) - else: - # smooth the first piece - firstpart = savitzky_golay(data[0:turnpoint], kernel, order) - # recursively smooth the rest - rest = savitzky_golay_piecewise( - xvals[turnpoint:], data[turnpoint:], kernel, order) - return np.concatenate((firstpart, rest)) + + # smooth the first piece + firstpart = savitzky_golay(data[0:turnpoint], kernel, order) + # recursively smooth the rest + rest = savitzky_golay_piecewise( + xvals[turnpoint:], data[turnpoint:], kernel, order) + return np.concatenate((firstpart, rest)) def sgolay2d(z, window_size, order, derivative=None): @@ -236,60 +245,46 @@ def sgolay2d(z, window_size, order, derivative=None): Z = np.zeros((new_shape)) # top band band = z[0, :] - Z[:half_size, half_size:-half_size] = band - \ - np.abs(np.flipud(z[1:half_size + 1, :]) - band) + Z[:half_size, half_size:-half_size] = band - np.abs(np.flipud(z[1:half_size + 1, :]) - band) # bottom band band = z[-1, :] - Z[-half_size:, half_size:-half_size] = band + \ - np.abs(np.flipud(z[-half_size - 1:-1, :]) - band) + Z[-half_size:, half_size:-half_size] = band + np.abs(np.flipud(z[-half_size - 1:-1, :]) - band) # left band band = np.tile(z[:, 0].reshape(-1, 1), [1, half_size]) - Z[half_size:-half_size, :half_size] = band - \ - np.abs(np.fliplr(z[:, 1:half_size + 1]) - band) + Z[half_size:-half_size, :half_size] = band - np.abs(np.fliplr(z[:, 1:half_size + 1]) - band) # right band band = np.tile(z[:, -1].reshape(-1, 1), [1, half_size]) - Z[half_size:-half_size, -half_size:] = band + \ - np.abs(np.fliplr(z[:, -half_size - 1:-1]) - band) + Z[half_size:-half_size, -half_size:] = band + np.abs(np.fliplr(z[:, -half_size - 1:-1]) - band) # central band Z[half_size:-half_size, half_size:-half_size] = z # top left corner band = z[0, 0] Z[:half_size, :half_size] = band - \ - np.abs( - np.flipud(np.fliplr(z[1:half_size + 1, 1:half_size + 1])) - band) + np.abs(np.flipud(np.fliplr(z[1:half_size + 1, 1:half_size + 1])) - band) # bottom right corner band = z[-1, -1] Z[-half_size:, -half_size:] = band + \ - np.abs(np.flipud(np.fliplr(z[-half_size - 1:-1, -half_size - 1:-1])) - - band) + np.abs(np.flipud(np.fliplr(z[-half_size - 1:-1, -half_size - 1:-1])) - band) # top right corner band = Z[half_size, -half_size:] Z[:half_size, -half_size:] = band - \ - np.abs( - np.flipud(Z[half_size + 1:2 * half_size + 1, -half_size:]) - band) + np.abs(np.flipud(Z[half_size + 1:2 * half_size + 1, -half_size:]) - band) # bottom left corner band = Z[-half_size:, half_size].reshape(-1, 1) Z[-half_size:, :half_size] = band - \ - np.abs( - np.fliplr(Z[-half_size:, half_size + 1:2 * half_size + 1]) - band) + np.abs(np.fliplr(Z[-half_size:, half_size + 1:2 * half_size + 1]) - band) # solve system and convolve - if derivative is None: - m = np.linalg.pinv(A)[0].reshape((window_size, -1)) - return scipy.signal.fftconvolve(Z, m, mode='valid') - elif derivative == 'col': - c = np.linalg.pinv(A)[1].reshape((window_size, -1)) - return scipy.signal.fftconvolve(Z, -c, mode='valid') - elif derivative == 'row': - r = np.linalg.pinv(A)[2].reshape((window_size, -1)) - return scipy.signal.fftconvolve(Z, -r, mode='valid') - elif derivative == 'both': - c = np.linalg.pinv(A)[1].reshape((window_size, -1)) - r = np.linalg.pinv(A)[2].reshape((window_size, -1)) - return (scipy.signal.fftconvolve(Z, -r, mode='valid'), - scipy.signal.fftconvolve(Z, -c, mode='valid')) + sgn = {None:1}.get(derivative , -1) + dims = {None: (0,), 'col': (1,), 'row': (2,), 'both':(1, 2)}[derivative] + + res = tuple(fftconvolve(Z, sgn * np.linalg.pinv(A)[i].reshape((window_size, -1)), mode='valid') + for i in dims) + if len(dims)>1: + return res + return res[0] class PPform(object): @@ -545,7 +540,17 @@ class SmoothSpline(PPform): if lin_extrap: self.linear_extrapolate(output=False) - def _compute_coefs(self, xx, yy, p=None, var=1): + @staticmethod + def _check(dx, n, ny): + if n < 2: + raise ValueError('There must be >=2 data points.') + elif (dx <= 0).any(): + raise ValueError('Two consecutive values in x can not be equal.') + elif n != ny: + raise ValueError('x and y must have the same length.') + + @staticmethod + def _spacing(xx, yy, var): x, y, var = np.atleast_1d(xx, yy, var) x = x.ravel() dx = np.diff(x) @@ -555,98 +560,95 @@ class SmoothSpline(PPform): x = x[ind] y = y[..., ind] dx = np.diff(x) + return x, y, dx + + def _poly_coefs(self, y, dx, dydx, n, nd, p, var): + dx1 = 1. / dx + D = sparse.spdiags(var * ones(n), 0, n, n) # The variance + R = self._compute_r(dx, n) + qdq = self._compute_qdq(D, dx1, n) + if p is None or p < 0 or 1 < p: + p = self._estimate_p(qdq, R) + qq = self._compute_qq(p, qdq, R) + u = self._compute_u(qq, p, dydx, n) + dx1.shape = (n - 1, -1) + dx.shape = (n - 1, -1) + zrs = zeros(nd) + if p < 1: + # faster than yi-6*(1-p)*Q*u + Qu = D * diff(vstack([zrs, diff(vstack([zrs, u, zrs]), + axis=0) * dx1, zrs]), axis=0) + ai = (y - (6 * (1 - p) * Qu).T).T + else: + ai = y.reshape(n, -1) + + # The piecewise polynominals are written as + # fi=ai+bi*(x-xi)+ci*(x-xi)^2+di*(x-xi)^3 + # where the derivatives in the knots according to Carl de Boor are: + # ddfi = 6*p*[0;u] = 2*ci; + # dddfi = 2*diff([ci;0])./dx = 6*di; + # dfi = diff(ai)./dx-(ci+di.*dx).*dx = bi; + + ci = np.vstack([zrs, 3 * p * u]) + di = (diff(vstack([ci, zrs]), axis=0) * dx1 / 3) + bi = (diff(ai, axis=0) * dx1 - (ci + di * dx) * dx) + ai = ai[:n - 1, ...] + if nd > 1: + di = di.T + ci = ci.T + ai = ai.T + coefs = vstack([val.ravel() for val in [di, ci, bi, ai] if val.size>0]) + return coefs + def _compute_coefs(self, xx, yy, p=None, var=1): + x, y, dx = self._spacing(xx, yy, var) n = len(x) - # ndy = y.ndim szy = y.shape nd = np.int(prod(szy[:-1])) ny = szy[-1] - if n < 2: - raise ValueError('There must be >=2 data points.') - elif (dx <= 0).any(): - raise ValueError('Two consecutive values in x can not be equal.') - elif n != ny: - raise ValueError('x and y must have the same length.') + self._check(dx, n, ny) dydx = np.diff(y) / dx if (n == 2): # straight line coefs = np.vstack([dydx.ravel(), y[0, :]]) - else: + return coefs, x + coefs = self._poly_coefs(y, dx, dydx, n, nd, p, var) + return coefs, x - dx1 = 1. / dx - D = sparse.spdiags(var * ones(n), 0, n, n) # The variance - - u, p = self._compute_u(p, D, dydx, dx, dx1, n) - dx1.shape = (n - 1, -1) - dx.shape = (n - 1, -1) - zrs = zeros(nd) - if p < 1: - # faster than yi-6*(1-p)*Q*u - Qu = D * diff(vstack([zrs, diff(vstack([zrs, u, zrs]), - axis=0) * dx1, zrs]), axis=0) - ai = (y - (6 * (1 - p) * Qu).T).T - else: - ai = y.reshape(n, -1) - - # The piecewise polynominals are written as - # fi=ai+bi*(x-xi)+ci*(x-xi)^2+di*(x-xi)^3 - # where the derivatives in the knots according to Carl de Boor are: - # ddfi = 6*p*[0;u] = 2*ci; - # dddfi = 2*diff([ci;0])./dx = 6*di; - # dfi = diff(ai)./dx-(ci+di.*dx).*dx = bi; - - ci = np.vstack([zrs, 3 * p * u]) - di = (diff(vstack([ci, zrs]), axis=0) * dx1 / 3) - bi = (diff(ai, axis=0) * dx1 - (ci + di * dx) * dx) - ai = ai[:n - 1, ...] - if nd > 1: - di = di.T - ci = ci.T - ai = ai.T - if not any(di): - if not any(ci): - coefs = vstack([bi.ravel(), ai.ravel()]) - else: - coefs = vstack([ci.ravel(), bi.ravel(), ai.ravel()]) - else: - coefs = vstack( - [di.ravel(), ci.ravel(), bi.ravel(), ai.ravel()]) + @staticmethod + def _compute_qdq(D, dx1, n): + Q = sparse.spdiags( + [dx1[:n - 2], -(dx1[:n - 2] + dx1[1:n - 1]), dx1[1:n - 1]], + [0, -1, -2], n, n - 2) + QDQ = Q.T * D * Q + return QDQ - return coefs, x + @staticmethod + def _compute_r(dx, n): + data = [dx[1:n - 1], 2 * (dx[:n - 2] + dx[1:n - 1]), dx[:n - 2]] + R = sparse.spdiags(data, [-1, 0, 1], n - 2, n - 2) + return R @staticmethod - def _compute_u(p, D, dydx, dx, dx1, n): - if p is None or p != 0: - data = [dx[1:n - 1], 2 * (dx[:n - 2] + dx[1:n - 1]), dx[:n - 2]] - R = sparse.spdiags(data, [-1, 0, 1], n - 2, n - 2) - - if p is None or p < 1: - Q = sparse.spdiags( - [dx1[:n - 2], -(dx1[:n - 2] + dx1[1:n - 1]), dx1[1:n - 1]], - [0, -1, -2], n, n - 2) - QDQ = (Q.T * D * Q) - if p is None or p < 0: - # Estimate p - p = 1. / \ - (1. + QDQ.diagonal().sum() / - (100. * R.diagonal().sum() ** 2)) - - if p == 0: - QQ = 6 * QDQ - else: - QQ = (6 * (1 - p)) * (QDQ) + p * R - else: - QQ = R + def _estimate_p(QDQ, R): + p = 1. / (1. + QDQ.diagonal().sum() / (100. * R.diagonal().sum() ** 2)) + return np.clip(p, 0, 1) + @staticmethod + def _compute_qq(p, QDQ, R): + QQ = (6 * (1 - p)) * (QDQ) + p * R + return QQ + + def _compute_u(self,QQ, p, dydx, n): # Make sure it uses symmetric matrix solver ddydx = diff(dydx, axis=0) # sp.linalg.use_solver(useUmfpack=True) u = 2 * sparse.linalg.spsolve((QQ + QQ.T), ddydx) # @UndefinedVariable - return u.reshape(n - 2, -1), p + return u.reshape(n - 2, -1) def _edge_case(m0, d1): @@ -685,6 +687,38 @@ def pchip_slopes(x, y): return dk +def _parabola_slope(x, y, dx, dydx, *args): + yp = np.zeros(y.shape, np.float_) + yp[1:-1] = (dydx[:-1] * dx[1:] + dydx[1:] * dx[:-1]) / (dx[1:] + dx[:-1]) + yp[0] = 2.0 * dydx[0] - yp[1] + yp[-1] = 2.0 * dydx[-1] - yp[-2] + return yp + + +def _secant_slope(x, y, dx, dydx, *args): + yp = np.zeros(y.shape, np.float_) + # At the endpoints - use one-sided differences + yp[0] = dydx[0] + yp[-1] = dydx[-1] + # In the middle - use the average of the secants + yp[1:-1] = (dydx[:-1] + dydx[1:]) / 2.0 + return yp + + +def _catmull_rom_slope(x, y, dx, dydx, *args): + yp = np.zeros(y.shape, np.float_) + # At the endpoints - use one-sided differences + yp[0] = dydx[0] + yp[-1] = dydx[-1] + yp[1:-1] = (y[2:] - y[:-2]) / (x[2:] - x[:-2]) + return yp + + +def _cardinal_slope(x, y, dx, dydx, tension): + yp = (1-tension) * _catmull_rom_slope(x, y, dx, dydx) + return yp + + def slopes(x, y, method='parabola', tension=0, monotone=False): ''' Return estimated slopes y'(x) @@ -720,29 +754,15 @@ def slopes(x, y, method='parabola', tension=0, monotone=False): ''' x = np.asarray(x, np.float_) y = np.asarray(y, np.float_) - yp = np.zeros(y.shape, np.float_) dx = x[1:] - x[:-1] # Compute the slopes of the secant lines between successive points dydx = (y[1:] - y[:-1]) / dx method = method.lower() - if method.startswith('p'): # parabola'): - yp[1:-1] = (dydx[:-1] * dx[1:] + dydx[1:] * dx[:-1]) / \ - (dx[1:] + dx[:-1]) - yp[0] = 2.0 * dydx[0] - yp[1] - yp[-1] = 2.0 * dydx[-1] - yp[-2] - else: - # At the endpoints - use one-sided differences - yp[0] = dydx[0] - yp[-1] = dydx[-1] - if method.startswith('s'): # secant'): - # In the middle - use the average of the secants - yp[1:-1] = (dydx[:-1] + dydx[1:]) / 2.0 - else: # Cardinal or Catmull-Rom method - yp[1:-1] = (y[2:] - y[:-2]) / (x[2:] - x[:-2]) - if method.startswith('car'): # cardinal'): - yp = (1 - tension) * yp + slope_fun = dict(par=_parabola_slope, sec=_secant_slope, car=_cardinal_slope, + cat=_catmull_rom_slope)[method[:3]] + yp = slope_fun(x, y, dx, dydx, tension) if monotone: # Special case: intervals where y[k] == y[k+1]