added new files

master
Per.Andreas.Brodtkorb 10 years ago
parent 6aec932677
commit 9a2b9f6921

@ -0,0 +1,174 @@
'''
Created on 10. mai 2014
@author: pab
'''
import numpy as np
from numpy.fft import fft
from wafo.misc import nextpow2
from scipy.signal.windows import get_window
from wafo.containers import PlotData
from wafo.covariance import CovData1D
import warnings
def sampling_period(t_vec):
'''
Returns sampling interval
Returns
-------
dt : scalar
sampling interval, unit:
[s] if lagtype=='t'
[m] otherwise
See also
'''
dt1 = t_vec[1] - t_vec[0]
n = len(t_vec) - 1
t = t_vec[-1] - t_vec[0]
dt = t / n
if abs(dt - dt1) > 1e-10:
warnings.warn('Data is not uniformly sampled!')
return dt
class CovarianceEstimatior(object):
'''
Class for estimating AutoCovariance from timeseries
Parameters
----------
lag : scalar, int
maximum time-lag for which the ACF is estimated.
(Default lag where ACF is zero)
tr : transformation object
the transformation assuming that x is a sample of a transformed
Gaussian process. If g is None then x is a sample of a Gaussian
process (Default)
detrend : function
defining detrending performed on the signal before estimation.
(default detrend_mean)
window : vector of length NFFT or function
To create window vectors see numpy.blackman, numpy.hamming,
numpy.bartlett, scipy.signal, scipy.signal.get_window etc.
flag : string, 'biased' or 'unbiased'
If 'unbiased' scales the raw correlation by 1/(n-abs(k)),
where k is the index into the result, otherwise scales the raw
cross-correlation by 1/n. (default)
norm : bool
True if normalize output to one
dt : scalar
time-step between data points (default see sampling_period).
'''
def __init__(self, lag=None, tr=None, detrend=None, window='boxcar',
flag='biased', norm=False, dt=None):
self.lag
self.tr = tr
self.detrend = detrend
self.window = window
self.flag = flag
self.norm = norm
self.dt = dt
def _estimate_lag(self, R, Ncens):
Lmax = min(300, len(R) - 1) # maximum lag if L is undetermined
# finding where ACF is less than 2 st. deviations.
sigma = np.sqrt(np.r_[0, R[0] ** 2,
R[0] ** 2 + 2 * np.cumsum(R[1:] ** 2)] / Ncens)
lag = Lmax + 2 - (np.abs(R[Lmax::-1]) > 2 * sigma[Lmax::-1]).argmax()
if self.window == 'parzen':
lag = int(4 * lag / 3)
# print('The default L is set to %d' % L)
return lag
def tocovdata(self, timeseries):
'''
Return auto covariance function from data.
Return
-------
R : CovData1D object
with attributes:
data : ACF vector length L+1
args : time lags length L+1
sigma : estimated large lag standard deviation of the estimate
assuming x is a Gaussian process:
if R(k)=0 for all lags k>q then an approximation
of the variance for large samples due to Bartlett
var(R(k))=1/N*(R(0)^2+2*R(1)^2+2*R(2)^2+ ..+2*R(q)^2)
for k>q and where N=length(x). Special case is
white noise where it equals R(0)^2/N for k>0
norm : bool
If false indicating that R is not normalized
Example:
--------
>>> import wafo.data
>>> import wafo.objects as wo
>>> x = wafo.data.sea()
>>> ts = wo.mat2timeseries(x)
>>> acf = ts.tocovdata(150)
>>> h = acf.plot()
'''
lag = self.lag
window = self.window
detrend = self.detrend
try:
x = timeseries.data.flatten('F')
dt = timeseries.sampling_period()
except Exception:
x = timeseries[:, 1:].flatten('F')
dt = sampling_period(timeseries[:, 0])
if not (self.dt is None):
dt = self.dt
if not (self.tr is None):
x = self.tr.dat2gauss(x)
n = len(x)
indnan = np.isnan(x)
if any(indnan):
x = x - x[1 - indnan].mean()
Ncens = n - indnan.sum()
x[indnan] = 0.
else:
Ncens = n
x = x - x.mean()
if hasattr(detrend, '__call__'):
x = detrend(x)
nfft = 2 ** nextpow2(n)
Rper = abs(fft(x, nfft)) ** 2 / Ncens # Raw periodogram
R = np.real(fft(Rper)) / nfft # ifft = fft/nfft since Rper is real!
if self.flag.startswith('unbiased'):
# unbiased result, i.e. divide by n-abs(lag)
R = R[:Ncens] * Ncens / np.arange(Ncens, 1, -1)
if self.norm:
R = R / R[0]
if lag is None:
lag = self._estimate_lag(R, Ncens)
lag = min(lag, n - 2)
if isinstance(window, str) or type(window) is tuple:
win = get_window(window, 2 * lag - 1)
else:
win = np.asarray(window)
R[:lag] = R[:lag] * win[lag - 1::]
R[lag] = 0
lags = slice(0, lag + 1)
t = np.linspace(0, lag * dt, lag + 1)
acf = CovData1D(R[lags], t)
acf.sigma = np.sqrt(np.r_[0, R[0] ** 2,
R[0] ** 2 + 2 * np.cumsum(R[1:] ** 2)] / Ncens)
acf.children = [PlotData(-2. * acf.sigma[lags], t),
PlotData(2. * acf.sigma[lags], t)]
acf.plot_args_children = ['r:']
acf.norm = self.norm
return acf
__call__ = tocovdata

@ -0,0 +1,17 @@
from numpy.testing import (run_module_suite, assert_equal, assert_almost_equal,
assert_array_equal, assert_array_almost_equal)
import numpy as np
from numpy import array, cos, exp, linspace, pi, sin, diff, arange, ones
import wafo.spectrum.models as sm
from wafo.covariance import CovData1D
def test_covariance():
Sj = sm.Jonswap()
S = Sj.tospecdata() #Make spec
R = S.tocovdata()
x = R.sim(ns=1000,dt=0.2)
if __name__ == '__main__':
run_module_suite()

@ -0,0 +1,448 @@
'''
Created on 8. mai 2014
@author: pab
'''
from wafo.transform.core import TrData
from wafo.transform.models import TrHermite, TrOchi, TrLinear
from wafo.stats import edf, skew, kurtosis
from wafo.interpolate import SmoothSpline
from scipy.special import ndtri as invnorm
from scipy.integrate import cumtrapz
import warnings
import numpy as np
floatinfo = np.finfo(float)
class TransformEstimator(object):
'''
Estimate transformation, g, from ovserved data.
Assumption: a Gaussian process, Y, is related to the
non-Gaussian process, X, by Y = g(X).
Parameters
----------
method : string
estimation method. Options are:
'nonlinear' : smoothed crossing intensity (default)
'mnonlinear': smoothed marginal cumulative distribution
'hermite' : cubic Hermite polynomial
'ochi' : exponential function
'linear' : identity.
chkDer : bool
False: No check on the derivative of the transform.
True: Check if transform have positive derivative
csm, gsm : real scalars
defines the smoothing of the logarithm of crossing intensity and
the transformation g, respectively. Valid values must be
0<=csm,gsm<=1. (default csm=0.9, gsm=0.05)
Smaller values gives smoother functions.
param : vector (default (-5, 5, 513))
defines the region of variation of the data X. If X(t) is likely to
cross levels higher than 5 standard deviations then the vector param
has to be modified. For example if X(t) is unlikely to cross a level
of 7 standard deviations one can use param = (-7, 7, 513).
crossdef : string
Crossing definition used in the crossing spectrum:
'u' or 1: only upcrossings
'uM' or 2: upcrossings and Maxima (default)
'umM' or 3: upcrossings, minima, and Maxima.
'um' or 4: upcrossings and minima.
plotflag : int
0 no plotting (Default)
1 plots empirical and smoothed g(u) and the theoretical for a
Gaussian model.
2 monitor the development of the estimation
Delay : real scalar
Delay time for each plot when PLOTFLAG==2.
linextrap: int
0 use a regular smoothing spline
1 use a smoothing spline with a constraint on the ends to ensure
linear extrapolation outside the range of the data. (default)
cvar: real scalar
Variances for the the crossing intensity. (default 1)
gvar: real scalar
Variances for the empirical transformation, g. (default 1)
ne : int
Number of extremes (maxima & minima) to remove from the estimation
of the transformation. This makes the estimation more robust
against outliers. (default 7)
ntr : int
Maximum length of empirical crossing intensity or CDF. The
empirical crossing intensity or CDF is interpolated linearly before
smoothing if their lengths exceeds Ntr. A reasonable NTR will
significantly speed up the estimation for long time series without
loosing any accuracy. NTR should be chosen greater than PARAM(3).
(default 10000)
multip : Bool
False: the data in columns belong to the same seastate (default).
True: the data in columns are from separate seastates.
'''
def __init__(self, method='nonlinear', chkder=True, plotflag=False,
csm=.95, gsm=.05, param=(-5, 5, 513), delay=2, ntr=10000,
linextrap=True, ne=7, cvar=1, gvar=1, multip=False,
crossdef='uM'):
self.method = method
self.chkder = chkder
self.plotflag = plotflag
self.csm = csm
self.gsm = gsm
self.param = param
self.delay = delay
self.ntr = ntr
self.linextrap = linextrap
self.ne = ne
self.cvar = cvar
self.gvar = gvar
self.multip = multip
self.crossdef = crossdef
def _check_tr(self, tr, tr_raw):
eps = floatinfo.eps
x = tr.args
mean = tr.mean
sigma = tr.sigma
for ix in xrange(5):
dy = np.diff(tr.data)
if (dy <= 0).any():
dy[dy > 0] = eps
gvar = -(np.hstack((dy, 0)) + np.hstack((0, dy))) / 2 + eps
pp_tr = SmoothSpline(tr_raw.args, tr_raw.data, p=1,
lin_extrap=self.linextrap,
var=ix * gvar)
tr = TrData(pp_tr(x), x, mean=mean, sigma=sigma)
else:
break
else:
msg = '''
The estimated transfer function, g, is not
a strictly increasing function.
The transfer function is possibly not sufficiently smoothed.
'''
warnings.warn(msg)
return tr
def _trdata_lc(self, level_crossings, mean=None, sigma=None):
'''
Estimate transformation, g, from observed crossing intensity.
Assumption: a Gaussian process, Y, is related to the
non-Gaussian process, X, by Y = g(X).
Parameters
----------
mean, sigma : real scalars
mean and standard deviation of the process
**options :
csm, gsm : real scalars
defines the smoothing of the crossing intensity and the
transformation g.
Valid values must be 0<=csm,gsm<=1. (default csm = 0.9 gsm=0.05)
Smaller values gives smoother functions.
param :
vector which defines the region of variation of the data X.
(default [-5, 5, 513]).
monitor : bool
if true monitor development of estimation
linextrap : bool
if true use a smoothing spline with a constraint on the ends to
ensure linear extrapolation outside the range of data. (default)
otherwise use a regular smoothing spline
cvar, gvar : real scalars
Variances for the crossing intensity and the empirical
transformation, g. (default 1)
ne : scalar integer
Number of extremes (maxima & minima) to remove from the estimation
of the transformation. This makes the estimation more robust
against outliers. (default 7)
ntr : scalar integer
Maximum length of empirical crossing intensity. The empirical
crossing intensity is interpolated linearly before smoothing if
the length exceeds ntr. A reasonable NTR (eg. 1000) will
significantly speed up the estimation for long time series without
loosing any accuracy. NTR should be chosen greater than PARAM(3).
(default inf)
Returns
-------
gs, ge : TrData objects
smoothed and empirical estimate of the transformation g.
Notes
-----
The empirical crossing intensity is usually very irregular.
More than one local maximum of the empirical crossing intensity
may cause poor fit of the transformation. In such case one
should use a smaller value of GSM or set a larger variance for GVAR.
If X(t) is likely to cross levels higher than 5 standard deviations
then the vector param has to be modified. For example if X(t) is
unlikely to cross a level of 7 standard deviations one can use
param = [-7 7 513].
Example
-------
>>> import wafo.spectrum.models as sm
>>> import wafo.transform.models as tm
>>> from wafo.objects import mat2timeseries
>>> Hs = 7.0
>>> Sj = sm.Jonswap(Hm0=Hs)
>>> S = Sj.tospecdata() #Make spectrum object from numerical values
>>> S.tr = tm.TrOchi(mean=0, skew=0.16, kurt=0,
... sigma=Hs/4, ysigma=Hs/4)
>>> xs = S.sim(ns=2**16, iseed=10)
>>> ts = mat2timeseries(xs)
>>> tp = ts.turning_points()
>>> mm = tp.cycle_pairs()
>>> lc = mm.level_crossings()
>>> g0, g0emp = lc.trdata(monitor=True) # Monitor the development
>>> g1, g1emp = lc.trdata(gvar=0.5 ) # Equal weight on all points
>>> g2, g2emp = lc.trdata(gvar=[3.5, 0.5, 3.5]) # Less weight on ends
>>> int(S.tr.dist2gauss()*100)
141
>>> int(g0emp.dist2gauss()*100)
380995
>>> int(g0.dist2gauss()*100)
143
>>> int(g1.dist2gauss()*100)
162
>>> int(g2.dist2gauss()*100)
120
g0.plot() # Check the fit.
See also
troptset, dat2tr, trplot, findcross, smooth
NB! the transformated data will be N(0,1)
Reference
---------
Rychlik , I., Johannesson, P., and Leadbetter, M.R. (1997)
"Modelling and statistical analysis of ocean wavedata
using a transformed Gaussian process",
Marine structures, Design, Construction and Safety,
Vol 10, pp 13--47
'''
if mean is None:
mean = level_crossings.mean
if sigma is None:
sigma = level_crossings.sigma
lc1, lc2 = level_crossings.args, level_crossings.data
intensity = level_crossings.intensity
Ne = self.ne
ncr = len(lc2)
if ncr > self.ntr and self.ntr > 0:
x0 = np.linspace(lc1[Ne], lc1[-1 - Ne], self.ntr)
lc1, lc2 = x0, np.interp(x0, lc1, lc2)
Ne = 0
Ner = self.ne
ncr = self.ntr
else:
Ner = 0
ng = len(np.atleast_1d(self.gvar))
if ng == 1:
gvar = self.gvar * np.ones(ncr)
else:
gvar = np.interp(np.linspace(0, 1, ncr),
np.linspace(0, 1, ng), self.gvar)
uu = np.linspace(*self.param)
g1 = sigma * uu + mean
if Ner > 0: # Compute correction factors
cor1 = np.trapz(lc2[0:Ner + 1], lc1[0:Ner + 1])
cor2 = np.trapz(lc2[-Ner - 1::], lc1[-Ner - 1::])
else:
cor1 = 0
cor2 = 0
lc22 = np.hstack((0, cumtrapz(lc2, lc1) + cor1))
if intensity:
lc22 = (lc22 + 0.5 / ncr) / (lc22[-1] + cor2 + 1. / ncr)
else:
lc22 = (lc22 + 0.5) / (lc22[-1] + cor2 + 1)
lc11 = (lc1 - mean) / sigma
lc22 = invnorm(lc22) # - ymean
g2 = TrData(lc22.copy(), lc1.copy(), mean=mean, sigma=sigma)
g2.setplotter('step')
# NB! the smooth function does not always extrapolate well outside the
# edges causing poor estimate of g
# We may alleviate this problem by: forcing the extrapolation
# to be linear outside the edges or choosing a lower value for csm2.
inds = slice(Ne, ncr - Ne) # indices to points we are smoothing over
slc22 = SmoothSpline(lc11[inds], lc22[inds], self.gsm, self.linextrap,
gvar[inds])(uu)
g = TrData(slc22.copy(), g1.copy(), mean=mean, sigma=sigma)
if self.chkder:
tr_raw = TrData(lc22[inds], lc11[inds], mean=mean, sigma=sigma)
g = self._check_tr(g, tr_raw)
if self.plotflag > 0:
g.plot()
g2.plot()
return g, g2
def _trdata_cdf(self, data):
'''
Estimate transformation, g, from observed marginal CDF.
Assumption: a Gaussian process, Y, is related to the
non-Gaussian process, X, by Y = g(X).
Parameters
----------
options = options structure defining how the smoothing is done.
(See troptset for default values)
Returns
-------
tr, tr_emp = smoothed and empirical estimate of the transformation g.
The empirical CDF is usually very irregular. More than one local
maximum of the empirical CDF may cause poor fit of the transformation.
In such case one should use a smaller value of GSM or set a larger
variance for GVAR. If X(t) is likely to cross levels higher than 5
standard deviations then the vector param has to be modified. For
example if X(t) is unlikely to cross a level of 7 standard deviations
one can use param = [-7 7 513].
'''
mean = data.mean()
sigma = data.std()
cdf = edf(data.ravel())
Ne = self.ne
nd = len(cdf.data)
if nd > self.ntr and self.ntr > 0:
x0 = np.linspace(cdf.args[Ne], cdf.args[nd - 1 - Ne], self.ntr)
cdf.data = np.interp(x0, cdf.args, cdf.data)
cdf.args = x0
Ne = 0
uu = np.linspace(*self.param)
ncr = len(cdf.data)
ng = len(np.atleast_1d(self.gvar))
if ng == 1:
gvar = self.gvar * np.ones(ncr)
else:
self.gvar = np.atleast_1d(self.gvar)
gvar = np.interp(np.linspace(0, 1, ncr),
np.linspace(0, 1, ng), self.gvar.ravel())
ind = np.flatnonzero(np.diff(cdf.args) > 0) # remove equal points
nd = len(ind)
ind1 = ind[Ne:nd - Ne]
tmp = invnorm(cdf.data[ind])
x = sigma * uu + mean
pp_tr = SmoothSpline(cdf.args[ind1], tmp[Ne:nd - Ne], p=self.gsm,
lin_extrap=self.linextrap, var=gvar[ind1])
tr = TrData(pp_tr(x), x, mean=mean, sigma=sigma)
tr_emp = TrData(tmp, cdf.args[ind], mean=mean, sigma=sigma)
tr_emp.setplotter('step')
if self.chkder:
tr_raw = TrData(tmp[Ne:nd - Ne], cdf.args[ind1], mean=mean,
sigma=sigma)
tr = self._check_tr(tr, tr_raw)
if self.plotflag > 0:
tr.plot()
tr_emp.plot()
return tr, tr_emp
def trdata(self, timeseries):
'''
Returns
-------
tr, tr_emp : TrData objects
with the smoothed and empirical transformation, respectively.
TRDATA estimates the transformation in a transformed Gaussian model.
Assumption: a Gaussian process, Y, is related to the
non-Gaussian process, X, by Y = g(X).
The empirical crossing intensity is usually very irregular.
More than one local maximum of the empirical crossing intensity may
cause poor fit of the transformation. In such case one should use a
smaller value of CSM. In order to check the effect of smoothing it is
recomended to also plot g and g2 in the same plot or plot the smoothed
g against an interpolated version of g (when CSM=GSM=1).
Example
-------
>>> import wafo.spectrum.models as sm
>>> import wafo.transform.models as tm
>>> from wafo.objects import mat2timeseries
>>> Hs = 7.0
>>> Sj = sm.Jonswap(Hm0=Hs)
>>> S = Sj.tospecdata() #Make spectrum object from numerical values
>>> S.tr = tm.TrOchi(mean=0, skew=0.16, kurt=0,
... sigma=Hs/4, ysigma=Hs/4)
>>> xs = S.sim(ns=2**16, iseed=10)
>>> ts = mat2timeseries(xs)
>>> g0, g0emp = ts.trdata(monitor=True)
>>> g1, g1emp = ts.trdata(method='m', gvar=0.5 )
>>> g2, g2emp = ts.trdata(method='n', gvar=[3.5, 0.5, 3.5])
>>> int(S.tr.dist2gauss()*100)
141
>>> int(g0emp.dist2gauss()*100)
217949
>>> int(g0.dist2gauss()*100)
93
>>> int(g1.dist2gauss()*100)
66
>>> int(g2.dist2gauss()*100)
84
See also
--------
LevelCrossings.trdata
wafo.transform.models
References
----------
Rychlik, I. , Johannesson, P and Leadbetter, M. R. (1997)
"Modelling and statistical analysis of ocean wavedata using
transformed Gaussian process."
Marine structures, Design, Construction and Safety, Vol. 10, No. 1,
pp 13--47
Brodtkorb, P, Myrhaug, D, and Rue, H (1999)
"Joint distribution of wave height and crest velocity from
reconstructed data"
in Proceedings of 9th ISOPE Conference, Vol III, pp 66-73
'''
data = np.atleast_1d(timeseries.data)
ma = data.mean()
sa = data.std()
method = self.method[0]
if method == 'l':
return TrLinear(mean=ma, sigma=sa), TrLinear(mean=ma, sigma=sa)
if method == 'n':
tp = timeseries.turning_points()
mM = tp.cycle_pairs()
lc = mM.level_crossings(self.crossdef)
return self._trdata_lc(lc)
elif method == 'm':
return self._trdata_cdf(data)
elif method == 'h':
ga1 = skew(data)
ga2 = kurtosis(data, fisher=True) # kurt(xx(n+1:end))-3;
up = min(4 * (4 * ga1 / 3) ** 2, 13)
lo = (ga1 ** 2) * 3 / 2
kurt1 = min(up, max(ga2, lo)) + 3
return TrHermite(mean=ma, var=sa ** 2, skew=ga1, kurt=kurt1)
elif method[0] == 'o':
ga1 = skew(data)
return TrOchi(mean=ma, var=sa ** 2, skew=ga1)
__call__ = trdata

@ -0,0 +1,258 @@
from __future__ import division
from numpy import round
from threading import Thread
from time import sleep
from win32gui import (InitCommonControls, CallWindowProc, CreateWindowEx,
CreateWindow, SetWindowLong, SendMessage, ShowWindow,
PumpWaitingMessages, PostQuitMessage, DestroyWindow,
MessageBox, EnumWindows, GetClassName)
from win32api import GetModuleHandle, GetSystemMetrics # @UnresolvedImport
from win32api import SetWindowLong as api_SetWindowLong # @UnresolvedImport
from commctrl import (TOOLTIPS_CLASS, TTM_GETDELAYTIME, TTM_SETDELAYTIME,
TTDT_INITIAL, TTDT_AUTOPOP)
import win32con
WM_USER = win32con.WM_USER
PBM_SETRANGE = (WM_USER + 1)
PBM_SETPOS = (WM_USER + 2)
PBM_DELTAPOS = (WM_USER + 3)
PBM_SETSTEP = (WM_USER + 4)
PBM_STEPIT = (WM_USER + 5)
PBM_SETRANGE32 = (WM_USER + 6)
PBM_GETRANGE = (WM_USER + 7)
PBM_GETPOS = (WM_USER + 8)
PBM_SETBARCOLOR = (WM_USER + 9)
PBM_SETMARQUEE = (WM_USER + 10)
PBM_GETSTEP = (WM_USER + 13)
PBM_GETBKCOLOR = (WM_USER + 14)
PBM_GETBARCOLOR = (WM_USER + 15)
PBM_SETSTATE = (WM_USER + 16)
PBM_GETSTATE = (WM_USER + 17)
PBS_SMOOTH = 0x01
PBS_VERTICAL = 0x04
PBS_MARQUEE = 0x08
PBS_SMOOTHREVERSE = 0x10
PBST_NORMAL = 1
PBST_ERROR = 2
PBST_PAUSED = 3
WC_DIALOG = 32770
WM_SETTEXT = win32con.WM_SETTEXT
def MAKELPARAM(a, b):
return (a & 0xffff) | ((b & 0xffff) << 16)
def _get_tooltip_handles(hwnd, resultList):
'''
Adds a window handle to resultList if its class-name is 'tooltips_class32',
i.e. the window is a tooltip.
'''
if GetClassName(hwnd) == TOOLTIPS_CLASS:
resultList.append(hwnd)
def set_max_pop_delay_on_tooltip(tooltip):
'''
Sets maximum auto-pop delay (delay before hiding) on an instance of
wx.ToolTip.
NOTE: The tooltip's SetDelay method is used just to identify the correct
tooltip.
'''
test_value = 12345
# Set initial delay just to identify tooltip.
tooltip.SetDelay(test_value)
handles = []
EnumWindows(_get_tooltip_handles, handles)
for hwnd in handles:
if SendMessage(hwnd, TTM_GETDELAYTIME, TTDT_INITIAL) == test_value:
SendMessage(hwnd, TTM_SETDELAYTIME, TTDT_AUTOPOP, 32767)
tooltip.SetDelay(500) # Restore default value
class Waitbar(Thread):
def __init__(self, title='Waitbar', can_abort=True, max_val=100):
Thread.__init__(self) # Initialize thread
self.title = title
self.can_abort = can_abort
self.max_val = max_val
InitCommonControls()
self.hinst = GetModuleHandle(None)
self.started = False
self.position = 0
self.do_update = False
self.start() # Run the thread
while not self.started:
sleep(0.1) # Wait until the dialog is ready
def DlgProc(self, hwnd, uMsg, wParam, lParam):
if uMsg == win32con.WM_DESTROY:
api_SetWindowLong(self.dialog,
win32con.GWL_WNDPROC,
self.oldWndProc)
if uMsg == win32con.WM_CLOSE:
self.started = False
if uMsg == win32con.WM_COMMAND and self.can_abort:
self.started = False
return CallWindowProc(self.oldWndProc, hwnd, uMsg, wParam, lParam)
def BuildWindow(self):
width = 400
height = 100
self.dialog = CreateWindowEx(
win32con.WS_EX_TOPMOST,
WC_DIALOG,
self.title + ' (0%)',
win32con.WS_VISIBLE | win32con.WS_OVERLAPPEDWINDOW,
int(round(
GetSystemMetrics(win32con.SM_CXSCREEN) * .5 - width * .5)),
int(round(
GetSystemMetrics(win32con.SM_CYSCREEN) * .5 - height * .5)),
width,
height,
0,
0,
self.hinst,
None)
self.progbar = CreateWindow(
# win32con.WS_EX_DLGMODALFRAME,
'msctls_progress32',
'',
win32con.WS_VISIBLE | win32con.WS_CHILD,
10,
10,
width - 30,
20,
self.dialog,
0,
0,
None)
if self.can_abort:
self.button = CreateWindow(
# win32con.WS_EX_DLGMODALFRAME,
'BUTTON',
'Cancel',
win32con.WS_VISIBLE | win32con.WS_CHILD | win32con.BS_PUSHBUTTON, # @IgnorePep8
int(width / 2.75),
40,
100,
20,
self.dialog,
0,
0,
None)
self.oldWndProc = SetWindowLong(
self.dialog,
win32con.GWL_WNDPROC,
self.DlgProc)
SendMessage(self.progbar, PBM_SETRANGE, 0, MAKELPARAM(0, self.max_val))
# win32gui.SendMessage(self.progbar, PBM_SETSTEP, 0, 10)
# win32gui.SendMessage(self.progbar, PBM_SETMARQUEE, 0, 0)
ShowWindow(self.progbar, win32con.SW_SHOW)
def run(self):
self.BuildWindow()
self.started = True
while self.started:
PumpWaitingMessages()
if self.do_update:
SendMessage(self.progbar, PBM_SETPOS,
int(self.position % self.max_val), 0)
percentage = int(round(100.0 * self.position / self.max_val))
SendMessage(self.dialog, WM_SETTEXT, 0,
self.title + ' (%d%%)' % percentage)
# SendMessage(self.progbar, PBM_STEPIT, 0, 0)
self.do_update = False
sleep(0.1)
PostQuitMessage(0)
DestroyWindow(self.dialog)
def update(self, pos):
if self.started:
if not self.do_update:
self.position = pos
self.do_update = True
return True
return False
def close(self):
self.started = False
# class Waitbar2(Dialog, Thread):
# def __init__(self, title='Waitbar'):
# template = [[title, (0, 0, 215, 36),
# (win32con.DS_MODALFRAME | win32con.WS_POPUP |
# win32con.WS_VISIBLE | win32con.WS_CAPTION |
# win32con.WS_SYSMENU | win32con.DS_SETFONT |
# win32con.WS_GROUP | win32con.WS_EX_TOPMOST),
# | win32con.DS_SYSMODAL),
# None, (8, "MS Sans Serif")], ]
# Dialog.__init__(self, id=template)
# Thread.__init__(self) # Initialize thread
# self.started = False
# self.start() # Run the thread
# while not self.started:
# sleep(0.1) # Wait until the dialog is ready
#
# def OnInitDialog(self):
# rc = Dialog.OnInitDialog(self)
# self.pbar = CreateProgressCtrl()
# self.pbar.CreateWindow (win32con.WS_CHILD | win32con.WS_VISIBLE,
# (10, 10, 310, 24), self, 1001)
# self.started = True
# return rc
#
# def run(self):
# self.DoModal()
#
# def update(self, pos):
# self.pbar.SetPos(int(pos))
#
# def close(self):
# self.OnCancel()
class WarnDlg(Thread):
def __init__(self, message='', title='Warning!'):
Thread.__init__(self) # Initialize thread
self.title = title
self.message = message
self.start() # Run the thread
def run(self):
# MessageBox(self.message, self.title, win32con.MB_ICONWARNING)
MessageBox(0, self.message, self.title,
win32con.MB_ICONWARNING | win32con.MB_SYSTEMMODAL)
class ErrorDlg(Thread):
def __init__(self, message='', title='Error!', blocking=False):
Thread.__init__(self) # Initialize thread
self.title = title
self.message = message
if blocking:
self.run() # Run without threading
else:
self.start() # Run in thread
def run(self):
# MessageBox(self.message, self.title, win32con.MB_ICONERROR)
MessageBox(0, self.message, self.title,
win32con.MB_ICONERROR | win32con.MB_SYSTEMMODAL)
if __name__ == '__main__':
WarnDlg('This is an example of a warning', 'Warning!')
ErrorDlg('This is an example of an error message')
wb = Waitbar('Waitbar example')
# wb2 = Waitbar2('Waitbar example')
for i in xrange(20):
print wb.update(i * 5)
# wb2.update(i)
sleep(0.1)
wb.close()
# wb2.close()
Loading…
Cancel
Save