You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
pywafo/wafo/integrate_oscillating.py

539 lines
17 KiB
Python

8 years ago
"""
Created on 20. aug. 2015
@author: pab
8 years ago
"""
from __future__ import division
from collections import namedtuple
import warnings
import numdifftools as nd
import numdifftools.nd_algopy as nda
from numdifftools.extrapolation import dea3
from numdifftools.limits import Limit
import numpy as np
from numpy import linalg
from numpy.polynomial.chebyshev import chebval, Chebyshev
from numpy.polynomial import polynomial
from wafo.misc import piecewise, findcross, ecross
_FINFO = np.finfo(float)
EPS = _FINFO.eps
_EPS = EPS
_TINY = _FINFO.tiny
_HUGE = _FINFO.max
def _assert(cond, msg):
if not cond:
raise ValueError(msg)
def _assert_warn(cond, msg):
if not cond:
warnings.warn(msg)
class PolyBasis(object):
@staticmethod
def _derivative(c, m):
return polynomial.polyder(c, m)
@staticmethod
def eval(t, c):
return polynomial.polyval(t, c)
@staticmethod
def _coefficients(k):
c = np.zeros(k + 1)
c[k] = 1
return c
def derivative(self, t, k, n=1):
c = self._coefficients(k)
d_c = self._derivative(c, m=n)
return self.eval(t, d_c)
def __call__(self, t, k):
return t**k
7 years ago
poly_basis = PolyBasis()
class ChebyshevBasis(PolyBasis):
@staticmethod
def _derivative(c, m):
cheb = Chebyshev(c)
dcheb = cheb.deriv(m=m)
return dcheb.coef
@staticmethod
def eval(t, c):
return chebval(t, c)
def __call__(self, t, k):
c = self._coefficients(k)
return self.eval(t, c)
7 years ago
chebyshev_basis = ChebyshevBasis()
def richardson(q_val, k):
# license BSD
# Richardson extrapolation with parameter estimation
c = np.real((q_val[k - 1] - q_val[k - 2]) / (q_val[k] - q_val[k - 1])) - 1.
# The lower bound 0.07 admits the singularity x.^-0.9
c = max(c, 0.07)
return q_val[k] + (q_val[k] - q_val[k - 1]) / c
def evans_webster_weights(omega, g, d_g, x, basis, *args, **kwds):
def psi(t, k):
return d_g(t, *args, **kwds) * basis(t, k)
j_w = 1j * omega
n = len(x)
a_matrix = np.zeros((n, n), dtype=complex)
rhs = np.zeros((n,), dtype=complex)
dbasis = basis.derivative
lim_g = Limit(g)
7 years ago
b_1 = np.exp(j_w * lim_g(1, *args, **kwds))
if np.isnan(b_1):
b_1 = 0.0
7 years ago
a_1 = np.exp(j_w * lim_g(-1, *args, **kwds))
if np.isnan(a_1):
a_1 = 0.0
lim_psi = Limit(psi)
for k in range(n):
rhs[k] = basis(1, k) * b_1 - basis(-1, k) * a_1
a_matrix[k] = (dbasis(x, k, n=1) + j_w * lim_psi(x, k))
solution = linalg.lstsq(a_matrix, rhs)
return solution[0]
def osc_weights(omega, g, d_g, x, basis, a_b, *args, **kwds):
def _g(t):
return g(scale * t + offset, *args, **kwds)
def _d_g(t):
return scale * d_g(scale * t + offset, *args, **kwds)
w = []
for a, b in zip(a_b[::2], a_b[1::2]):
scale = (b - a) / 2
offset = (a + b) / 2
w.append(evans_webster_weights(omega, _g, _d_g, x, basis))
return np.asarray(w).ravel()
class _Integrator(object):
info = namedtuple('info', ['error_estimate', 'n'])
def __init__(self, f, g, dg=None, a=-1, b=1, basis=chebyshev_basis, s=1,
precision=10, endpoints=True, full_output=False):
self.f = f
self.g = g
self.dg = nd.Derivative(g) if dg is None else dg
self.basis = basis
self.a = a
self.b = b
self.s = s
self.endpoints = endpoints
self.precision = precision
self.full_output = full_output
class QuadOsc(_Integrator):
def __init__(self, f, g, dg=None, a=-1, b=1, basis=chebyshev_basis, s=15,
precision=10, endpoints=False, full_output=False, maxiter=17):
self.maxiter = maxiter
super(QuadOsc, self).__init__(f, g, dg=dg, a=a, b=b, basis=basis, s=s,
precision=precision, endpoints=endpoints,
full_output=full_output)
@staticmethod
def _change_interval_to_0_1(f, g, d_g, a, _b):
def f_01(t, *args, **kwds):
7 years ago
den = 1 - t
return f(a + t / den, *args, **kwds) / den ** 2
def g_01(t, *args, **kwds):
return g(a + t / (1 - t), *args, **kwds)
def d_g_01(t, *args, **kwds):
7 years ago
den = 1 - t
return d_g(a + t / den, *args, **kwds) / den ** 2
return f_01, g_01, d_g_01, 0., 1.
@staticmethod
def _change_interval_to_m1_0(f, g, d_g, _a, b):
def f_m10(t, *args, **kwds):
den = 1 + t
return f(b + t / den, *args, **kwds) / den ** 2
def g_m10(t, *args, **kwds):
return g(b + t / (1 + t), *args, **kwds)
def d_g_m10(t, *args, **kwds):
den = 1 + t
return d_g(b + t / den, *args, **kwds) / den ** 2
return f_m10, g_m10, d_g_m10, -1.0, 0.0
@staticmethod
def _change_interval_to_m1_1(f, g, d_g, _a, _b):
def f_m11(t, *args, **kwds):
den = (1 - t**2)
7 years ago
return f(t / den, *args, **kwds) * (1 + t**2) / den ** 2
def g_m11(t, *args, **kwds):
den = (1 - t**2)
return g(t / den, *args, **kwds)
def d_g_m11(t, *args, **kwds):
den = (1 - t**2)
7 years ago
return d_g(t / den, *args, **kwds) * (1 + t**2) / den ** 2
return f_m11, g_m11, d_g_m11, -1., 1.
def _get_functions(self):
a, b = self.a, self.b
reverse = np.real(a) > np.real(b)
if reverse:
a, b = b, a
f, g, dg = self.f, self.g, self.dg
if a == b:
pass
elif np.isinf(a) | np.isinf(b):
# Check real limits
if ~np.isreal(a) | ~np.isreal(b) | np.isnan(a) | np.isnan(b):
raise ValueError('Infinite intervals must be real.')
# Change of variable
if np.isfinite(a) & np.isinf(b):
f, g, dg, a, b = self._change_interval_to_0_1(f, g, dg, a, b)
elif np.isinf(a) & np.isfinite(b):
f, g, dg, a, b = self._change_interval_to_m1_0(f, g, dg, a, b)
else: # -inf to inf
f, g, dg, a, b = self._change_interval_to_m1_1(f, g, dg, a, b)
return f, g, dg, a, b, reverse
def __call__(self, omega, *args, **kwds):
f, g, dg, a, b, reverse = self._get_functions()
val, err = self._quad_osc(f, g, dg, a, b, omega, *args, **kwds)
if reverse:
val = -val
if self.full_output:
return val, err
return val
@staticmethod
def _get_best_estimate(k, q_0, q_1, q_2):
if k >= 5:
q_v = np.hstack((q_0[k], q_1[k], q_2[k]))
q_w = np.hstack((q_0[k - 1], q_1[k - 1], q_2[k - 1]))
elif k >= 3:
q_v = np.hstack((q_0[k], q_1[k]))
q_w = np.hstack((q_0[k - 1], q_1[k - 1]))
else:
q_v = np.atleast_1d(q_0[k])
q_w = q_0[k - 1]
errors = np.atleast_1d(abs(q_v - q_w))
j = np.nanargmin(errors)
return q_v[j], errors[j]
def _extrapolate(self, k, q_0, q_1, q_2):
if k >= 4:
q_1[k] = dea3(q_0[k - 2], q_0[k - 1], q_0[k])[0]
q_2[k] = dea3(q_1[k - 2], q_1[k - 1], q_1[k])[0]
elif k >= 2:
q_1[k] = dea3(q_0[k - 2], q_0[k - 1], q_0[k])[0]
# # Richardson extrapolation
# if k >= 4:
# q_1[k] = richardson(q_0, k)
# q_2[k] = richardson(q_1, k)
# elif k >= 2:
# q_1[k] = richardson(q_0, k)
q, err = self._get_best_estimate(k, q_0, q_1, q_2)
return q, err
def _quad_osc(self, f, g, dg, a, b, omega, *args, **kwds):
if a == b:
q_val = b - a
err = np.abs(b - a)
return q_val, err
abseps = 10**-self.precision
max_iter = self.maxiter
basis = self.basis
if self.endpoints:
x_n = chebyshev_extrema(self.s)
else:
x_n = chebyshev_roots(self.s)
# x_n = tanh_sinh_open_nodes(self.s)
# One interval
hh = (b - a) / 2
x = (a + b) / 2 + hh * x_n # Nodes
dtype = complex
val0 = np.zeros((max_iter, 1), dtype=dtype) # Quadrature
val1 = np.zeros((max_iter, 1), dtype=dtype) # First extrapolation
val2 = np.zeros((max_iter, 1), dtype=dtype) # Second extrapolation
lim_f = Limit(f)
a_b = np.hstack([a, b])
wq = osc_weights(omega, g, dg, x_n, basis, a_b, *args, **kwds)
val0[0] = hh * np.sum(wq * lim_f(x, *args, **kwds))
# Successive bisection of intervals
nq = len(x_n)
n = nq
9 years ago
for k in range(1, max_iter):
n += nq * 2**k
hh = hh / 2
x = np.hstack([x + a, x + b]) / 2
a_b = np.hstack([a_b + a, a_b + b]) / 2
wq = osc_weights(omega, g, dg, x_n, basis, a_b, *args, **kwds)
val0[k] = hh * np.sum(wq * lim_f(x, *args, **kwds))
q_val, err = self._extrapolate(k, val0, val1, val2)
converged = (err <= abseps) | ~np.isfinite(q_val)
if converged:
break
_assert_warn(converged, 'Max number of iterations reached '
'without convergence.')
_assert_warn(np.isfinite(q_val),
'Integral approximation is Infinite or NaN.')
# The error estimate should not be zero
err += 2 * np.finfo(q_val).eps
return q_val, self.info(err, n)
def adaptive_levin_points(m, delta):
m_1 = m - 1
prm = 0.5
while prm * m_1 / delta >= 1:
delta = 2 * delta
k = np.arange(m)
x = piecewise([k < prm * m_1, k == np.ceil(prm * m_1)],
[-1 + k / delta, 0 * k, 1 - (m_1 - k) / delta])
return x
def open_levin_points(m, delta):
7 years ago
return adaptive_levin_points(m + 2, delta)[1:-1]
def chebyshev_extrema(m, delta=None):
k = np.arange(m)
7 years ago
x = np.cos(k * np.pi / (m - 1))
return x
def tanh_sinh_nodes(m, delta=None, tol=_EPS):
7 years ago
tmax = np.arcsinh(np.arctanh(1 - _EPS) * 2 / np.pi)
# tmax = 3.18
7 years ago
m_1 = int(np.floor(-np.log2(tmax / max(m - 1, 1)))) - 1
h = 2.0**-m_1
7 years ago
t = np.arange((m + 1) // 2 + 1) * h
x = np.tanh(np.pi / 2 * np.sinh(t))
k = np.flatnonzero(np.abs(x - 1) <= 10 * tol)
y = x[:k[0] + 1] if len(k) else x
return np.hstack((-y[:0:-1], y))
def tanh_sinh_open_nodes(m, delta=None, tol=_EPS):
7 years ago
return tanh_sinh_nodes(m + 1, delta, tol)[1:-1]
8 years ago
def chebyshev_roots(m, delta=None):
7 years ago
k = np.arange(1, 2 * m, 2) * 0.5
8 years ago
x = np.cos(k * np.pi / m)
return x
class AdaptiveLevin(_Integrator):
8 years ago
"""Return integral for the Levin-type and adaptive Levin-type methods"""
@staticmethod
def _a_levin(omega, f, g, d_g, x, s, basis, *args, **kwds):
def psi(t, k):
return d_g(t, *args, **kwds) * basis(t, k)
j_w = 1j * omega
nu = np.ones((len(x),), dtype=int)
nu[0] = nu[-1] = s
S = np.cumsum(np.hstack((nu, 0)))
S[-1] = 0
n = int(S[-2])
a_matrix = np.zeros((n, n), dtype=complex)
rhs = np.zeros((n,))
dff = Limit(nda.Derivative(f))
d_psi = Limit(nda.Derivative(psi))
dbasis = basis.derivative
for r, t in enumerate(x):
for j in range(S[r - 1], S[r]):
order = ((j - S[r - 1]) % nu[r]) # derivative order
dff.fun.n = order
rhs[j] = dff(t, *args, **kwds)
d_psi.fun.n = order
for k in range(n):
7 years ago
a_matrix[j, k] = (dbasis(t, k, n=order + 1) +
j_w * d_psi(t, k))
7 years ago
k1 = np.flatnonzero(1 - np.isfinite(rhs))
if k1.size > 0: # Remove singularities
warnings.warn('Singularities detected! ')
a_matrix[k1] = 0
rhs[k1] = 0
solution = linalg.lstsq(a_matrix, rhs)
v = basis.eval([-1, 1], solution[0])
lim_g = Limit(g)
g_b = np.exp(j_w * lim_g(1, *args, **kwds))
if np.isnan(g_b):
g_b = 0
g_a = np.exp(j_w * lim_g(-1, *args, **kwds))
if np.isnan(g_a):
g_a = 0
return v[1] * g_b - v[0] * g_a
def _get_integration_limits(self, omega, args, kwds):
a, b = self.a, self.b
M = 30
ab = [a]
scale = (b - a) / 2
n = 30
x = np.linspace(a, b, n + 1)
dg_x = np.asarray([scale * omega * self.dg(xi, *args, **kwds)
for xi in x])
i10 = findcross(dg_x, M)
i1 = findcross(dg_x, 1)
i0 = findcross(dg_x, 0)
im1 = findcross(dg_x, -1)
im10 = findcross(dg_x, -M)
x10 = ecross(x, dg_x, i10, M) if len(i10) else ()
x1 = ecross(x, dg_x, i1, 1) if len(i1) else ()
x0 = ecross(x, dg_x, i0, 0) if len(i0) else ()
xm1 = ecross(x, dg_x, im1, -1) if len(im1) else ()
xm10 = ecross(x, dg_x, im10, -M) if len(im10) else ()
for i in np.unique(np.hstack((x10, x1, x0, xm1, xm10))):
if x[0] < i < x[n]:
ab.append(i)
ab.append(b)
return ab
def __call__(self, omega, *args, **kwds):
ab = self._get_integration_limits(omega, args, kwds)
s = self.s
val = 0
n = 0
err = 0
for ai, bi in zip(ab[:-1], ab[1:]):
vali, infoi = self._QaL(s, ai, bi, omega, *args, **kwds)
val += vali
err += infoi.error_estimate
n += infoi.n
if self.full_output:
info = self.info(err, n)
return val, info
return val
@staticmethod
def _get_num_points(s, prec, betam):
return 1 if s > 1 else int(prec / max(np.log10(betam + 1), 1) + 1)
def _QaL(self, s, a, b, omega, *args, **kwds):
8 years ago
"""if s>1,the integral is computed by Q_s^L"""
scale = (b - a) / 2
offset = (a + b) / 2
prec = self.precision # desired precision
def ff(t, *args, **kwds):
return scale * self.f(scale * t + offset, *args, **kwds)
def gg(t, *args, **kwds):
return self.g(scale * t + offset, *args, **kwds)
def dgg(t, *args, **kwds):
return scale * self.dg(scale * t + offset, *args, **kwds)
dg_a = abs(omega * dgg(-1, *args, **kwds))
dg_b = abs(omega * dgg(1, *args, **kwds))
g_a = abs(omega * gg(-1, *args, **kwds))
g_b = abs(omega * gg(1, *args, **kwds))
delta, alpha = min(dg_a, dg_b), min(g_a, g_b)
betam = delta # * scale
if self.endpoints:
8 years ago
if delta < 10 or alpha <= 10 or s > 1:
points = chebyshev_extrema
else:
points = adaptive_levin_points
8 years ago
elif delta < 10 or alpha <= 10 or s > 1:
points = chebyshev_roots
else:
points = open_levin_points # tanh_sinh_open_nodes
m = self._get_num_points(s, prec, betam)
7 years ago
abseps = 10 * 10.0**-prec
num_collocation_point_list = m * 2**np.arange(1, 5) + 1
basis = self.basis
q_val = 1e+300
num_function_evaluations = 0
n = 0
for num_collocation_points in num_collocation_point_list:
n_old = n
q_old = q_val
x = points(num_collocation_points, betam)
n = len(x)
if n > n_old:
q_val = self._a_levin(omega, ff, gg, dgg, x, s, basis, *args,
**kwds)
num_function_evaluations += n
7 years ago
err = np.abs(q_val - q_old)
if err <= abseps:
break
info = self.info(err, num_function_evaluations)
return q_val, info
class EvansWebster(AdaptiveLevin):
8 years ago
"""Return integral for the Evans Webster method"""
def __init__(self, f, g, dg=None, a=-1, b=1, basis=chebyshev_basis, s=8,
precision=10, endpoints=False, full_output=False):
super(EvansWebster,
self).__init__(f, g, dg=dg, a=a, b=b, basis=basis, s=s,
precision=precision, endpoints=endpoints,
full_output=full_output)
def _a_levin(self, omega, ff, gg, dgg, x, s, basis, *args, **kwds):
w = evans_webster_weights(omega, gg, dgg, x, basis, *args, **kwds)
f = Limit(ff)(x, *args, **kwds)
7 years ago
return np.sum(f * w)
def _get_num_points(self, s, prec, betam):
return 8 if s > 1 else int(prec / max(np.log10(betam + 1), 1) + 1)
if __name__ == '__main__':
tanh_sinh_nodes(16)