From 9a2b9f6921befc0d3392452b5b8896db330fc91e Mon Sep 17 00:00:00 2001 From: "Per.Andreas.Brodtkorb" Date: Mon, 29 Sep 2014 20:13:00 +0000 Subject: [PATCH] added new files --- pywafo/src/wafo/covariance/estimation.py | 174 +++++++ .../wafo/covariance/test/test_covariance.py | 17 + pywafo/src/wafo/transform/estimation.py | 448 ++++++++++++++++++ pywafo/src/wafo/win32_utils.py | 258 ++++++++++ 4 files changed, 897 insertions(+) create mode 100644 pywafo/src/wafo/covariance/estimation.py create mode 100644 pywafo/src/wafo/covariance/test/test_covariance.py create mode 100644 pywafo/src/wafo/transform/estimation.py create mode 100644 pywafo/src/wafo/win32_utils.py diff --git a/pywafo/src/wafo/covariance/estimation.py b/pywafo/src/wafo/covariance/estimation.py new file mode 100644 index 0000000..4a2c87c --- /dev/null +++ b/pywafo/src/wafo/covariance/estimation.py @@ -0,0 +1,174 @@ +''' +Created on 10. mai 2014 + +@author: pab +''' +import numpy as np +from numpy.fft import fft +from wafo.misc import nextpow2 +from scipy.signal.windows import get_window +from wafo.containers import PlotData +from wafo.covariance import CovData1D +import warnings + + +def sampling_period(t_vec): + ''' + Returns sampling interval + + Returns + ------- + dt : scalar + sampling interval, unit: + [s] if lagtype=='t' + [m] otherwise + + See also + ''' + dt1 = t_vec[1] - t_vec[0] + n = len(t_vec) - 1 + t = t_vec[-1] - t_vec[0] + dt = t / n + if abs(dt - dt1) > 1e-10: + warnings.warn('Data is not uniformly sampled!') + return dt + + +class CovarianceEstimatior(object): + ''' + Class for estimating AutoCovariance from timeseries + + Parameters + ---------- + lag : scalar, int + maximum time-lag for which the ACF is estimated. + (Default lag where ACF is zero) + tr : transformation object + the transformation assuming that x is a sample of a transformed + Gaussian process. If g is None then x is a sample of a Gaussian + process (Default) + detrend : function + defining detrending performed on the signal before estimation. + (default detrend_mean) + window : vector of length NFFT or function + To create window vectors see numpy.blackman, numpy.hamming, + numpy.bartlett, scipy.signal, scipy.signal.get_window etc. + flag : string, 'biased' or 'unbiased' + If 'unbiased' scales the raw correlation by 1/(n-abs(k)), + where k is the index into the result, otherwise scales the raw + cross-correlation by 1/n. (default) + norm : bool + True if normalize output to one + dt : scalar + time-step between data points (default see sampling_period). + ''' + def __init__(self, lag=None, tr=None, detrend=None, window='boxcar', + flag='biased', norm=False, dt=None): + self.lag + self.tr = tr + self.detrend = detrend + self.window = window + self.flag = flag + self.norm = norm + self.dt = dt + + def _estimate_lag(self, R, Ncens): + Lmax = min(300, len(R) - 1) # maximum lag if L is undetermined + # finding where ACF is less than 2 st. deviations. + sigma = np.sqrt(np.r_[0, R[0] ** 2, + R[0] ** 2 + 2 * np.cumsum(R[1:] ** 2)] / Ncens) + lag = Lmax + 2 - (np.abs(R[Lmax::-1]) > 2 * sigma[Lmax::-1]).argmax() + if self.window == 'parzen': + lag = int(4 * lag / 3) + # print('The default L is set to %d' % L) + return lag + + def tocovdata(self, timeseries): + ''' + Return auto covariance function from data. + + Return + ------- + R : CovData1D object + with attributes: + data : ACF vector length L+1 + args : time lags length L+1 + sigma : estimated large lag standard deviation of the estimate + assuming x is a Gaussian process: + if R(k)=0 for all lags k>q then an approximation + of the variance for large samples due to Bartlett + var(R(k))=1/N*(R(0)^2+2*R(1)^2+2*R(2)^2+ ..+2*R(q)^2) + for k>q and where N=length(x). Special case is + white noise where it equals R(0)^2/N for k>0 + norm : bool + If false indicating that R is not normalized + + Example: + -------- + >>> import wafo.data + >>> import wafo.objects as wo + >>> x = wafo.data.sea() + >>> ts = wo.mat2timeseries(x) + >>> acf = ts.tocovdata(150) + >>> h = acf.plot() + ''' + lag = self.lag + window = self.window + detrend = self.detrend + + try: + x = timeseries.data.flatten('F') + dt = timeseries.sampling_period() + except Exception: + x = timeseries[:, 1:].flatten('F') + dt = sampling_period(timeseries[:, 0]) + if not (self.dt is None): + dt = self.dt + + if not (self.tr is None): + x = self.tr.dat2gauss(x) + + n = len(x) + indnan = np.isnan(x) + if any(indnan): + x = x - x[1 - indnan].mean() + Ncens = n - indnan.sum() + x[indnan] = 0. + else: + Ncens = n + x = x - x.mean() + if hasattr(detrend, '__call__'): + x = detrend(x) + + nfft = 2 ** nextpow2(n) + Rper = abs(fft(x, nfft)) ** 2 / Ncens # Raw periodogram + R = np.real(fft(Rper)) / nfft # ifft = fft/nfft since Rper is real! + + if self.flag.startswith('unbiased'): + # unbiased result, i.e. divide by n-abs(lag) + R = R[:Ncens] * Ncens / np.arange(Ncens, 1, -1) + + if self.norm: + R = R / R[0] + + if lag is None: + lag = self._estimate_lag(R, Ncens) + lag = min(lag, n - 2) + if isinstance(window, str) or type(window) is tuple: + win = get_window(window, 2 * lag - 1) + else: + win = np.asarray(window) + R[:lag] = R[:lag] * win[lag - 1::] + R[lag] = 0 + lags = slice(0, lag + 1) + t = np.linspace(0, lag * dt, lag + 1) + acf = CovData1D(R[lags], t) + acf.sigma = np.sqrt(np.r_[0, R[0] ** 2, + R[0] ** 2 + 2 * np.cumsum(R[1:] ** 2)] / Ncens) + acf.children = [PlotData(-2. * acf.sigma[lags], t), + PlotData(2. * acf.sigma[lags], t)] + acf.plot_args_children = ['r:'] + acf.norm = self.norm + return acf + + __call__ = tocovdata diff --git a/pywafo/src/wafo/covariance/test/test_covariance.py b/pywafo/src/wafo/covariance/test/test_covariance.py new file mode 100644 index 0000000..3b6aa2b --- /dev/null +++ b/pywafo/src/wafo/covariance/test/test_covariance.py @@ -0,0 +1,17 @@ +from numpy.testing import (run_module_suite, assert_equal, assert_almost_equal, + assert_array_equal, assert_array_almost_equal) +import numpy as np +from numpy import array, cos, exp, linspace, pi, sin, diff, arange, ones + +import wafo.spectrum.models as sm +from wafo.covariance import CovData1D + +def test_covariance(): + Sj = sm.Jonswap() + S = Sj.tospecdata() #Make spec + R = S.tocovdata() + + x = R.sim(ns=1000,dt=0.2) + +if __name__ == '__main__': + run_module_suite() diff --git a/pywafo/src/wafo/transform/estimation.py b/pywafo/src/wafo/transform/estimation.py new file mode 100644 index 0000000..1d8737c --- /dev/null +++ b/pywafo/src/wafo/transform/estimation.py @@ -0,0 +1,448 @@ +''' +Created on 8. mai 2014 + +@author: pab +''' +from wafo.transform.core import TrData +from wafo.transform.models import TrHermite, TrOchi, TrLinear +from wafo.stats import edf, skew, kurtosis +from wafo.interpolate import SmoothSpline +from scipy.special import ndtri as invnorm +from scipy.integrate import cumtrapz +import warnings +import numpy as np +floatinfo = np.finfo(float) + + +class TransformEstimator(object): + ''' + Estimate transformation, g, from ovserved data. + Assumption: a Gaussian process, Y, is related to the + non-Gaussian process, X, by Y = g(X). + + Parameters + ---------- + method : string + estimation method. Options are: + 'nonlinear' : smoothed crossing intensity (default) + 'mnonlinear': smoothed marginal cumulative distribution + 'hermite' : cubic Hermite polynomial + 'ochi' : exponential function + 'linear' : identity. + chkDer : bool + False: No check on the derivative of the transform. + True: Check if transform have positive derivative + csm, gsm : real scalars + defines the smoothing of the logarithm of crossing intensity and + the transformation g, respectively. Valid values must be + 0<=csm,gsm<=1. (default csm=0.9, gsm=0.05) + Smaller values gives smoother functions. + param : vector (default (-5, 5, 513)) + defines the region of variation of the data X. If X(t) is likely to + cross levels higher than 5 standard deviations then the vector param + has to be modified. For example if X(t) is unlikely to cross a level + of 7 standard deviations one can use param = (-7, 7, 513). + crossdef : string + Crossing definition used in the crossing spectrum: + 'u' or 1: only upcrossings + 'uM' or 2: upcrossings and Maxima (default) + 'umM' or 3: upcrossings, minima, and Maxima. + 'um' or 4: upcrossings and minima. + plotflag : int + 0 no plotting (Default) + 1 plots empirical and smoothed g(u) and the theoretical for a + Gaussian model. + 2 monitor the development of the estimation + Delay : real scalar + Delay time for each plot when PLOTFLAG==2. + linextrap: int + 0 use a regular smoothing spline + 1 use a smoothing spline with a constraint on the ends to ensure + linear extrapolation outside the range of the data. (default) + cvar: real scalar + Variances for the the crossing intensity. (default 1) + gvar: real scalar + Variances for the empirical transformation, g. (default 1) + ne : int + Number of extremes (maxima & minima) to remove from the estimation + of the transformation. This makes the estimation more robust + against outliers. (default 7) + ntr : int + Maximum length of empirical crossing intensity or CDF. The + empirical crossing intensity or CDF is interpolated linearly before + smoothing if their lengths exceeds Ntr. A reasonable NTR will + significantly speed up the estimation for long time series without + loosing any accuracy. NTR should be chosen greater than PARAM(3). + (default 10000) + multip : Bool + False: the data in columns belong to the same seastate (default). + True: the data in columns are from separate seastates. + ''' + + def __init__(self, method='nonlinear', chkder=True, plotflag=False, + csm=.95, gsm=.05, param=(-5, 5, 513), delay=2, ntr=10000, + linextrap=True, ne=7, cvar=1, gvar=1, multip=False, + crossdef='uM'): + self.method = method + self.chkder = chkder + self.plotflag = plotflag + self.csm = csm + self.gsm = gsm + self.param = param + self.delay = delay + self.ntr = ntr + self.linextrap = linextrap + self.ne = ne + self.cvar = cvar + self.gvar = gvar + self.multip = multip + self.crossdef = crossdef + + def _check_tr(self, tr, tr_raw): + eps = floatinfo.eps + x = tr.args + mean = tr.mean + sigma = tr.sigma + for ix in xrange(5): + dy = np.diff(tr.data) + if (dy <= 0).any(): + dy[dy > 0] = eps + gvar = -(np.hstack((dy, 0)) + np.hstack((0, dy))) / 2 + eps + pp_tr = SmoothSpline(tr_raw.args, tr_raw.data, p=1, + lin_extrap=self.linextrap, + var=ix * gvar) + tr = TrData(pp_tr(x), x, mean=mean, sigma=sigma) + else: + break + else: + msg = ''' + The estimated transfer function, g, is not + a strictly increasing function. + The transfer function is possibly not sufficiently smoothed. + ''' + warnings.warn(msg) + return tr + + def _trdata_lc(self, level_crossings, mean=None, sigma=None): + ''' + Estimate transformation, g, from observed crossing intensity. + + Assumption: a Gaussian process, Y, is related to the + non-Gaussian process, X, by Y = g(X). + + Parameters + ---------- + mean, sigma : real scalars + mean and standard deviation of the process + **options : + csm, gsm : real scalars + defines the smoothing of the crossing intensity and the + transformation g. + Valid values must be 0<=csm,gsm<=1. (default csm = 0.9 gsm=0.05) + Smaller values gives smoother functions. + param : + vector which defines the region of variation of the data X. + (default [-5, 5, 513]). + monitor : bool + if true monitor development of estimation + linextrap : bool + if true use a smoothing spline with a constraint on the ends to + ensure linear extrapolation outside the range of data. (default) + otherwise use a regular smoothing spline + cvar, gvar : real scalars + Variances for the crossing intensity and the empirical + transformation, g. (default 1) + ne : scalar integer + Number of extremes (maxima & minima) to remove from the estimation + of the transformation. This makes the estimation more robust + against outliers. (default 7) + ntr : scalar integer + Maximum length of empirical crossing intensity. The empirical + crossing intensity is interpolated linearly before smoothing if + the length exceeds ntr. A reasonable NTR (eg. 1000) will + significantly speed up the estimation for long time series without + loosing any accuracy. NTR should be chosen greater than PARAM(3). + (default inf) + + Returns + ------- + gs, ge : TrData objects + smoothed and empirical estimate of the transformation g. + + Notes + ----- + The empirical crossing intensity is usually very irregular. + More than one local maximum of the empirical crossing intensity + may cause poor fit of the transformation. In such case one + should use a smaller value of GSM or set a larger variance for GVAR. + If X(t) is likely to cross levels higher than 5 standard deviations + then the vector param has to be modified. For example if X(t) is + unlikely to cross a level of 7 standard deviations one can use + param = [-7 7 513]. + + Example + ------- + >>> import wafo.spectrum.models as sm + >>> import wafo.transform.models as tm + >>> from wafo.objects import mat2timeseries + >>> Hs = 7.0 + >>> Sj = sm.Jonswap(Hm0=Hs) + >>> S = Sj.tospecdata() #Make spectrum object from numerical values + >>> S.tr = tm.TrOchi(mean=0, skew=0.16, kurt=0, + ... sigma=Hs/4, ysigma=Hs/4) + >>> xs = S.sim(ns=2**16, iseed=10) + >>> ts = mat2timeseries(xs) + >>> tp = ts.turning_points() + >>> mm = tp.cycle_pairs() + >>> lc = mm.level_crossings() + >>> g0, g0emp = lc.trdata(monitor=True) # Monitor the development + >>> g1, g1emp = lc.trdata(gvar=0.5 ) # Equal weight on all points + >>> g2, g2emp = lc.trdata(gvar=[3.5, 0.5, 3.5]) # Less weight on ends + >>> int(S.tr.dist2gauss()*100) + 141 + >>> int(g0emp.dist2gauss()*100) + 380995 + >>> int(g0.dist2gauss()*100) + 143 + >>> int(g1.dist2gauss()*100) + 162 + >>> int(g2.dist2gauss()*100) + 120 + + g0.plot() # Check the fit. + + See also + troptset, dat2tr, trplot, findcross, smooth + + NB! the transformated data will be N(0,1) + + Reference + --------- + Rychlik , I., Johannesson, P., and Leadbetter, M.R. (1997) + "Modelling and statistical analysis of ocean wavedata + using a transformed Gaussian process", + Marine structures, Design, Construction and Safety, + Vol 10, pp 13--47 + ''' + if mean is None: + mean = level_crossings.mean + if sigma is None: + sigma = level_crossings.sigma + lc1, lc2 = level_crossings.args, level_crossings.data + intensity = level_crossings.intensity + + Ne = self.ne + ncr = len(lc2) + if ncr > self.ntr and self.ntr > 0: + x0 = np.linspace(lc1[Ne], lc1[-1 - Ne], self.ntr) + lc1, lc2 = x0, np.interp(x0, lc1, lc2) + Ne = 0 + Ner = self.ne + ncr = self.ntr + else: + Ner = 0 + + ng = len(np.atleast_1d(self.gvar)) + if ng == 1: + gvar = self.gvar * np.ones(ncr) + else: + gvar = np.interp(np.linspace(0, 1, ncr), + np.linspace(0, 1, ng), self.gvar) + + uu = np.linspace(*self.param) + g1 = sigma * uu + mean + + if Ner > 0: # Compute correction factors + cor1 = np.trapz(lc2[0:Ner + 1], lc1[0:Ner + 1]) + cor2 = np.trapz(lc2[-Ner - 1::], lc1[-Ner - 1::]) + else: + cor1 = 0 + cor2 = 0 + + lc22 = np.hstack((0, cumtrapz(lc2, lc1) + cor1)) + + if intensity: + lc22 = (lc22 + 0.5 / ncr) / (lc22[-1] + cor2 + 1. / ncr) + else: + lc22 = (lc22 + 0.5) / (lc22[-1] + cor2 + 1) + + lc11 = (lc1 - mean) / sigma + + lc22 = invnorm(lc22) # - ymean + + g2 = TrData(lc22.copy(), lc1.copy(), mean=mean, sigma=sigma) + g2.setplotter('step') + # NB! the smooth function does not always extrapolate well outside the + # edges causing poor estimate of g + # We may alleviate this problem by: forcing the extrapolation + # to be linear outside the edges or choosing a lower value for csm2. + + inds = slice(Ne, ncr - Ne) # indices to points we are smoothing over + slc22 = SmoothSpline(lc11[inds], lc22[inds], self.gsm, self.linextrap, + gvar[inds])(uu) + + g = TrData(slc22.copy(), g1.copy(), mean=mean, sigma=sigma) + + if self.chkder: + tr_raw = TrData(lc22[inds], lc11[inds], mean=mean, sigma=sigma) + g = self._check_tr(g, tr_raw) + + if self.plotflag > 0: + g.plot() + g2.plot() + + return g, g2 + + def _trdata_cdf(self, data): + ''' + Estimate transformation, g, from observed marginal CDF. + Assumption: a Gaussian process, Y, is related to the + non-Gaussian process, X, by Y = g(X). + Parameters + ---------- + options = options structure defining how the smoothing is done. + (See troptset for default values) + Returns + ------- + tr, tr_emp = smoothed and empirical estimate of the transformation g. + + The empirical CDF is usually very irregular. More than one local + maximum of the empirical CDF may cause poor fit of the transformation. + In such case one should use a smaller value of GSM or set a larger + variance for GVAR. If X(t) is likely to cross levels higher than 5 + standard deviations then the vector param has to be modified. For + example if X(t) is unlikely to cross a level of 7 standard deviations + one can use param = [-7 7 513]. + ''' + mean = data.mean() + sigma = data.std() + cdf = edf(data.ravel()) + Ne = self.ne + nd = len(cdf.data) + if nd > self.ntr and self.ntr > 0: + x0 = np.linspace(cdf.args[Ne], cdf.args[nd - 1 - Ne], self.ntr) + cdf.data = np.interp(x0, cdf.args, cdf.data) + cdf.args = x0 + Ne = 0 + uu = np.linspace(*self.param) + + ncr = len(cdf.data) + ng = len(np.atleast_1d(self.gvar)) + if ng == 1: + gvar = self.gvar * np.ones(ncr) + else: + self.gvar = np.atleast_1d(self.gvar) + gvar = np.interp(np.linspace(0, 1, ncr), + np.linspace(0, 1, ng), self.gvar.ravel()) + + ind = np.flatnonzero(np.diff(cdf.args) > 0) # remove equal points + nd = len(ind) + ind1 = ind[Ne:nd - Ne] + tmp = invnorm(cdf.data[ind]) + + x = sigma * uu + mean + pp_tr = SmoothSpline(cdf.args[ind1], tmp[Ne:nd - Ne], p=self.gsm, + lin_extrap=self.linextrap, var=gvar[ind1]) + tr = TrData(pp_tr(x), x, mean=mean, sigma=sigma) + tr_emp = TrData(tmp, cdf.args[ind], mean=mean, sigma=sigma) + tr_emp.setplotter('step') + + if self.chkder: + tr_raw = TrData(tmp[Ne:nd - Ne], cdf.args[ind1], mean=mean, + sigma=sigma) + tr = self._check_tr(tr, tr_raw) + + if self.plotflag > 0: + tr.plot() + tr_emp.plot() + return tr, tr_emp + + def trdata(self, timeseries): + ''' + + Returns + ------- + tr, tr_emp : TrData objects + with the smoothed and empirical transformation, respectively. + + TRDATA estimates the transformation in a transformed Gaussian model. + Assumption: a Gaussian process, Y, is related to the + non-Gaussian process, X, by Y = g(X). + + The empirical crossing intensity is usually very irregular. + More than one local maximum of the empirical crossing intensity may + cause poor fit of the transformation. In such case one should use a + smaller value of CSM. In order to check the effect of smoothing it is + recomended to also plot g and g2 in the same plot or plot the smoothed + g against an interpolated version of g (when CSM=GSM=1). + + Example + ------- + >>> import wafo.spectrum.models as sm + >>> import wafo.transform.models as tm + >>> from wafo.objects import mat2timeseries + >>> Hs = 7.0 + >>> Sj = sm.Jonswap(Hm0=Hs) + >>> S = Sj.tospecdata() #Make spectrum object from numerical values + >>> S.tr = tm.TrOchi(mean=0, skew=0.16, kurt=0, + ... sigma=Hs/4, ysigma=Hs/4) + >>> xs = S.sim(ns=2**16, iseed=10) + >>> ts = mat2timeseries(xs) + >>> g0, g0emp = ts.trdata(monitor=True) + >>> g1, g1emp = ts.trdata(method='m', gvar=0.5 ) + >>> g2, g2emp = ts.trdata(method='n', gvar=[3.5, 0.5, 3.5]) + >>> int(S.tr.dist2gauss()*100) + 141 + >>> int(g0emp.dist2gauss()*100) + 217949 + >>> int(g0.dist2gauss()*100) + 93 + >>> int(g1.dist2gauss()*100) + 66 + >>> int(g2.dist2gauss()*100) + 84 + + See also + -------- + LevelCrossings.trdata + wafo.transform.models + + References + ---------- + Rychlik, I. , Johannesson, P and Leadbetter, M. R. (1997) + "Modelling and statistical analysis of ocean wavedata using + transformed Gaussian process." + Marine structures, Design, Construction and Safety, Vol. 10, No. 1, + pp 13--47 + + Brodtkorb, P, Myrhaug, D, and Rue, H (1999) + "Joint distribution of wave height and crest velocity from + reconstructed data" + in Proceedings of 9th ISOPE Conference, Vol III, pp 66-73 + ''' + + data = np.atleast_1d(timeseries.data) + ma = data.mean() + sa = data.std() + method = self.method[0] + if method == 'l': + return TrLinear(mean=ma, sigma=sa), TrLinear(mean=ma, sigma=sa) + if method == 'n': + tp = timeseries.turning_points() + mM = tp.cycle_pairs() + lc = mM.level_crossings(self.crossdef) + return self._trdata_lc(lc) + elif method == 'm': + return self._trdata_cdf(data) + elif method == 'h': + ga1 = skew(data) + ga2 = kurtosis(data, fisher=True) # kurt(xx(n+1:end))-3; + up = min(4 * (4 * ga1 / 3) ** 2, 13) + lo = (ga1 ** 2) * 3 / 2 + kurt1 = min(up, max(ga2, lo)) + 3 + return TrHermite(mean=ma, var=sa ** 2, skew=ga1, kurt=kurt1) + elif method[0] == 'o': + ga1 = skew(data) + return TrOchi(mean=ma, var=sa ** 2, skew=ga1) + + __call__ = trdata diff --git a/pywafo/src/wafo/win32_utils.py b/pywafo/src/wafo/win32_utils.py new file mode 100644 index 0000000..086e0ef --- /dev/null +++ b/pywafo/src/wafo/win32_utils.py @@ -0,0 +1,258 @@ +from __future__ import division +from numpy import round +from threading import Thread +from time import sleep +from win32gui import (InitCommonControls, CallWindowProc, CreateWindowEx, + CreateWindow, SetWindowLong, SendMessage, ShowWindow, + PumpWaitingMessages, PostQuitMessage, DestroyWindow, + MessageBox, EnumWindows, GetClassName) +from win32api import GetModuleHandle, GetSystemMetrics # @UnresolvedImport +from win32api import SetWindowLong as api_SetWindowLong # @UnresolvedImport +from commctrl import (TOOLTIPS_CLASS, TTM_GETDELAYTIME, TTM_SETDELAYTIME, + TTDT_INITIAL, TTDT_AUTOPOP) +import win32con + +WM_USER = win32con.WM_USER +PBM_SETRANGE = (WM_USER + 1) +PBM_SETPOS = (WM_USER + 2) +PBM_DELTAPOS = (WM_USER + 3) +PBM_SETSTEP = (WM_USER + 4) +PBM_STEPIT = (WM_USER + 5) +PBM_SETRANGE32 = (WM_USER + 6) +PBM_GETRANGE = (WM_USER + 7) +PBM_GETPOS = (WM_USER + 8) +PBM_SETBARCOLOR = (WM_USER + 9) +PBM_SETMARQUEE = (WM_USER + 10) +PBM_GETSTEP = (WM_USER + 13) +PBM_GETBKCOLOR = (WM_USER + 14) +PBM_GETBARCOLOR = (WM_USER + 15) +PBM_SETSTATE = (WM_USER + 16) +PBM_GETSTATE = (WM_USER + 17) +PBS_SMOOTH = 0x01 +PBS_VERTICAL = 0x04 +PBS_MARQUEE = 0x08 +PBS_SMOOTHREVERSE = 0x10 +PBST_NORMAL = 1 +PBST_ERROR = 2 +PBST_PAUSED = 3 +WC_DIALOG = 32770 +WM_SETTEXT = win32con.WM_SETTEXT + + +def MAKELPARAM(a, b): + return (a & 0xffff) | ((b & 0xffff) << 16) + + +def _get_tooltip_handles(hwnd, resultList): + ''' + Adds a window handle to resultList if its class-name is 'tooltips_class32', + i.e. the window is a tooltip. + ''' + if GetClassName(hwnd) == TOOLTIPS_CLASS: + resultList.append(hwnd) + + +def set_max_pop_delay_on_tooltip(tooltip): + ''' + Sets maximum auto-pop delay (delay before hiding) on an instance of + wx.ToolTip. + NOTE: The tooltip's SetDelay method is used just to identify the correct + tooltip. + ''' + test_value = 12345 + # Set initial delay just to identify tooltip. + tooltip.SetDelay(test_value) + handles = [] + EnumWindows(_get_tooltip_handles, handles) + for hwnd in handles: + if SendMessage(hwnd, TTM_GETDELAYTIME, TTDT_INITIAL) == test_value: + SendMessage(hwnd, TTM_SETDELAYTIME, TTDT_AUTOPOP, 32767) + tooltip.SetDelay(500) # Restore default value + + +class Waitbar(Thread): + + def __init__(self, title='Waitbar', can_abort=True, max_val=100): + Thread.__init__(self) # Initialize thread + self.title = title + self.can_abort = can_abort + self.max_val = max_val + InitCommonControls() + self.hinst = GetModuleHandle(None) + self.started = False + self.position = 0 + self.do_update = False + self.start() # Run the thread + while not self.started: + sleep(0.1) # Wait until the dialog is ready + + def DlgProc(self, hwnd, uMsg, wParam, lParam): + if uMsg == win32con.WM_DESTROY: + api_SetWindowLong(self.dialog, + win32con.GWL_WNDPROC, + self.oldWndProc) + if uMsg == win32con.WM_CLOSE: + self.started = False + if uMsg == win32con.WM_COMMAND and self.can_abort: + self.started = False + return CallWindowProc(self.oldWndProc, hwnd, uMsg, wParam, lParam) + + def BuildWindow(self): + width = 400 + height = 100 + self.dialog = CreateWindowEx( + win32con.WS_EX_TOPMOST, + WC_DIALOG, + self.title + ' (0%)', + win32con.WS_VISIBLE | win32con.WS_OVERLAPPEDWINDOW, + int(round( + GetSystemMetrics(win32con.SM_CXSCREEN) * .5 - width * .5)), + int(round( + GetSystemMetrics(win32con.SM_CYSCREEN) * .5 - height * .5)), + width, + height, + 0, + 0, + self.hinst, + None) + self.progbar = CreateWindow( + # win32con.WS_EX_DLGMODALFRAME, + 'msctls_progress32', + '', + win32con.WS_VISIBLE | win32con.WS_CHILD, + 10, + 10, + width - 30, + 20, + self.dialog, + 0, + 0, + None) + if self.can_abort: + self.button = CreateWindow( + # win32con.WS_EX_DLGMODALFRAME, + 'BUTTON', + 'Cancel', + win32con.WS_VISIBLE | win32con.WS_CHILD | win32con.BS_PUSHBUTTON, # @IgnorePep8 + int(width / 2.75), + 40, + 100, + 20, + self.dialog, + 0, + 0, + None) + self.oldWndProc = SetWindowLong( + self.dialog, + win32con.GWL_WNDPROC, + self.DlgProc) + SendMessage(self.progbar, PBM_SETRANGE, 0, MAKELPARAM(0, self.max_val)) +# win32gui.SendMessage(self.progbar, PBM_SETSTEP, 0, 10) +# win32gui.SendMessage(self.progbar, PBM_SETMARQUEE, 0, 0) + ShowWindow(self.progbar, win32con.SW_SHOW) + + def run(self): + self.BuildWindow() + self.started = True + while self.started: + PumpWaitingMessages() + if self.do_update: + SendMessage(self.progbar, PBM_SETPOS, + int(self.position % self.max_val), 0) + percentage = int(round(100.0 * self.position / self.max_val)) + SendMessage(self.dialog, WM_SETTEXT, 0, + self.title + ' (%d%%)' % percentage) + # SendMessage(self.progbar, PBM_STEPIT, 0, 0) + self.do_update = False + sleep(0.1) + PostQuitMessage(0) + DestroyWindow(self.dialog) + + def update(self, pos): + if self.started: + if not self.do_update: + self.position = pos + self.do_update = True + return True + return False + + def close(self): + self.started = False + + +# class Waitbar2(Dialog, Thread): +# def __init__(self, title='Waitbar'): +# template = [[title, (0, 0, 215, 36), +# (win32con.DS_MODALFRAME | win32con.WS_POPUP | +# win32con.WS_VISIBLE | win32con.WS_CAPTION | +# win32con.WS_SYSMENU | win32con.DS_SETFONT | +# win32con.WS_GROUP | win32con.WS_EX_TOPMOST), +# | win32con.DS_SYSMODAL), +# None, (8, "MS Sans Serif")], ] +# Dialog.__init__(self, id=template) +# Thread.__init__(self) # Initialize thread +# self.started = False +# self.start() # Run the thread +# while not self.started: +# sleep(0.1) # Wait until the dialog is ready +# +# def OnInitDialog(self): +# rc = Dialog.OnInitDialog(self) +# self.pbar = CreateProgressCtrl() +# self.pbar.CreateWindow (win32con.WS_CHILD | win32con.WS_VISIBLE, +# (10, 10, 310, 24), self, 1001) +# self.started = True +# return rc +# +# def run(self): +# self.DoModal() +# +# def update(self, pos): +# self.pbar.SetPos(int(pos)) +# +# def close(self): +# self.OnCancel() + + +class WarnDlg(Thread): + + def __init__(self, message='', title='Warning!'): + Thread.__init__(self) # Initialize thread + self.title = title + self.message = message + self.start() # Run the thread + + def run(self): +# MessageBox(self.message, self.title, win32con.MB_ICONWARNING) + MessageBox(0, self.message, self.title, + win32con.MB_ICONWARNING | win32con.MB_SYSTEMMODAL) + + +class ErrorDlg(Thread): + + def __init__(self, message='', title='Error!', blocking=False): + Thread.__init__(self) # Initialize thread + self.title = title + self.message = message + if blocking: + self.run() # Run without threading + else: + self.start() # Run in thread + + def run(self): +# MessageBox(self.message, self.title, win32con.MB_ICONERROR) + MessageBox(0, self.message, self.title, + win32con.MB_ICONERROR | win32con.MB_SYSTEMMODAL) + + +if __name__ == '__main__': + WarnDlg('This is an example of a warning', 'Warning!') + ErrorDlg('This is an example of an error message') + wb = Waitbar('Waitbar example') +# wb2 = Waitbar2('Waitbar example') + for i in xrange(20): + print wb.update(i * 5) +# wb2.update(i) + sleep(0.1) + wb.close() +# wb2.close()