Simplified TKDE class

master
pbrod 8 years ago
parent 01d01144bb
commit 0460dd4fb7

@ -53,52 +53,19 @@ def _invnorm(q):
class _KDE(object): class _KDE(object):
""" Kernel-Density Estimator base class. def __init__(self, data, kernel=None, xmin=None, xmax=None):
self.dataset = data
Parameters
----------
data : (# of dims, # of data)-array
datapoints to estimate from
hs : array-like (optional)
smooting parameter vector/matrix.
(default compute from data using kernel.get_smoothing function)
kernel : kernel function object.
kernel must have get_smoothing method
alpha : real scalar (optional)
sensitivity parameter (default 0 regular KDE)
A good choice might be alpha = 0.5 ( or 1/D)
alpha = 0 Regular KDE (hs is constant)
0 < alpha <= 1 Adaptive KDE (Make hs change)
Members
-------
d : int
number of dimensions
n : int
number of datapoints
Methods
-------
kde.eval_grid_fast(x0, x1,..., xd) : array
evaluate the estimated pdf on meshgrid(x0, x1,..., xd)
kde.eval_grid(x0, x1,..., xd) : array
evaluate the estimated pdf on meshgrid(x0, x1,..., xd)
kde.eval_points(points) : array
evaluate the estimated pdf on a provided set of points
kde(x0, x1,..., xd) : array
same as kde.eval_grid(x0, x1,..., xd)
"""
def __init__(self, data, hs=None, kernel=None, alpha=0.0, xmin=None,
xmax=None, inc=512):
self.dataset = atleast_2d(data)
self.kernel = kernel if kernel else Kernel('gauss')
self.xmin = xmin self.xmin = xmin
self.xmax = xmax self.xmax = xmax
self.hs = hs self.kernel = kernel if kernel else Kernel('gauss')
self.inc = inc
self.alpha = alpha @property
self.initialize() def dataset(self):
return self._dataset
@dataset.setter
def dataset(self, data):
self._dataset = atleast_2d(data)
@property @property
def n(self): def n(self):
@ -122,9 +89,11 @@ class _KDE(object):
@xmin.setter @xmin.setter
def xmin(self, xmin): def xmin(self, xmin):
if xmin is None: if xmin is None:
self._xmin = self.dataset.min(axis=-1) - 2 * self.sigma xmin = self.dataset.min(axis=-1) - 2 * self.sigma
else: self._xmin = self._check_xmin(xmin*np.ones(self.d))
self._xmin = xmin * np.ones(self.d)
def _check_xmin(self, xmin):
return xmin
@property @property
def xmax(self): def xmax(self):
@ -133,83 +102,12 @@ class _KDE(object):
@xmax.setter @xmax.setter
def xmax(self, xmax): def xmax(self, xmax):
if xmax is None: if xmax is None:
self._xmax = self.dataset.max(axis=-1) + 2 * self.sigma xmax = self.dataset.max(axis=-1) + 2 * self.sigma
else:
self._xmax = xmax * np.ones(self.d)
def _replace_negatives_with_default_hs(self, h):
get_default_hs = self.kernel.get_smoothing
ind, = np.where(h <= 0)
for i in ind.tolist():
h[i] = get_default_hs(self.dataset[i])
def _check_hs(self, h):
"""make sure it has the correct dimension and replace negative vals"""
h = np.atleast_1d(h)
if (len(h.shape) == 1) or (self.d == 1):
h = h * np.ones(self.d) if max(h.shape) == 1 else h.reshape(self.d)
self._replace_negatives_with_default_hs(h)
return h
def _invert_hs(self, h):
if (len(h.shape) == 1) or (self.d == 1):
determinant = h.prod()
inv_hs = np.diag(1.0 / h)
else: # fully general smoothing matrix
determinant = linalg.det(h)
_assert(0 < determinant,
'bandwidth matrix h must be positive definit!')
inv_hs = linalg.inv(h)
return inv_hs, determinant
@property self._xmax = self._check_xmax(xmax * np.ones(self.d))
def hs(self):
return self._hs
@hs.setter def _check_xmax(self, xmax):
def hs(self, h): return xmax
if h is None:
h = self.kernel.get_smoothing(self.dataset)
h = self._check_hs(h)
inv_hs, deth = self._invert_hs(h)
self._norm_factor = deth * self.n
self._inv_hs = inv_hs
self._hs = h
@property
def inc(self):
return self._inc
@inc.setter
def inc(self, inc):
if inc is None:
_tau, tau = self.kernel.effective_support()
xyzrange = 8 * self.sigma
L1 = 10
inc = max(48, (L1 * xyzrange / (tau * self.hs)).max())
inc = 2 ** nextpow2(inc)
self._inc = inc
@property
def alpha(self):
return self._alpha
@alpha.setter
def alpha(self, alpha):
self._alpha = alpha
self._lambda = np.ones(self.n)
if alpha > 0:
f = self.eval_points(self.dataset) # pilot estimate
g = np.exp(np.mean(np.log(f)))
self._lambda = (f / g) ** (-alpha)
def initialize(self):
if self.n > 1:
self._initialize()
def _initialize(self):
pass
def get_args(self, xmin=None, xmax=None): def get_args(self, xmin=None, xmax=None):
sxmin = self.xmin sxmin = self.xmin
@ -437,42 +335,35 @@ class TKDE(_KDE):
def __init__(self, data, hs=None, kernel=None, alpha=0.0, def __init__(self, data, hs=None, kernel=None, alpha=0.0,
xmin=None, xmax=None, inc=512, L2=None): xmin=None, xmax=None, inc=512, L2=None):
self.L2 = L2 self.L2 = L2
# self.dataset = data super(TKDE, self).__init__(data, kernel, xmin, xmax)
# self.tkde =
super(TKDE, self).__init__(data, hs, kernel, alpha, xmin, xmax, inc)
# @property
# def dataset(self):
# return self._dataset
#
# @dataset.setter
# def dataset(self, data):
# self._dataset = atleast_2d(data)
# self._tdataset = self._dat2gaus(self._dataset)
def _initialize(self):
self._check_xmin()
tdataset = self._dat2gaus(self.dataset) tdataset = self._dat2gaus(self.dataset)
xmin = self.xmin txmin = np.ravel(self._dat2gaus(np.reshape(self.xmin, (-1, 1))))
if xmin is not None: txmax = np.ravel(self._dat2gaus(np.reshape(self.xmax, (-1, 1))))
xmin = self._dat2gaus(np.reshape(xmin, (-1, 1))) self.tkde = KDE(tdataset, hs, self.kernel, alpha, txmin, txmax, inc)
xmax = self.xmax
if xmax is not None: def _check_xmin(self, xmin):
xmax = self._dat2gaus(np.reshape(xmax, (-1, 1)))
self.tkde = KDE(tdataset, self.hs, self.kernel, self.alpha,
np.ravel(xmin), np.ravel(xmax),
self.inc)
if self.inc is None:
self.inc = self.tkde.inc
def _check_xmin(self):
if self.L2 is not None: if self.L2 is not None:
amin = self.dataset.min(axis=-1) amin = self.dataset.min(axis=-1)
# default no transformation
L2 = np.atleast_1d(self.L2) * np.ones(self.d) L2 = np.atleast_1d(self.L2) * np.ones(self.d)
self.xmin = np.where(L2 != 1, xmin = np.where(L2 != 1, np.maximum(xmin, amin / 100.0), xmin)
np.maximum(self.xmin, amin / 100.0), return xmin
self.xmin)
@property
def inc(self):
return self.tkde.inc
@inc.setter
def inc(self, inc):
self.tkde.inc = inc
@property
def hs(self):
return self.tkde.hs
@hs.setter
def hs(self, hs):
self.tkde.hs = hs
def _dat2gaus(self, points): def _dat2gaus(self, points):
if self.L2 is None: if self.L2 is None:
@ -713,6 +604,80 @@ class KDE(_KDE):
h1 = plb.plot(x, f) # 1D probability density plot h1 = plb.plot(x, f) # 1D probability density plot
t = np.trapz(f, x) t = np.trapz(f, x)
""" """
def __init__(self, data, hs=None, kernel=None, alpha=0.0, xmin=None,
xmax=None, inc=512):
super(KDE, self).__init__(data, kernel, xmin, xmax)
self.hs = hs
self.inc = inc
self.alpha = alpha
def _replace_negatives_with_default_hs(self, h):
get_default_hs = self.kernel.get_smoothing
ind, = np.where(h <= 0)
for i in ind.tolist():
h[i] = get_default_hs(self.dataset[i])
def _check_hs(self, h):
"""make sure it has the correct dimension and replace negative vals"""
h = np.atleast_1d(h)
if (len(h.shape) == 1) or (self.d == 1):
h = h * np.ones(self.d) if max(h.shape) == 1 else h.reshape(self.d)
self._replace_negatives_with_default_hs(h)
return h
def _invert_hs(self, h):
if (len(h.shape) == 1) or (self.d == 1):
determinant = h.prod()
inv_hs = np.diag(1.0 / h)
else: # fully general smoothing matrix
determinant = linalg.det(h)
_assert(0 < determinant,
'bandwidth matrix h must be positive definit!')
inv_hs = linalg.inv(h)
return inv_hs, determinant
@property
def hs(self):
return self._hs
@hs.setter
def hs(self, h):
if h is None:
h = self.kernel.get_smoothing(self.dataset)
h = self._check_hs(h)
self._inv_hs, deth = self._invert_hs(h)
self._norm_factor = deth * self.n
self._hs = h
@property
def inc(self):
return self._inc
@inc.setter
def inc(self, inc):
if inc is None:
_tau, tau = self.kernel.effective_support()
xyzrange = 8 * self.sigma
L1 = 10
inc = max(48, (L1 * xyzrange / (tau * self.hs)).max())
inc = 2 ** nextpow2(inc)
self._inc = inc
@property
def alpha(self):
return self._alpha
@alpha.setter
def alpha(self, alpha):
self._alpha = alpha
self._lambda = np.ones(self.n)
if alpha > 0:
f = self.eval_points(self.dataset) # pilot estimate
g = np.exp(np.mean(np.log(f)))
self._lambda = (f / g) ** (-alpha)
@staticmethod @staticmethod
def _make_flat_grid(dx, d, inc): def _make_flat_grid(dx, d, inc):
Xn = [] Xn = []
@ -980,7 +945,6 @@ class BKRegression(object):
def _set_smoothing(self, hs): def _set_smoothing(self, hs):
self.kreg.tkde.hs = hs self.kreg.tkde.hs = hs
self.kreg.tkde.initialize()
x = property(fget=lambda cls: cls.kreg.tkde.dataset.squeeze()) x = property(fget=lambda cls: cls.kreg.tkde.dataset.squeeze())
y = property(fget=lambda cls: cls.kreg.y) y = property(fget=lambda cls: cls.kreg.y)
@ -1697,7 +1661,7 @@ def check_kreg_demo3():
n, symmetric=True, loc1=1.0, scale1=0.6, scale2=1.25) n, symmetric=True, loc1=1.0, scale1=0.6, scale2=1.25)
k0 = k k0 = k
for fun in ['hste', ]: for fun in ['hisj', ]:
hsmax, _hs1, _hs2 = _get_regression_smooting(x, y, fun=fun) hsmax, _hs1, _hs2 = _get_regression_smooting(x, y, fun=fun)
for hi in np.linspace(hsmax * 0.25, hsmax, 9): for hi in np.linspace(hsmax * 0.25, hsmax, 9):
plt.figure(k) plt.figure(k)
@ -1705,7 +1669,7 @@ def check_kreg_demo3():
unused = kreg_demo3(x, y, fun1, hs=hi, fun=fun, plotlog=False) unused = kreg_demo3(x, y, fun1, hs=hi, fun=fun, plotlog=False)
# kreg_demo2(n=n,symmetric=True,fun='hste', plotlog=False) # kreg_demo2(n=n,symmetric=True,fun='hste', plotlog=False)
fig.tile(range(k0, k)) # fig.tile(range(k0, k))
plt.ioff() plt.ioff()
plt.show() plt.show()
@ -1960,13 +1924,13 @@ def regressionbin(x, y, alpha=0.05, color='r', label=''):
if __name__ == '__main__': if __name__ == '__main__':
if True: if False:
test_docstrings(__file__) test_docstrings(__file__)
else: else:
# kde_demo5() # kde_demo5()
check_bkregression() # check_bkregression()
# check_regression_bin() # check_regression_bin()
# check_kreg_demo3() check_kreg_demo3()
# check_kreg_demo4() # check_kreg_demo4()
# kreg_demo1(fast=True) # kreg_demo1(fast=True)

Loading…
Cancel
Save