Source code for park.fit

# This program is public domain
"""
Fitting service interface.

A fit consists of a set of models and a fitting engine.  The models are
collected in an assembly, which manages the parameter set and the
constraints between them.  The models themselves are tightly coupled
to the data that they are modeling and the data is invisible to the fit.

The fitting engine can use a variety of methods depending on model.


Usage
=====

The fitter can be run directly on the local machine::

    import park
    M1 = park.models.Peaks(datafile=park.sampledata('peak.dat'))
    M1.add_peak('P1', 'gaussian', A=[4,6], mu=[0.2, 0.5], sigma=0.1)
    result = park.fit(models=[M1])
    print result

The default settings print results every time the fit improves, and
print a global result when the fit is complete.  This is a suitable
interface for a fitting script.

For larger fit jobs you will want to run the fit on a remote server.
The model setup is identical, but the fit call is different::

    service = park.FitService('server:port')
    result = park.fit(models=[M1], service=service)
    print result

Again, the default settings print results every time the fit improves,
and print a global result when the fit is complete.

For long running fit jobs, you want to be able to disconnect from
the server after submitting the job, and later reconnect to fetch
the results.  An additional email field will send notification by
email when the fit starts and ends, and daily updates on the status
of all fits::

    service = park.FitService('server:port')
    service.notify(email='me@my.email.address',update='daily')
    fit = park.Fit(models=[M1])
    id = service.submit_job(fit, jobname='peaks')
    print id

The results can be retrieved either by id returned from the server,
or by the given jobname::

    import park
    service = park.FitService('server:port',user='userid')
    fitlist = service.retrieve('peaks')
    for fit in fitlist:
        print fit.summary()

The fit itself is a complicated object, including the model, the
optimizer, and the type of uncertainty analysis to perform.

GUI Usage
=========

When used from a graphical user interface, a different programming
interface is needed.  In this case, the user may want to watch
the progress of the fit and perhaps stop it.  Also, as fits can
take some time to complete, the user would like to be able to
set up additional fits and run them at the same time, switching
between them as necessary to monitor progress.

"""
import time, thread

import numpy

import assembly, fitresult

[docs]class Objective(object): """ Abstract interface to the fitness function for the park minimizer classes. Park provides a specific implementation `park.assembly.Assembly`. TODO: add a results() method to return model specific info to the TODO: fit handler. """
[docs] def residuals(self, p): """ Some fitters, notably Levenberg-Marquardt, operate directly on the residuals vector. If the individual residuals are not available, then LM cannot be used. This method is optional. """ raise NotImplementedError
[docs] def residuals_deriv(self, p): """ Returns residuals and derivatives with respect to the given parameters. If these are unavailable in the model, then they can be approximated by numerical derivatives, though it is generally better to use a derivative free optimizer such as coliny or cobyla which can use the function evaluations more efficiently. In any case, your objective function is responsible for calculating these. This method is optional. """ raise NotImplementedError
[docs] def fit_parameters(self): """ Returns a list of fit parameters. Each parameter has a name, an initial value and a range. See `park.fitresult.FitParameter` for an example. On each function evaluation a new parameter set will be passed to the fitter, with values in the same order as the list of parameters. """ raise NotImplementedError
def __call__(self, p): """ Returns the objective value for parameter set p . """ raise NotImplementedError
[docs] def abort(self): """ Halts the current function evaluation, and has it return inf. This will be called from a separate thread. If the function contains an expensive calculation, it should reset an abort flag before each evaluation and test it periodically. This method is optional. """
[docs]class Fitter(object): """Abstract interface for a fitness optimizer. A fitter has a single method, fit, which takes an objective function (`park.fit.Objective`) and a handler. For a concrete instance see `park.fitmc.FitMC`. """ def __init__(self, **kw): for k,v in kw.items(): if hasattr(self,k): setattr(self,k,v) else: raise AttributeError(k+" is not an attribute of "+self.__class__.__name__) def _threaded(self, fn, *args, **kw): thread.start_new_thread(fn,args,kw) def _fit(self, objective, x0, bounds): """ Run the actual fit in a separate thread Each cycle k of n: self.handler.progress(k,n) Each improvement: self.handler.result.update(x,fx,ncalls) self.handler.improvement() On completion (if not already performed): self.hander.result.update(x,fx,ncalls) self.handler.done self.handler.finalize() """ raise NotImplementedError
[docs] def fit(self, fitness, handler): """ Global optimizer. This function should return immediately """ # Determine initial value and bounds pars = fitness.fit_parameters() bounds = numpy.array([p.range for p in pars]).T x0 = [p.value for p in pars] # Initialize the monitor and results. # Need to make our own copy of the fit results so that the # values don't get stomped on by the next fit iteration. handler.done = False self.handler = handler fitpars = [fitresult.FitParameter(pars[i].name,pars[i].range,v) for i,v in enumerate(x0)] handler.result = fitresult.FitResult(fitpars, 0, numpy.NaN) # Run the fit (fit should perform _progress and _improvement updates) # This function may return before the fit is complete. self._fit(fitness, x0, bounds)
[docs]class FitJob(object): """ Fit job. This implements `park.job.Job`. """ def __init__(self, objective=None, fitter=None, handler=None): self.fitter = fitter self.objective = objective self.handler = handler
[docs] def run(self): self.fitter.fit(self.objective, self.handler)
[docs]class LocalQueue(object): """ Simple interface to the local job queue. Currently supports start and wait. Needs to support stop and status. Also, needs to be a proper queue, and needs to allow jobs to run in separate processes according to priority, etc. All the essentials of the remote queuing system without the remote calls. Unlike the remote queue, the local queue need not maintain persistence. """ running = False
[docs] def start(self, job): self.job = job job.run() return id(job)
[docs] def wait(self, interval=.1): """ Wait for the job to complete. This is used in scripts to impose a synchronous interface to the fitting service. """ while not self.job.handler.done: time.sleep(interval) return self.job.handler.result
[docs]def fit(models=None, fitter=None, service=None, handler=None): """ Start a fit with a set of models. The model set must be in a form accepted by `park.assembly.Assembly`. This is a convenience function which sets up the default optimizer and uses the local fitting engine to do the work. Progress reports are printed as they are received. The choice of fitter, service and handler can be specified by the caller. The default fitter is FitMC, which is a monte carlo Nelder-Mead simplex local optimizer with 100 random start points. The default handler does nothing. Instead, ConsoleUpdate could be used to report progress during the fit. The default service is to run in a separate thread with FitThread. Note that this will change soon to run in a separate process on the local machine so that python's global interpreter lock does not interfere with parallelism. """ if models is None: raise RuntimeError('fit expected a list of models') if service is None: service = LocalQueue() if fitter is None: import fitmc fitter = fitmc.FitMC() if handler is None: handler = fitresult.FitHandler() objective = assembly.Assembly(models) if isinstance(models,list) else models job = FitJob(objective,fitter,handler) service.start(job) return service.wait()
[docs]def assembly_example(): import park, time problem = park.assembly.example() #result = fit(problem) #result.print_summary() handler=fitresult.ConsoleUpdate(improvement_delta=0.1,progress_delta=1) #result = fit(problem, handler=handler) result = fit(problem) print "=== Fit complete ===" result.print_summary() print "=== Target values ===" print "M1: a=1, c=1.5" print "M2: a=2.5, c=3" if False: # Detailed results print "parameter vector",result.pvec problem(result.pvec) print "residuals",problem.residuals for k,m in enumerate(problem.parts): print "Model",k,"chisq",m.chisq,"weight",m.weight print "pars",m.fitness.model.a,m.fitness.model.c print "x",m.fitness.data.fit_x print "y",m.fitness.data.fit_y print "f(x)",m.fitness.data.fx print "(y-f(x))/dy",m.residuals
[docs]def demo(fitter=None): """Multiple minima example""" import time, math class MultiMin(object): def fit_parameters(self): return [fitresult.FitParameter('x1',[-5,5],1)] def __call__(self, p): x=p[0] fx = x**2 + math.sin(2*math.pi*x+3*math.pi/2) return fx handler = fitresult.ConsoleUpdate() # Show updates on the console handler.progress_delta = 1 # Update user every second handler.improvement_delta = 0.1 # Show improvements almost immediately fitter.fit(MultiMin(), handler) while not handler.done: time.sleep(1)
[docs]def demo2(fitter=None): import park, time problem = park.assembly.example() handler = fitresult.ConsoleUpdate() # Show updates on the console handler.progress_delta = 1 # Update user every second handler.improvement_delta = 1 # Show improvements at the same rate fitter.fit(problem, handler) while not handler.done: time.sleep(1)
if __name__ == "__main__": #main() assembly_example()