You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
5.8 KiB
Python

"""
Created on 10. mai 2014
@author: pab
"""
import numpy as np
from numpy.fft import fft
from wafo.misc import nextpow2
from scipy.signal.windows import get_window
from wafo.containers import PlotData
from wafo.covariance import CovData1D
import warnings
def sampling_period(t_vec):
"""
Returns sampling interval
Returns
-------
dt : scalar
sampling interval, unit:
[s] if lagtype=='t'
[m] otherwise
See also
"""
dt1 = t_vec[1] - t_vec[0]
n = len(t_vec) - 1
t = t_vec[-1] - t_vec[0]
dt = t / n
if abs(dt - dt1) > 1e-10:
warnings.warn('Data is not uniformly sampled!')
return dt
class CovarianceEstimator(object):
"""
Class for estimating AutoCovariance from timeseries
Parameters
----------
lag : scalar, int
maximum time-lag for which the ACF is estimated.
(Default lag where ACF is zero)
tr : transformation object
the transformation assuming that x is a sample of a transformed
Gaussian process. If g is None then x is a sample of a Gaussian
process (Default)
detrend : function
defining detrending performed on the signal before estimation.
(default detrend_mean)
window : vector of length NFFT or function
To create window vectors see numpy.blackman, numpy.hamming,
numpy.bartlett, scipy.signal, scipy.signal.get_window etc.
flag : string, 'biased' or 'unbiased'
If 'unbiased' scales the raw correlation by 1/(n-abs(k)),
where k is the index into the result, otherwise scales the raw
cross-correlation by 1/n. (default)
norm : bool
True if normalize output to one
dt : scalar
time-step between data points (default see sampling_period).
"""
def __init__(self, lag=None, tr=None, detrend=None, window='boxcar',
flag='biased', norm=False, dt=None):
self.lag = lag
self.tr = tr
self.detrend = detrend
self.window = window
self.flag = flag
self.norm = norm
self.dt = dt
def _estimate_lag(self, R, Ncens):
Lmax = min(300, len(R) - 1) # maximum lag if L is undetermined
# finding where ACF is less than 2 st. deviations.
sigma = np.sqrt(np.r_[0, R[0] ** 2,
R[0] ** 2 + 2 * np.cumsum(R[1:] ** 2)] / Ncens)
lag = Lmax + 2 - (np.abs(R[Lmax::-1]) > 2 * sigma[Lmax::-1]).argmax()
if self.window == 'parzen':
lag = int(4 * lag / 3)
# print('The default L is set to %d' % L)
return lag
def tocovdata(self, timeseries):
"""
Return auto covariance function from data.
Return
-------
acf : CovData1D object
with attributes:
data : ACF vector length L+1
args : time lags length L+1
sigma : estimated large lag standard deviation of the estimate
assuming x is a Gaussian process:
if acf[k]=0 for all lags k>q then an approximation
of the variance for large samples due to Bartlett
var(acf[k])=1/N*(acf[0]**2+2*acf[1]**2+2*acf[2]**2+ ..+2*acf[q]**2)
for k>q and where N=length(x). Special case is
white noise where it equals acf[0]**2/N for k>0
norm : bool
If false indicating that auto_cov is not normalized
Example:
--------
>>> import wafo.data
>>> import wafo.objects as wo
>>> x = wafo.data.sea()
>>> ts = wo.mat2timeseries(x)
>>> acf = ts.tocovdata(150)
h = acf.plot()
"""
lag = self.lag
window = self.window
detrend = self.detrend
try:
x = timeseries.data.flatten('F')
dt = timeseries.sampling_period()
except Exception:
x = timeseries[:, 1:].flatten('F')
dt = sampling_period(timeseries[:, 0])
if self.dt is not None:
dt = self.dt
if self.tr is not None:
x = self.tr.dat2gauss(x)
n = len(x)
indnan = np.isnan(x)
if any(indnan):
x = x - x[1 - indnan].mean()
Ncens = n - indnan.sum()
x[indnan] = 0.
else:
Ncens = n
x = x - x.mean()
if hasattr(detrend, '__call__'):
x = detrend(x)
nfft = 2 ** nextpow2(n)
raw_periodogram = abs(fft(x, nfft)) ** 2 / Ncens
auto_cov = np.real(fft(raw_periodogram)) / nfft # ifft = fft/nfft since raw_periodogram is real!
if self.flag.startswith('unbiased'):
# unbiased result, i.e. divide by n-abs(lag)
auto_cov = auto_cov[:Ncens] * Ncens / np.arange(Ncens, 1, -1)
if self.norm:
auto_cov = auto_cov / auto_cov[0]
if lag is None:
lag = self._estimate_lag(auto_cov, Ncens)
lag = min(lag, n - 2)
if isinstance(window, str) or type(window) is tuple:
win = get_window(window, 2 * lag - 1)
else:
win = np.asarray(window)
auto_cov[:lag] = auto_cov[:lag] * win[lag - 1::]
auto_cov[lag] = 0
lags = slice(0, lag + 1)
t = np.linspace(0, lag * dt, lag + 1)
acf = CovData1D(auto_cov[lags], t)
acf.sigma = np.sqrt(np.r_[0, auto_cov[0] ** 2,
auto_cov[0] ** 2 + 2 * np.cumsum(auto_cov[1:] ** 2)] / Ncens)
acf.children = [PlotData(-2. * acf.sigma[lags], t),
PlotData(2. * acf.sigma[lags], t)]
acf.plot_args_children = ['r:']
acf.norm = self.norm
return acf
__call__ = tocovdata