"""
Commentary
----------
Most of the work is done by the scipy.stats.distributions module.
This provides a plethora of continuous distributions to play with.
Each distribution has functions to generate random deviates, pdf's,
cdf's etc. as well as a function to fit the distribution to some given
data.
The fitting uses scipy.optimize.fmin to minimise the log odds of the
data given the distribution.
There are a couple of problems with this approach. First it is
sensitive to the initial guess at the parameters. Second it can be a
little slow.

Two key parameters are the 'loc' and 'scale' parameters. Data is
shifted by 'loc' and scaled by 'scale' prior to fitting, so supplying
appropriate starting values for these parameters is important to
getting a good fit. See the factory() function, which picks from a
handful of common approaches to choosing them for each distribution.
For some distributions (e.g. normal) it really makes sense just to
calculate the parameters directly from the data.
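
For illustration, a rough sketch of a direct fit (here 'data' is a
1-d numpy array of observations; this module wraps the same calls):

    from scipy import stats            # or: from wafo import stats
    parms = stats.lognorm.fit(data)    # default starting guess
    parms = stats.lognorm.fit(data, loc=min(data), scale=data.std())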

The code in the __main__ block should be a good guide to how to use
this. Simply:

    get a QuickFit object
    add the distributions you want to try to fit
    call fit() with your data
    call stats() to generate some stats on each fit (this calls
        fit_stats() on every distribution)
    call plot() if you want to see a plot
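
For example (a minimal sketch; the names below are all defined in
this module, and 'data' is a sorted 1-d numpy array of observations,
since the rss calculations compare sorted data against quantiles):

    data = numpy.sort(data)
    quickfit = QuickFit()
    quickfit.add('norm')
    quickfit.add('lognorm')
    quickfit.fit(data)           # fit every added distribution
    quickfit.stats(data)         # rss/dss/fss for each fit
    quickfit.plot(data, topn=2)  # histogram, fitted pdfs and residuals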

Named after Mrs Twolumps, minister's secretary in the silly walks
sketch, who brings in coffee with a full silly walk.

The tenuous link with curve fitting is that you generally see "two
lumps": one in your data and the other in the curve that is being
fitted. Or alternatively, if your data is not too silly then you can
fit a curve to it.

License is GNU LGPL v3, see https://launchpad.net/twolumps
"""

import inspect
from itertools import izip

import numpy
from wafo import stats
from scipy import mean, std


def factory(name):
    """ Factory to return appropriate objects for each distro. """
    fitters = dict(
        beta=ZeroOneScipyDistribution,
        alpha=ZeroOneScipyDistribution,
        ncf=ZeroOneScipyDistribution,
        triang=ZeroOneScipyDistribution,
        uniform=ZeroOneScipyDistribution,
        powerlaw=ZeroOneScipyDistribution,
        pareto=MinLocScipyDistribution,
        expon=MinLocScipyDistribution,
        gamma=MinLocScipyDistribution,
        lognorm=MinLocScipyDistribution,
        maxwell=MinLocScipyDistribution,
        weibull_min=MinLocScipyDistribution,
        weibull_max=MaxLocScipyDistribution)
    return fitters.get(name, ScipyDistribution)(name)
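
# For example, factory('beta') returns a ZeroOneScipyDistribution wrapping
# stats.beta, while an unlisted name such as 'norm' falls back to a plain
# ScipyDistribution, which uses mean/std based loc and scale hints.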


def get_continuous_distros():
    """ Find all attributes of stats that are continuous distributions. """
    fitters = []
    skip = set()
    for name, item in inspect.getmembers(stats):
        if name in skip:
            continue
        if item is stats.rv_continuous:
            continue
        if isinstance(item, stats.rv_continuous):
            fitters.append([name, factory(name)])
    return fitters


class ScipyDistribution(object):

    def __init__(self, name):
        self.name = name
        self.distro = self.get_distro()
        self.fitted = None

    def __getattr__(self, attr):
        """ Try delegating to the distro object. """
        return getattr(self.distro, attr)

    def get_distro(self):
        return getattr(stats, self.name)

    def set_distro(self, parms):
        self.distro = getattr(stats, self.name)(*parms)
        # Drop any expected values cached for a previous parameter set.
        self.fitted = None
        return self.distro

    def calculate_loc_and_scale(self, data):
        """ Calculate loc and scale parameters for the fit.

        Depending on the distribution, these need to be approximately
        right to get a good fit.
        """
        return mean(data), std(data)

    def fit(self, data, *args, **kwargs):
        """ This needs some work.

        Seems the various scipy distributions do a reasonable job if
        given a good hint.  Need to get distro specific hints.
        """
        fits = []

        # Try with and without providing loc and scale hints; this
        # increases the chance of a fit without an exception being
        # generated.
        for (loc, scale) in ((0.0, 1.0),
                             self.calculate_loc_and_scale(data)):
            try:
                parms = self.get_distro().fit(data, loc=loc, scale=scale)
                self.set_distro(list(parms))
                expected = self.expected(data)
                rss = ((expected - data) ** 2).sum()
                fits.append([rss, list(parms)])

                parms = self.get_distro().fit(data, floc=loc, scale=scale)
                self.set_distro(list(parms))
                expected = self.expected(data)
                rss = ((expected - data) ** 2).sum()
                fits.append([rss, list(parms)])
            except:
                pass

        # No fits means all tries raised exceptions.
        if not fits:
            raise Exception("Exception in fit()")

        # Pick the parameters with the smallest rss.
        fits.sort()
        self.parms = fits[0][1]
        print self.parms
        return self.set_distro(list(self.parms))

    def expected(self, data):
        """ Calculate expected values at each data point. """
        if self.fitted is not None:
            return self.fitted
        n = len(data)
        xx = numpy.linspace(0, 1, n + 2)[1:-1]
        self.fitted = self.ppf(xx)
        return self.fitted

    def fit_stats(self, data):
        """ Calculate stats on the fit.

        data is assumed to be sorted.
        """
        n = len(data)
        dvar = numpy.var(data)
        expected = self.expected(data)
        evar = numpy.var(expected)

        rss = 0.0
        for expect, obs in izip(expected, data):
            rss += (obs - expect) ** 2.0

        self.rss = rss
        self.dss = dvar * n
        self.fss = evar * n

    def residuals(self, data):
        """ Return residuals. """
        expected = self.expected(data)
        return numpy.array(data) - numpy.array(expected)


class MinLocScipyDistribution(ScipyDistribution):

    def calculate_loc_and_scale(self, data):
        """ Set loc to the min value in the data.

        Useful for weibull_min.
        """
        return min(data), std(data)


class MaxLocScipyDistribution(ScipyDistribution):

    def calculate_loc_and_scale(self, data):
        """ Set loc to the max value in the data.

        Useful for weibull_max.
        """
        return max(data), std(data)


class ZeroOneScipyDistribution(ScipyDistribution):

    def calculate_loc_and_scale(self, data):
        """ Set loc and scale to move the data to the [0, 1] interval.

        Useful for the beta distribution.
        """
        return min(data), max(data) - min(data)
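
# For example, data spanning [2.0, 6.0] gives ZeroOneScipyDistribution a loc
# of 2.0 and a scale of 4.0, so the shifted and scaled values (x - loc) / scale
# fall in the [0, 1] support expected by the beta distribution.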


class QuickFit(object):
    """ Fit a family of distributions.

    Calculates stats on each fit.

    Option to create plots.
    """

    def __init__(self):
        self.distributions = []

    def add_distribution(self, distribution):
        """ Add a ready-prepared ScipyDistribution. """
        self.distributions.append(distribution)

    def add(self, name):
        """ Add a distribution by name. """
        self.distributions.append(factory(name))

    def fit(self, data):
        """ Fit all of the distros we have. """
        fitted = []
        for distro in self.distributions:
            print 'fitting distro', distro.name
            try:
                distro.fit(data)
            except:
                continue
            fitted.append(distro)
        self.distributions = fitted
        print 'finished fitting'

    def stats(self, data):
        """ Calculate stats on the fits. """
        for dd in self.distributions:
            dd.fit_stats(data)

    def get_topn(self, n):
        """ Return the top-n best fits. """
        data = [[x.rss, x] for x in self.distributions
                if numpy.isfinite(x.rss)]
        data.sort()
        if not n:
            n = len(data)
        return [x[1] for x in data[:n]]

    def fit_plot(self, data, topn=0, bins=20):
        """ Create a plot of the data and the fitted distributions. """
        from matplotlib import pylab as pl

        distros = self.get_topn(topn)
        xx = numpy.linspace(data.min(), data.max(), 300)

        table = []
        nparms = max(len(x.parms) for x in distros)
        tcolours = []
        for dd in distros:
            patch = pl.plot(xx, [dd.pdf(p) for p in xx],
                            label='%10.2f%% %s' % (100.0 * dd.rss / dd.dss, dd.name))
            row = (['', dd.name, '%10.2f%%' % (100.0 * dd.rss / dd.dss,)]
                   + ['%0.2f' % x for x in dd.parms])
            while len(row) < 3 + nparms:
                row.append('')
            table.append(row)
            tcolours.append([patch[0].get_markerfacecolor()] + ['w'] * (2 + nparms))

        # Add a histogram of the data.
        pl.hist(data, bins=bins, normed=True)

        tab = pl.table(cellText=table, cellColours=tcolours,
                       colLabels=(['', 'Distribution', 'Res. SS/Data SS']
                                  + ['P%d' % (x + 1,) for x in range(nparms)]),
                       bbox=(0.0, 1.0, 1.0, 0.3))
        tab.auto_set_font_size(False)
        tab.set_fontsize(10.)

    def residual_plot(self, data, topn=0):
        """ Create a residual plot. """
        from matplotlib import pylab as pl

        distros = self.get_topn(topn)
        n = len(data)
        xx = numpy.linspace(0, 1, n + 2)[1:-1]
        for dd in distros:
            pl.plot(xx, dd.residuals(data),
                    label='%10.2f%% %s' % (100.0 * dd.rss / dd.dss, dd.name))
        pl.grid(True)

    def plot(self, data, topn):
        """ Plot data fit and residuals. """
        from matplotlib import pylab as pl

        pl.axes([0.1, 0.4, 0.8, 0.4])   # leave room above the axes for the table
        self.fit_plot(data, topn=topn)
        pl.axes([0.1, 0.05, 0.8, 0.3])
        self.residual_plot(data, topn=topn)


def read_data(infile, field):
    """ Simple utility to extract a field out of a csv file. """
    import csv

    reader = csv.reader(infile)
    header = reader.next()
    field = header.index(field)
    data = []
    for row in reader:
        data.append(float(row[field]))
    return data


if __name__ == '__main__':
    import sys
    import optparse

    from matplotlib import pylab as pl

    parser = optparse.OptionParser()
    parser.add_option('-d', '--distro', action='append', default=[])
    parser.add_option('-l', '--list', action='store_true',
                      help='List available distros')
    parser.add_option('-i', '--infile')
    parser.add_option('-f', '--field', default='P/L')
    parser.add_option('-n', '--topn', type='int', default=0)
    parser.add_option('-s', '--sample', default='normal',
                      help='generate a sample from this distro as a test')
    parser.add_option('--size', type='int', default=1000,
                      help='Size of sample to generate')
    opts, args = parser.parse_args()

    if opts.list:
        for name, distro in get_continuous_distros():
            print name
        sys.exit()

    if not opts.distro:
        opts.distro = [x[0] for x in get_continuous_distros()]

    quickfit = QuickFit()
    for distro in opts.distro:
        quickfit.add(distro)

    # Read data from a csv file if one was given, otherwise generate a
    # test sample from the requested numpy.random distribution.
    if opts.infile:
        data = numpy.array(read_data(open(opts.infile), opts.field))
    else:
        data = getattr(numpy.random, opts.sample)(size=opts.size)
    data.sort()

    quickfit.fit(data)
    print 'doing stats'
    quickfit.stats(data)
    print 'doing plot'
    quickfit.plot(data, topn=opts.topn)
    pl.show()