Fixed a bug in Kernel.name; simplified qlevels and made it general.

master
Per A Brodtkorb 8 years ago
parent 40bfa22d46
commit 669fadc9a5

@ -22,6 +22,12 @@ def _assert(cond, msg):
if not cond: if not cond:
raise ValueError(msg) raise ValueError(msg)
def _assert_warn(cond, msg):
if not cond:
warnings.warn(msg)
# stats = (mu2, R, Rdd) where # stats = (mu2, R, Rdd) where
# mu2 : 2'nd order moment, i.e.,int(x^2*kernel(x)) # mu2 : 2'nd order moment, i.e.,int(x^2*kernel(x))
# R : integral of squared kernel, i.e., int(kernel(x)^2) # R : integral of squared kernel, i.e., int(kernel(x)^2)
@ -36,31 +42,39 @@ _stats_logi = (pi ** 2 / 3, 1. / 6, 1 / 42)
_stats_gaus = (1, 1. / (2 * sqrt(pi)), 3. / (8 * sqrt(pi))) _stats_gaus = (1, 1. / (2 * sqrt(pi)), 3. / (8 * sqrt(pi)))
def qlevels(pdf, p=(10, 30, 50, 70, 90, 95, 99, 99.9), x1=None, x2=None): def qlevels(pdf, p=(10, 30, 50, 70, 90, 95, 99, 99.9), xi=(), indexing='xy'):
"""QLEVELS Calculates quantile levels which encloses P% of PDF. """QLEVELS Calculates quantile levels which encloses P% of pdf.
CALL: [ql PL] = qlevels(pdf,PL,x1,x2); Parameters
----------
pdf: array-like
joint point density function given as array or vector
p : float in range of [0,100] (or sequence of floats)
Percentage to compute which must be between 0 and 100 inclusive.
xi : tuple
input arguments to the pdf, i.e., (x0, x1,...., xn)
indexing : {'xy', 'ij'}, optional
Cartesian ('xy', default) or matrix ('ij') indexing of pdf.
See numpy.meshgrid for more details.
ql = the discrete quantile levels. Returns
pdf = joint point density function matrix or vector ------
PL = percent level (default [10:20:90 95 99 99.9]) levels: array-like
x1,x2 = vectors of the spacing of the variables discrete levels which encloses P% of pdf
(Default unit spacing)
QLEVELS numerically integrates PDF by decreasing height and find the QLEVELS numerically integrates PDF by decreasing height and find the
quantile levels which encloses P% of the distribution. If X1 and quantile levels which encloses P% of the distribution.
(or) X2 is unspecified it is assumed that dX1 and dX2 is constant.
NB! QLEVELS normalizes the integral of PDF to N/(N+0.001) before If Xi is unspecified it is assumed that dX0, dX1,..., and dXn is constant.
calculating QL in order to reflect the sampling of PDF is finite. NB! QLEVELS normalizes the integral of PDF to n/(n+0.001) before
Currently only able to handle 1D and 2D PDF's if dXi is not constant calculating 'levels' in order to reflect the sampling of PDF is finite.
(i=1,2).
Example Example
------- -------
>>> import wafo.stats as ws >>> import wafo.stats as ws
>>> x = np.linspace(-8,8,2001); >>> x = np.linspace(-8,8,2001);
>>> PL = np.r_[10:90:20, 90, 95, 99, 99.9] >>> PL = np.r_[10:90:20, 90, 95, 99, 99.9]
>>> qlevels(ws.norm.pdf(x),p=PL, x1=x); >>> qlevels(ws.norm.pdf(x),p=PL, xi=(x,));
array([ 0.39591707, 0.37058719, 0.31830968, 0.23402133, 0.10362052, array([ 0.39591707, 0.37058719, 0.31830968, 0.23402133, 0.10362052,
0.05862129, 0.01449505, 0.00178806]) 0.05862129, 0.01449505, 0.00178806])
@ -74,73 +88,58 @@ def qlevels(pdf, p=(10, 30, 50, 70, 90, 95, 99, 99.9), x1=None, x2=None):
qlevels2, tranproc qlevels2, tranproc
""" """
def _dx(x):
dx = np.diff(x.ravel()) * 0.5
return np.r_[0, dx] + np.r_[dx, 0]
def _init(pdf, xi, indexing):
if not xi:
return pdf.ravel()
if not isinstance(xi, tuple):
xi = (xi,)
dx = np.meshgrid(*[_dx(x) for x in xi], sparse=True, indexing=indexing)
dxij = np.ones((1))
for dxi in dx:
dxij = dxij * dxi
_assert(dxij.shape == pdf.shape,
'Shape of pdf does not match the arguments')
return (pdf * dxij).ravel()
def _check_levels(levels, pdf):
    """Warn when any computed level reaches the extremes of ``pdf``.

    Parameters
    ----------
    levels : ndarray
        Levels to check.
    pdf : array-like
        Density values the levels were derived from.
    """
    # np.max / np.min reduce in compiled code; the built-in max/min would
    # iterate over the flattened array element by element.
    _assert_warn(not np.any(levels >= np.max(pdf)),
                 'The lowest percent level is too close to 0%')
    _assert_warn(not np.any(levels <= np.min(pdf)),
                 'The given pdf is too sparsely sampled or the highest '
                 'percent level is too close to 100%')
pdf, p = np.atleast_1d(pdf, p)
_assert(not any(pdf.ravel() < 0),
'This is not a pdf since one or more values of pdf is negative')
_assert(not np.any((p < 0) | (100 < p)), 'PL must satisfy 0 <= PL <= 100')
norm = 1 # normalize cdf to unity if min(pdf.shape) == 0:
pdf = np.atleast_1d(pdf)
_assert(not any(pdf.ravel() < 0), 'This is not a pdf since one or more '
'values of pdf is negative')
fsiz = pdf.shape
fsizmin = min(fsiz)
if fsizmin == 0:
return [] return []
N = np.prod(fsiz)
d = len(fsiz)
if x1 is None or ((x2 is None) and d > 2):
fdfi = pdf.ravel()
else:
if d == 1: # pdf in one dimension
dx22 = np.ones(1)
else: # % pdf in two dimensions
dx2 = np.diff(x2.ravel()) * 0.5
dx22 = np.r_[0, dx2] + np.r_[dx2, 0]
dx1 = np.diff(x1.ravel()) * 0.5
dx11 = np.r_[0, dx1] + np.r_[dx1, 0]
dx1x2 = dx22[:, None] * dx11
fdfi = (pdf * dx1x2).ravel()
p = np.atleast_1d(p)
_assert(not np.any((p < 0) | (100 < p)), 'PL must satisfy 0 <= PL <= 100')
p2 = p / 100.0
ind = np.argsort(pdf.ravel()) # sort by height of pdf ind = np.argsort(pdf.ravel()) # sort by height of pdf
ind = ind[::-1] ind = ind[::-1]
fi = pdf.flat[ind] sorted_pdf = pdf.flat[ind]
pdf_dx = _init(pdf, xi, indexing=indexing)
# integration in the order of decreasing height of pdf # integration in the order of decreasing height of pdf
Fi = np.cumsum(fdfi[ind]) cdf = np.cumsum(pdf_dx[ind])
n = pdf_dx.size
# normalize cdf to make sure int pdf dx1 dx2 approx 1
cdf = cdf / cdf[-1] * n / (n + 1.5e-8)
if norm: # normalize Fi to make sure int pdf dx1 dx2 approx 1 # make sure cdf is strictly increasing by not considering duplicate values
Fi = Fi / Fi[-1] * N / (N + 1.5e-8) ind, = np.where(np.diff(np.r_[cdf, 1]) > 0)
maxFi = np.max(Fi) # calculating the inverse of cdf to find the levels
if maxFi > 1: levels = tranproc(cdf[ind], sorted_pdf[ind], p / 100.0)
warnings.warn('this is not a pdf since cdf>1! normalizing')
Fi = Fi / Fi[-1] * N / (N + 1.5e-8) _check_levels(levels, pdf)
levels[levels < 0] = 0.0
elif maxFi < .95: return levels
msg = '''The given pdf is too sparsely sampled since cdf<.95.
Thus QL is questionable'''
warnings.warn(msg)
# make sure Fi is strictly increasing by not considering duplicate values
ind, = np.where(np.diff(np.r_[Fi, 1]) > 0)
# calculating the inverse of Fi to find the index
ui = tranproc(Fi[ind], fi[ind], p2)
if np.any(ui >= max(pdf.ravel())):
warnings.warn('The lowest percent level is too close to 0%')
if np.any(ui <= min(pdf.ravel())):
msg = '''The given pdf is too sparsely sampled or
the highest percent level is too close to 100%'''
warnings.warn(msg)
ui[ui < 0] = 0.0
return ui
def qlevels2(data, p=(10, 30, 50, 70, 90, 95, 99, 99.9), method=1): def qlevels2(data, p=(10, 30, 50, 70, 90, 95, 99, 99.9), method=1):
@ -262,9 +261,16 @@ def sphere_volume(d, r=1.0):
class _Kernel(object): class _Kernel(object):
__metaclass__ = ABCMeta __metaclass__ = ABCMeta
def __init__(self, r=1.0, stats=None): def __init__(self, r=1.0, stats=None, name=''):
self.r = r # radius of kernel self.r = r # radius of effective support of kernel
self.stats = stats self.stats = stats
if not name:
name = self.__class__.__name__.replace('_Kernel', '')
self._name = name
@property
def name(self):
return self._name
def norm_factor(self, d=1, n=None): def norm_factor(self, d=1, n=None):
_assert(0 < d, "D") _assert(0 < d, "D")
@ -308,9 +314,9 @@ class _KernelMulti(_Kernel):
p=3; Multivariate Tri-weight Kernel p=3; Multivariate Tri-weight Kernel
p=4; Multivariate Four-weight Kernel p=4; Multivariate Four-weight Kernel
""" """
def __init__(self, r=1.0, p=1, stats=None): def __init__(self, r=1.0, p=1, stats=None, name=''):
self.p = p self.p = p
super(_KernelMulti, self).__init__(r, stats) super(_KernelMulti, self).__init__(r, stats, name)
def norm_factor(self, d=1, n=None): def norm_factor(self, d=1, n=None):
r = self.r r = self.r
@ -325,9 +331,10 @@ class _KernelMulti(_Kernel):
x2 = x ** 2 x2 = x ** 2
return ((1.0 - x2.sum(axis=0) / r ** 2).clip(min=0.0)) ** p return ((1.0 - x2.sum(axis=0) / r ** 2).clip(min=0.0)) ** p
mkernel_epanechnikov = _KernelMulti(p=1, stats=_stats_epan) mkernel_epanechnikov = _KernelMulti(p=1, stats=_stats_epan,
mkernel_biweight = _KernelMulti(p=2, stats=_stats_biwe) name='epanechnikov')
mkernel_triweight = _KernelMulti(p=3, stats=_stats_triw) mkernel_biweight = _KernelMulti(p=2, stats=_stats_biwe, name='biweight')
mkernel_triweight = _KernelMulti(p=3, stats=_stats_triw, name='triweight')
class _KernelProduct(_KernelMulti): class _KernelProduct(_KernelMulti):
@ -350,9 +357,11 @@ class _KernelProduct(_KernelMulti):
pdf = (1 - (x / r) ** 2).clip(min=0.0) ** self.p pdf = (1 - (x / r) ** 2).clip(min=0.0) ** self.p
return pdf.prod(axis=0) return pdf.prod(axis=0)
mkernel_p1epanechnikov = _KernelProduct(p=1, stats=_stats_epan) mkernel_p1epanechnikov = _KernelProduct(p=1, stats=_stats_epan,
mkernel_p1biweight = _KernelProduct(p=2, stats=_stats_biwe) name='p1epanechnikov')
mkernel_p1triweight = _KernelProduct(p=3, stats=_stats_triw) mkernel_p1biweight = _KernelProduct(p=2, stats=_stats_biwe, name='p1biweight')
mkernel_p1triweight = _KernelProduct(p=3, stats=_stats_triw,
name='p1triweight')
class _KernelRectangular(_Kernel): class _KernelRectangular(_Kernel):
@ -404,11 +413,6 @@ class _KernelGaussian(_Kernel):
mkernel_gaussian = _KernelGaussian(r=4.0, stats=_stats_gaus) mkernel_gaussian = _KernelGaussian(r=4.0, stats=_stats_gaus)
# def mkernel_gaussian(X):
# x2 = X ** 2
# d = X.shape[0]
# return (2 * pi) ** (-d / 2) * exp(-0.5 * x2.sum(axis=0))
class _KernelLaplace(_Kernel): class _KernelLaplace(_Kernel):
@ -439,8 +443,8 @@ _MKERNEL_DICT = dict(
tria=mkernel_triangular, tria=mkernel_triangular,
lapl=mkernel_laplace, lapl=mkernel_laplace,
logi=mkernel_logistic, logi=mkernel_logistic,
gaus=mkernel_gaussian gaus=mkernel_gaussian)
)
_KERNEL_EXPONENT_DICT = dict( _KERNEL_EXPONENT_DICT = dict(
re=0, sp=0, ep=1, bi=2, tr=3, fo=4, fi=5, si=6, se=7) re=0, sp=0, ep=1, bi=2, tr=3, fo=4, fi=5, si=6, se=7)
@ -530,7 +534,7 @@ class Kernel(object):
@property @property
def name(self): def name(self):
return self.kernel.__class__.__name__.replace('_Kernel', '').title() return self.kernel.name
def stats(self): def stats(self):
"""Return some 1D statistics of the kernel. """Return some 1D statistics of the kernel.
@ -586,8 +590,13 @@ class Kernel(object):
visual check by eye. visual check by eye.
Example: Example:
data = rndnorm(0, 1,20,1) -------
h = hns(data,'epan') >>> import numpy as np
>>> import wafo.kdetools as wk
>>> import wafo.stats as ws
>>> kernel = wk.Kernel('epan')
>>> data = ws.norm.rvs(0, 1, size=(1,20))
>>> h = kernel.hns(data)
See also: See also:
--------- ---------
@ -601,7 +610,6 @@ class Kernel(object):
Wand,M.P. and Jones, M.C. (1995) Wand,M.P. and Jones, M.C. (1995)
'Kernel smoothing' 'Kernel smoothing'
Chapman and Hall, pp 60--63 Chapman and Hall, pp 60--63
""" """
a = np.atleast_2d(data) a = np.atleast_2d(data)
@ -611,13 +619,13 @@ class Kernel(object):
mu2, R, _Rdd = self.stats() mu2, R, _Rdd = self.stats()
amise_constant = (8 * sqrt(pi) * R / (3 * mu2 ** 2 * n)) ** (1. / 5) amise_constant = (8 * sqrt(pi) * R / (3 * mu2 ** 2 * n)) ** (1. / 5)
iqr = iqrange(a, axis=1) # interquartile range iqr = iqrange(a, axis=1) # interquartile range
stdA = np.std(a, axis=1, ddof=1) std_a = np.std(a, axis=1, ddof=1)
# use of interquartile range guards against outliers. # use of interquartile range guards against outliers.
# the use of interquartile range is better if # the use of interquartile range is better if
# the distribution is skew or have heavy tails # the distribution is skew or have heavy tails
# This lessen the chance of oversmoothing. # This lessen the chance of oversmoothing.
return np.where(iqr > 0, return np.where(iqr > 0,
np.minimum(stdA, iqr / 1.349), stdA) * amise_constant np.minimum(std_a, iqr / 1.349), std_a) * amise_constant
def hos(self, data): def hos(self, data):
"""Returns Oversmoothing Parameter. """Returns Oversmoothing Parameter.
@ -680,7 +688,8 @@ class Kernel(object):
elif name == 'gaus': # Gaussian kernel elif name == 'gaus': # Gaussian kernel
a = (4.0 / (d + 2.0)) ** (1. / (d + 4.0)) a = (4.0 / (d + 2.0)) ** (1. / (d + 4.0))
else: else:
raise ValueError('Unknown kernel.') raise NotImplementedError('Hmns bandwidth not implemented for '
'kernel {}.'.format(name))
return a return a
def hmns(self, data): def hmns(self, data):
@ -696,7 +705,6 @@ class Kernel(object):
'triweight' - Tri-weight kernel. 'triweight' - Tri-weight kernel.
'gaussian' - Gaussian kernel 'gaussian' - Gaussian kernel
Note that only the first 4 letters of the kernel name is needed.
HMNS only gives a optimal value with respect to mean integrated HMNS only gives a optimal value with respect to mean integrated
square error, when the true underlying distribution is Multivariate square error, when the true underlying distribution is Multivariate
@ -725,7 +733,6 @@ class Kernel(object):
Chapman and Hall, pp 60--63, 86--88 Chapman and Hall, pp 60--63, 86--88
""" """
# TODO: implement more kernels
a = np.atleast_2d(data) a = np.atleast_2d(data)
d, n = a.shape d, n = a.shape
@ -1081,35 +1088,53 @@ class Kernel(object):
warnings.warn('The obtained value did not converge.') warnings.warn('The obtained value did not converge.')
h[dim] = h1 * s h[dim] = h1 * s
# end % for dim loop # end # for dim loop
return h return h
def hscv(self, data, hvec=None, inc=128, maxit=100, fulloutput=False): def hscv(self, data, hvec=None, inc=128, maxit=100, fulloutput=False):
''' '''
HSCV Smoothed cross-validation estimate of smoothing parameter. HSCV Smoothed cross-validation estimate of smoothing parameter.
CALL: [hs,hvec,score] = hscv(data,kernel,hvec)
hs = smoothing parameter Parameters
----------
data = data vector
hvec = vector defining possible values of hs hvec = vector defining possible values of hs
(default linspace(0.25*h0,h0,100), h0=0.62) (default linspace(0.25*h0,h0,100), h0=0.62)
inc = length of estimated kerneldensity estimate
maxit = maximum number of iterations
fulloutput = True if fulloutput is wanted
Returns
-------
hs = smoothing parameter
hvec = vector defining possible values of hs
score = score vector score = score vector
data = data vector
kernel = 'gaussian' - Gaussian kernel the only supported
Note that only the first 4 letters of the kernel name is needed. Example
------
>>> import wafo.kdetools as wk
>>> import wafo.stats as ws
>>> data = ws.norm.rvs(0,1, size=(1,20))
>>> kernel = wk.Kernel('epan')
>>> hs0 = kernel.hscv(data, fulloutput=False)
>>> hs, hvec, score = kernel.hscv(data, fulloutput=True)
>>> np.allclose(hs, hs0)
True
Example: import matplotlib.pyplot as plt
data = rndnorm(0,1,20,1) plt.plot(hvec,score)
[hs hvec score] = hscv(data,'epan');
plot(hvec,score)
See also hste, hbcv, hboot, hos, hldpi, hlscv, hstt, kde, kdefun
See also:
hste, hbcv, hboot, hos, hldpi, hlscv, hstt, kde, kdefun
Reference
---------
Wand,M.P. and Jones, M.C. (1986) Wand,M.P. and Jones, M.C. (1986)
'Kernel smoothing' 'Kernel smoothing'
Chapman and Hall, pp 75--79 Chapman and Hall, pp 75--79
''' '''
# TODO: Add support for other kernels than Gaussian
A = np.atleast_2d(data) A = np.atleast_2d(data)
d, n = A.shape d, n = A.shape
@ -1209,17 +1234,16 @@ class Kernel(object):
idx = score.argmin() idx = score.argmin()
# Kernel other than Gaussian scale bandwidth # Kernel other than Gaussian scale bandwidth
h[dim] = hvec[idx] * (ste_constant / ste_constant2) ** (1 / 5) h[dim] = hvec[idx] * (ste_constant / ste_constant2) ** (1 / 5)
if idx == 0: _assert_warn(0 < idx,
warnings.warn("Optimum is probably lower than " "Optimum is probably lower than "
"hs={0:g} for dim={1:d}".format(h[dim] * s, dim))
_assert_warn(idx < maxit - 1,
"Optimum is probably higher than "
"hs={0:g} for dim={1:d}".format(h[dim] * s, dim)) "hs={0:g} for dim={1:d}".format(h[dim] * s, dim))
elif idx == maxit - 1:
msg = "Optimum is probably higher than hs={0:g] for dim={1:d}"
warnings.warn(msg.format(h[dim] * s, dim))
hvec = hvec * (ste_constant / ste_constant2) ** (1 / 5) hvec = hvec * (ste_constant / ste_constant2) ** (1 / 5)
if fulloutput: if fulloutput:
return h * sigmaA, score, hvec, sigmaA return h * sigmaA, score, hvec
else:
return h * sigmaA return h * sigmaA
def hldpi(self, data, L=2, inc=128): def hldpi(self, data, L=2, inc=128):
@ -1361,7 +1385,7 @@ def mkernel(X, kernel):
See also See also
-------- --------
kde, kdefun, kdebin KDE
References References
---------- ----------

@ -1131,7 +1131,7 @@ class SpecData1D(PlotData):
title='Joint density of maximum and minimum') title='Joint density of maximum and minimum')
try: try:
pl = [10, 30, 50, 70, 90, 95, 99, 99.9] pl = [10, 30, 50, 70, 90, 95, 99, 99.9]
mmpdf.cl = qlevels(uvdens, pl, h, h) mmpdf.cl = qlevels(uvdens, pl, xi=(h, h))
mmpdf.pl = pl mmpdf.pl = pl
except: except:
pass pass

@ -449,6 +449,27 @@ class TestSmoothing(unittest.TestCase):
assert_allclose(hs, [[3.25196193e-01, -2.68892467e-02, 3.18932448e-04], assert_allclose(hs, [[3.25196193e-01, -2.68892467e-02, 3.18932448e-04],
[-2.68892467e-02, 3.91283306e-01, 2.38654678e-02], [-2.68892467e-02, 3.91283306e-01, 2.38654678e-02],
[3.18932448e-04, 2.38654678e-02, 4.05123874e-01]]) [3.18932448e-04, 2.38654678e-02, 4.05123874e-01]])
hs = self.gauss.hmns(self.data[0])
assert_allclose(hs, self.gauss.hns(self.data[0]))
hs = wk.Kernel('epan').hmns(self.data)
assert_allclose(hs,
[[8.363847e-01, -6.915749e-02, 8.202747e-04],
[-6.915749e-02, 1.006357e+00, 6.138052e-02],
[8.202747e-04, 6.138052e-02, 1.041954e+00]],
rtol=1e-5)
hs = wk.Kernel('biwe').hmns(self.data[:2])
assert_allclose(hs, [[0.868428, -0.071705],
[-0.071705, 1.04685]], rtol=1e-5)
hs = wk.Kernel('triwe').hmns(self.data[:2])
assert_allclose(hs, [[0.975375, -0.080535],
[-0.080535, 1.17577]], rtol=1e-5)
self.assertRaises(NotImplementedError,
wk.Kernel('biwe').hmns, self.data)
self.assertRaises(NotImplementedError,
wk.Kernel('triwe').hmns, self.data)
self.assertRaises(NotImplementedError,
wk.Kernel('triangular').hmns, self.data)
def test_hscv(self): def test_hscv(self):
hs = self.gauss.hscv(self.data) hs = self.gauss.hscv(self.data)

Loading…
Cancel
Save