# -*- coding: utf-8 -*-
# Copyright (C) 2016 Jeremy Sanders <jeremy@jeremysanders.net>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Library General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Library General Public License for more details.
#
# You should have received a copy of the GNU Library General Public
# License along with this library; if not, write to the Free
# Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
# MA 02111-1307, USA
"""A fit to the model.
"""
from __future__ import division, print_function, absolute_import
from six.moves import range, zip
import six.moves.cPickle as pickle
import scipy.optimize
import numpy as N
from . import utils
from .utils import uprint
try:
import veusz.embed as veusz
except ImportError:
veusz = None
debugfit = True
[docs]class Fit:
"""Class to help fitting model, by keeping track of thawed parameters."""
def __init__(self, pars, model, data):
"""
:param dict[str,ParamBase] pars: parameters for model
:param Model model: Model to fit
:param Data data: Data to fit
The parameters pars are for the model, but a parameter called
backscale can be included, which controls the scaling of the
background
"""
self.pars = pars
self.model = model
self.data = data
self.refreshThawed()
self.veuszembed = None
self.bestlike = -1e99
[docs] def refreshThawed(self):
"""Call this after making changes to which parameters are thawed."""
self.thawed = [
name for name, par in sorted(self.pars.items()) if not par.frozen]
[docs] def calcProfiles(self):
"""Predict model profiles for each band.
"""
ne_prof, T_prof, Z_prof = self.model.computeProfs(self.pars)
# optional background scaling parameter
if 'backscale' in self.pars:
backscale = self.pars['backscale'].val
else:
backscale = 1.
profs = []
for band in self.data.bands:
modprof = band.calcProjProfile(
self.data.annuli, ne_prof, T_prof, Z_prof,
self.model.NH_1022pcm2,
backscale=backscale)
profs.append(modprof)
return profs
[docs] def likeFromProfs(self, predprofs):
"""Given predicted profiles, calculate log likelihood
(excluding prior).
:param list[numpy.array] predprofs: input profiles
"""
likelihood = 0.
for band, predprof in zip(self.data.bands, predprofs):
likelihood += utils.cashLogLikelihood(band.cts, predprof)
return likelihood
[docs] def thawedParVals(self):
"""Return list of numeric values of thawed parameters."""
return [self.pars[name].val for name in self.thawed]
[docs] def updateThawed(self, vals):
"""Update values of parameter ParamBase objects which are thawed.
:param list[float] vals: numerical values of parameters
"""
for val, name in zip(vals, self.thawed):
self.pars[name].val = val
[docs] def getLikelihood(self, vals=None):
"""Get likelihood for parameters given.
Also include are the priors from the various components
"""
if vals is not None:
self.updateThawed(vals)
# prior on parameters
parprior = sum( (self.pars[p].prior() for p in self.pars) )
if not N.isfinite(parprior):
# don't want to evaluate profiles for invalid parameters
return -N.inf
profs = self.calcProfiles()
like = self.likeFromProfs(profs)
prior = self.model.prior(self.pars) + parprior
totlike = float(like+prior)
if debugfit and (totlike-self.bestlike) > 0.1:
self.bestlike = totlike
#print("Better fit %.1f" % totlike)
with utils.AtomicWriteFile("fit.dat") as fout:
uprint(
"likelihood = %g + %g = %g" % (like, prior, totlike),
file=fout)
for p in sorted(self.pars):
uprint("%s = %s" % (p, self.pars[p]), file=fout)
return totlike
[docs] def doFitting(self, silent=False, maxiter=10):
"""Optimize parameters to increase likelihood. Uses scipy's
Nelder-Mead and Powell optimizers, repeating if a new minimum
is found.
:param bool silent: print output during fitting
:param int maxiter: maximum number of iterations
:returns: log likelihood
"""
if not silent:
uprint('Fitting (Iteration 1)')
ctr = [0]
def minfunc(pars):
like = self.getLikelihood(pars)
if ctr[0] % 1000 == 0 and not silent:
uprint('%10i %10.1f' % (ctr[0], like))
ctr[0] += 1
return -like
thawedpars = self.thawedParVals()
lastlike = self.getLikelihood(thawedpars)
fpars = thawedpars
for i in range(maxiter):
fitpars = scipy.optimize.minimize(
minfunc, fpars, method='Nelder-Mead')
fpars = fitpars.x
fitpars = scipy.optimize.minimize(
minfunc, fpars, method='Powell')
fpars = fitpars.x
like = -fitpars.fun
if abs(lastlike-like) < 0.1:
break
if not silent:
uprint('Iteration %i' % (i+2))
lastlike = like
if not silent:
uprint('Fit Result: %.1f' % like)
self.updateThawed(fpars)
return like
[docs] def plotProfiles(self):
"""Plot surface brightness profiles of model and data if Veusz is
installed.
"""
if veusz is None:
raise RuntimeError('Veusz not found')
if self.veuszembed is None:
embed = self.veuszembed = veusz.Embedded()
grid = self.veuszembed.Root.Add('page').Add('grid', columns=1)
xaxis = grid.Add(
'axis', name='x', direction='horizontal', log=True,
autoRange='+2%')
for i in range(len(self.data.bands)):
graph = grid.Add('graph', name='graph%i' % i)
xydata = graph.Add(
'xy', marker='none', name='data',
xData='radius', yData='data_%i' %i)
xymodel = graph.Add(
'xy', marker='none', name='model', color='red',
xData='radius', yData='model_%i' %i)
edges = self.data.annuli.edges_arcmin
self.veuszembed.SetData(
'radius', 0.5*(edges[1:]+edges[:-1]),
symerr=0.5*(edges[1:]-edges[:-1]))
grid.Action('zeroMargins')
profs = self.calcProfiles()
for i, (band, prof) in enumerate(zip(self.data.bands, profs)):
self.veuszembed.SetData('data_%i' % i, band.cts)
self.veuszembed.SetData('model_%i' % i, prof)
[docs] def save(self, filename):
"""Save fit in Python pickle format."""
# don't try to pickle veusz window
embed = None
if self.veuszembed is not None:
embed = self.veuszembed
self.veuszembed = None
with open(filename, 'wb') as f:
pickle.dump(self, f)
if embed is not None:
self.veuszembed = embed
[docs]def genericPopulationMinimizer(
function, gennewparams, popnum=1000, keepfrac=0.8, maxiter=1000, sigmabreak=1e-3):
"""Minimize using a set of populations."""
# generate initial list of function values and parameters
uprint('Populating 0th generation')
popfun = []
poppar = []
while len(poppar) < popnum:
par = gennewparams()
fun = function(par)
if N.isfinite(fun):
popfun.append(fun)
poppar.append(par)
popfun = N.array(popfun)
poppar = N.array(poppar)
# number of items to keep in each iteration
keepnum = int(popnum*keepfrac)
# number of new parameters to create
newnum = popnum-keepnum
# temporary locations for new function values and new parameters
newfun = N.zeros(newnum)
newpar = N.zeros((newnum, poppar.shape[1]))
for gen in range(maxiter):
# Sort into function order, keeping fraction. This could be a
# partition, but this let's us keep track of what the best
# item is.
sortidxs = N.argsort(popfun)[:keepnum]
popfun = popfun[sortidxs]
poppar = poppar[sortidxs]
bestpars = ' '.join(['%6.3f' % p for p in poppar[0]])
bestfun = popfun[0]
rmsfun = N.sqrt(N.mean(popfun**2))
uprint('Gen %3i, best %7.4f, rms %7.4f, pars=%s' % (
gen, bestfun, rmsfun, bestpars))
if abs(bestfun-rmsfun) < sigmabreak:
break
# create new set of parameters
i = 0
while i < newnum:
# choose random best parameter
par1 = poppar[N.random.randint(keepnum)]
par2 = poppar[N.random.randint(keepnum)]
# adjust by standard deviations
movpar = N.random.normal()*(par2-par1) + par1
# if it is valid, keep
fun = function(movpar)
if N.isfinite(fun):
newfun[i] = fun
newpar[i, :] = movpar
i += 1
# merge list
popfun = N.concatenate( (popfun, newfun) )
poppar = N.vstack( (poppar, newpar) )
# find lowest value and return value and parameters
bestidx = N.argmin(popfun)
return popfun[bestidx], poppar[bestidx]
def populationMinimiser(fit, popnum=1000, keepfrac=0.8, maxiter=1000, sigmabreak=1e-3):
# get list of parameters min and max values
minvals = []
maxvals = []
for name, par in sorted(fit.pars.items()):
if not par.frozen:
minvals.append(par.minval)
maxvals.append(par.maxval)
minvals = N.array(minvals)
maxvals = N.array(maxvals)
def gennewparams():
r = N.random.rand(len(minvals))
pars = minvals + r*(maxvals-minvals)
return pars
def minfunc(pars):
like = fit.getLikelihood(pars)
return -like
bestfun, bestpar = genericPopulationMinimizer(
minfunc, gennewparams, popnum=popnum, keepfrac=keepfrac,
maxiter=maxiter, sigmabreak=sigmabreak)
uprint('Best likelihood after minimization:', -bestfun)
fit.updateThawed(bestpar)