# -*- coding: utf-8 -*-
"""
Created on Thu Aug 27 16:39:32 2015
@author: ebachelet
"""
from __future__ import division
import time as python_time
import numpy as np
import scipy.optimize
import collections
import warnings
import emcee
import sys
import copy
from collections import OrderedDict
import multiprocessing as mp
from pyLIMA import microlmodels
from pyLIMA import microloutputs
from pyLIMA import microlguess
from pyLIMA import microltoolbox
from pyLIMA import microlpriors
from pyLIMA import microlcaustics
warnings.filterwarnings("ignore")
[docs]class FitException(Exception):
pass
[docs]class MLFits(object):
"""
######## Fitter module ########
This class contains the method to fit the event with the selected attributes.
**WARNING**: All fits (and so results) are made using data in flux.
Attributes :
event : the event object on which you perform the fit on. More details on the event module.
model : The microlensing model you want to fit. Has to be an object define in
microlmodels module.
More details on the microlmodels module.
method : The fitting method you want to use for the fit.
guess : The guess you can give to the fit or the guess return by the initial_guess function.
fit_results : the fit parameters returned by method LM and DE.
fit_covariance : the fit parameters covariance matrix returned by method LM and DE.
fit_time : the time needed to fit.
MCMC_chains : the MCMC chains returns by the MCMC method
MCMC_probabilities : the objective function computed for each chains of the MCMC method
fluxes_MCMC_method : a string describing how you want to estimate the model fluxes for the MCMC method.
outputs : the standard pyLIMA outputs. More details in the microloutputs module.
:param object event: the event object on which you perform the fit on. More details on the
event module.
"""
def __init__(self, event):
"""The fit class has to be intialized with an event object."""
self.event = event
self.model = microlmodels.ModelPSPL(event)
self.method = 'None'
self.guess = []
self.outputs = []
self.fit_results = []
self.fit_covariance = []
self.fit_time = []
self.DE_population = []
self.binary_regime = None
self.MCMC_chains = []
self.MCMC_probabilities = []
self.fluxes_MCMC_method = ''
self.pool = None
[docs] def mlfit(self, model, method, DE_population_size=10, flux_estimation_MCMC='MCMC', fix_parameters_dictionnary=None,
grid_resolution=10, computational_pool=None, binary_regime=None):
"""This function realize the requested microlensing fit, and set the according results
attributes.
:param object model: the model object requested. More details on the microlmodels module.
:param string method: The fitting method you want to use. Has to be a string in :
'LM' --> Levenberg-Marquardt algorithm. Based on the
scipy.optimize.leastsq routine.
**WARNING** : the parameter maxfev (number of maximum
iterations) is set to 50000
the parameter ftol (relative precision on the chi^2) is
set to 0.00001
your fit may not converge because of these limits.
The starting points of this method are found using the
initial_guess method.
Obviously, this can fail. In this case, switch to
method 'DE'.
'DE' --> Differential evolution algoritm. Based on the
scipy.optimize.differential_evolution.
Look Storn & Price (1997) : "Differential Evolution – A
Simple and Efficient Heuristic for global Optimization
over Continuous Spaces"
Because this method is heuristic, it is not 100% sure a
satisfying solution is found. Just relaunch :)
The result is then use as a starting point for the 'LM'
method.
'MCMC' --> Monte-Carlo Markov Chain algorithm. Based on the
emcee python package :
" emcee: The MCMC Hammer" (Foreman-Mackey et al. 2013).
The inital population is computed around the best
solution returned by
the 'DE' method.
:param int DE_population_size: The population factor desired for the DE method. Default is 10.
:param string flux_estimation_MCMC: The desired method to estimate the fluxes (f_source and g) of the
telescopes. 'MCMC' will do this through an MCMC method (default) when
everything else will do this thanks to a 1D polyfit through np.polyfit.
Note that a sanity check is done post-fit to assess the fit quality with the check_fit
function.
"""
print('')
print('Start fit on ' + self.event.name + ', with model ' + model.model_type + ' and method ' + method)
self.event.check_event()
self.model = model
self.method = method
self.fluxes_MCMC_method = flux_estimation_MCMC
self.DE_population_size = DE_population_size
self.model.define_model_parameters()
if method != 'DE':
self.check_parameters_boundaries()
if computational_pool:
pool = computational_pool
else:
pool = None
if pool:
manager = mp.Manager()
self.DE_population = manager.list()
self.binary_regime = binary_regime
if self.method == 'LM':
number_of_data = self.event.total_number_of_data_points()
if number_of_data <= (len(self.model.model_dictionnary)):
print("You do not have enough data points to use this method (LM), please switch to other methods." \
" Given the requested total model " + str(self.model.model_dictionnary.keys()) + \
" you need at least " + str(
len(self.model.model_dictionnary)) + ' data points to use the method LM!')
return
else:
self.fit_results, self.fit_covariance, self.fit_time = self.lmarquardt()
if self.method == 'TRF':
self.fit_results, self.fit_covariance, self.fit_time = self.trust_region_reflective()
if self.method == 'DE':
self.fit_results, self.fit_covariance, self.fit_time = self.differential_evolution(pool)
if self.method == 'MCMC':
self.MCMC_chains = self.MCMC(pool)
if self.method == 'GRIDS':
self.fix_parameters_dictionnary = OrderedDict(
sorted(fix_parameters_dictionnary.items(), key=lambda x: x[1]))
self.grid_resolution = grid_resolution
self.grid_parameters = self.grids()
fit_quality_flag = 'Good Fit'
#if self.method != 'MCMC':
# fit_quality_flag = self.check_fit()
#if fit_quality_flag == 'Bad Fit':
# if self.method == 'LM':
# print('We have to change method, this fit was unsuccessfull. We decided to switch ' \
# '' \
# 'method to "DE"')
# self.method = 'DE'
# self.mlfit(self.model, self.method, self.fluxes_MCMC_method)
# else:
# print('Unfortunately, this is too hard for pyLIMA :(')
[docs] def check_parameters_boundaries(self):
"""Check if the the parameters guess are inside the parameters boundaries.
return: raise an Error
"""
if self.model.parameters_guess != []:
for index,parameter in enumerate(self.model.parameters_guess):
if (parameter<self.model.parameters_boundaries[index][0]) | (parameter>self.model.parameters_boundaries[index][1]):
print('ERROR :Guess parameters provided are outside the specified boundaries')
raise FitException('Parameters guess are outside the parameters boundaries')
[docs] def check_fit(self):
"""Check if the fit results and covariance make sens.
0.0 terms or a negative term in the diagonal covariance matrix indicate the fit is not
reliable.
A negative source flux is also counted as a bad fit.
A negative rho or rho> 0.1 is also consider as a bad fit
:return: a flag indicated good or bad fit ('Good Fit' or 'Bad Fit')
:rtype: string
"""
flag_quality = 'Good Fit'
negative_covariance_diagonal = np.diag(self.fit_covariance) < 0
number_of_data = self.event.total_number_of_data_points()
if number_of_data >= (len(self.model.model_dictionnary) + 2 * len(self.event.telescopes)):
if (0.0 in self.fit_covariance):
print('Your fit probably wrong. Cause ==> bad covariance matrix')
flag_quality = 'Bad Fit'
return flag_quality
if (True in negative_covariance_diagonal) | \
(np.isnan(self.fit_covariance).any()) | (np.isinf(self.fit_covariance).any()):
print('Your fit probably wrong. Cause ==> bad covariance matrix')
flag_quality = 'Bad Fit'
return flag_quality
for i in self.event.telescopes:
if self.fit_results[self.model.model_dictionnary['fs_' + i.name]] < 0:
print('Your fit probably wrong. Cause ==> negative source flux for telescope ' + \
i.name)
flag_quality = 'Bad Fit'
return flag_quality
if 'rho' in list(self.model.model_dictionnary.keys()):
if (self.fit_results[self.model.model_dictionnary['rho']] > 0.1) | \
(self.fit_results[self.model.model_dictionnary['rho']] < 0.0):
print('Your fit probably wrong. Cause ==> bad rho ')
flag_quality = 'Bad Fit'
return flag_quality
return flag_quality
[docs] def initial_guess(self):
"""Try to estimate the microlensing parameters. Only use for PSPL and FSPL
models. More details on microlguess module.
:return guess_parameters: a list containing parameters guess related to the model.
:rtype: list
"""
if len(self.model.parameters_guess) == 0:
# Estimate the Paczynski parameters
if self.model.model_type == 'PSPL':
guess_paczynski_parameters, f_source = microlguess.initial_guess_PSPL(self.event)
if self.model.model_type == 'FSPL':
guess_paczynski_parameters, f_source = microlguess.initial_guess_FSPL(self.event)
if self.model.model_type == 'DSPL':
guess_paczynski_parameters, f_source = microlguess.initial_guess_DSPL(self.event)
# Estimate the telescopes fluxes (flux_source + g_blending) parameters
telescopes_fluxes = self.find_fluxes(guess_paczynski_parameters, self.model)
# The survey fluxes are already known from microlguess
telescopes_fluxes[0] = f_source
telescopes_fluxes[1] = 0.0
if 'piEN' in self.model.model_dictionnary.keys():
guess_paczynski_parameters = guess_paczynski_parameters + [0.0, 0.0]
if 'XiEN' in self.model.model_dictionnary.keys():
guess_paczynski_parameters = guess_paczynski_parameters + [0, 0]
if 'dsdt' in self.model.model_dictionnary.keys():
guess_paczynski_parameters = guess_paczynski_parameters + [0, 0]
if 'spot_size' in self.model.model_dictionnary.keys():
guess_paczynski_parameters = guess_paczynski_parameters + [0]
else:
guess_paczynski_parameters = list(self.model.parameters_guess)
telescopes_fluxes = self.find_fluxes(guess_paczynski_parameters, self.model)
guess_paczynski_parameters += telescopes_fluxes
print(sys._getframe().f_code.co_name, ' : Initial parameters guess SUCCESS')
return guess_paczynski_parameters
[docs] def MCMC(self,pool):
""" The MCMC method. Construct starting points of the chains around
the best solution found by the 'DE' method.
The objective function is :func:`chichi_MCMC`. Telescope flux (fs and g), can be optimized thanks to MCMC if
flux_estimation_MCMC is 'MCMC', either they are derived through np.polyfit.
Based on the emcee python package :
" emcee: The MCMC Hammer" (Foreman-Mackey et al. 2013).
Have a look here : http://dan.iel.fm/emcee/current/
:return: a tuple containing (MCMC_chains, MCMC_probabilities)
:rtype: tuple
**WARNING** :
nwalkers is set to 16 times the len of pazynski_parameters
nlinks is set to 1000
5*nwalkers*nlinks MCMC steps in total
"""
# start = python_time.time()
if len(self.model.parameters_guess) == 0:
differential_evolution_estimation = self.differential_evolution(pool)[0]
self.DE_population_size = 10
self.guess = differential_evolution_estimation
else:
self.guess = list(self.model.parameters_guess)
self.guess += self.find_fluxes(self.guess, self.model)
# Best solution
if self.fluxes_MCMC_method != 'MCMC':
limit_parameters = len(self.model.parameters_boundaries)
best_solution = self.guess[:limit_parameters]
else:
limit_parameters = len(self.guess)
best_solution = self.guess
nwalkers = 16 * len(best_solution)
nlinks = 1000
# Initialize the population of MCMC
population = []
count_walkers = 0
while count_walkers < nwalkers:
# Construct an individual of the population around the best solution.
individual = []
for parameter_key in list(self.model.model_dictionnary.keys())[:limit_parameters]:
parameter_trial = microlguess.MCMC_parameters_initialization(parameter_key,
self.model.model_dictionnary,
best_solution)
if parameter_trial:
for parameter in parameter_trial:
individual.append(parameter)
#if self.fluxes_MCMC_method == 'MCMC':
# fluxes = self.find_fluxes(individual, self.model)
# individual += (fluxes*np.random.uniform(0.99,1.01,len(fluxes))+np.random.uni).tolist()
chichi = self.chichi_MCMC(individual)
if chichi != -np.inf:
# np.array(individual)
# print count_walkers
population.append(np.array(individual))
count_walkers += 1
print('pre MCMC done')
number_of_parameters = len(individual)
try:
# create a new MPI pool
from schwimmbad import MPIPool
pool = MPIPool()
if not pool.is_master():
pool.wait()
sys.exit(0)
except:
pass
sampler = emcee.EnsembleSampler(nwalkers, number_of_parameters, self.chichi_MCMC,
a=2.0, pool= pool)
# First estimation using population as a starting points.
final_positions, final_probabilities, state = sampler.run_mcmc(population, nlinks, progress=True)
print('MCMC preburn done')
sampler.reset()
sampler.run_mcmc(final_positions, nlinks, progress=True)
MCMC_chains = np.c_[sampler.get_chain().reshape(nlinks*nwalkers,number_of_parameters),sampler.get_log_prob().reshape(nlinks*nwalkers)]
# Final estimation using the previous output.
#for positions, probabilities, states in sampler.sample(final_positions, iterations= nlinks,
# storechain=True):
# chains = np.c_[positions, probabilities]
# if MCMC_chains is not None:
# MCMC_chains = np.r_[MCMC_chains, chains]
# else:
# MCMC_chains = chains
print(sys._getframe().f_code.co_name, ' : MCMC fit SUCCESS')
return MCMC_chains
[docs] def chichi_MCMC(self, fit_process_parameters):
"""Return the chi^2 for the MCMC method. There is some priors here.
:param list fit_process_parameters: the model parameters ingested by the correpsonding
fitting routine.
:returns: here, the return is -chi^2/2 (likelihood)
:rtype: float
"""
chichi = 0
pyLIMA_parameters = self.model.compute_pyLIMA_parameters(fit_process_parameters)
prior = microlpriors.priors_on_models(pyLIMA_parameters, self.model, binary_regime = self.binary_regime)
if prior != np.inf:
chichi -= prior
else:
return -np.inf
for telescope in self.event.telescopes:
# Find the residuals of telescope observation regarding the parameters and model
residus = self.model_residuals(telescope, pyLIMA_parameters)
chichi += (residus ** 2).sum()
return -chichi / 2
[docs] def differential_evolution(self,pool):
""" The DE method. Differential evolution algorithm. The objective function is
:func:`chichi_differential_evolution`. The flux parameters are estimated through np.polyfit.
Based on the scipy.optimize.differential_evolution.
Look Storn & Price (1997) :
"Differential Evolution – A Simple and Efficient Heuristic for
global Optimization over Continuous Spaces"
:return: a tuple containing (fit_results, fit_covariance, computation_time)
:rtype: tuple
**WARNING** :
tol (relative standard deviation of the objective function) is set to 10^-4
popsize (the total number of individuals is :
popsize*number_of_paczynski_parameters)
is set to DE_population_size
mutation is set to (0.1, 1.5)
recombination is set to 0.7
These parameters can avoid the fit to properly converge (expected to be rare :)).
Just relaunch should be fine.
"""
starting_time = python_time.time()
if pool:
worker = pool.map
else:
worker = 1
differential_evolution_estimation = scipy.optimize.differential_evolution(
self.chichi_differential_evolution,
bounds=self.model.parameters_boundaries,
mutation=(0.5,1.5), popsize=int(self.DE_population_size), maxiter=100000, tol=0.0,
atol=1, strategy='rand1bin',
recombination=0.6, polish=True, init='latinhypercube',
disp=True,workers = worker,
)
# paczynski_parameters are all parameters to compute the model, excepted the telescopes fluxes.
paczynski_parameters = differential_evolution_estimation['x'].tolist()
print('DE converge to objective function : f(x) = ', str(differential_evolution_estimation['fun']))
print('DE converge to parameters : = ', differential_evolution_estimation['x'].astype(str))
self.DE_population = np.array(self.DE_population)
# Construct the guess for the LM method. In principle, guess and outputs of the LM
# method should be very close.
number_of_data = self.event.total_number_of_data_points()
if number_of_data <= (len(self.model.model_dictionnary)):
print("You do not have enough data points to use LM method to estimate the covariance matrix." \
"The covariance matrix is set to 0.0. please switch to MCMC if you need errors estimation.")
fit_results = paczynski_parameters + self.find_fluxes(paczynski_parameters, self.model) + \
[differential_evolution_estimation['fun']]
fit_covariance = np.zeros((len(paczynski_parameters) + 2 * len(self.event.telescopes),
len(paczynski_parameters) + 2 * len(self.event.telescopes)))
else:
self.guess = paczynski_parameters + self.find_fluxes(paczynski_parameters, self.model)
fit_results, fit_covariance, fit_time = self.trust_region_reflective()
computation_time = python_time.time() - starting_time
print(sys._getframe().f_code.co_name, ' : Differential evolution fit SUCCESS')
return fit_results, fit_covariance, computation_time
[docs] def chichi_differential_evolution(self, fit_process_parameters):
"""Return the chi^2 for the DE method.
:param list fit_process_parameters: the model parameters ingested by the correpsonding
fitting routine.
:returns: the chi^2
:rtype: float
"""
pyLIMA_parameters = self.model.compute_pyLIMA_parameters(fit_process_parameters)
chichi = 0.0
prior = microlpriors.priors_on_models(pyLIMA_parameters, self.model, binary_regime=self.binary_regime)
if prior != np.inf:
chichi += prior
for telescope in self.event.telescopes:
# Find the residuals of telescope observation regarding the parameters and model
residus = self.model_residuals(telescope, pyLIMA_parameters)
chichi += (residus ** 2).sum()
else:
chichi = np.inf
self.DE_population.append(fit_process_parameters.tolist() + [chichi])
return chichi
[docs] def lmarquardt(self):
"""The LM method. This is based on the Levenberg-Marquardt algorithm:
"A Method for the Solution of Certain Problems in Least Squares"
Levenberg, K. Quart. Appl. Math. 2, 1944, p. 164-168
"An Algorithm for Least-Squares Estimation of Nonlinear Parameters"
Marquardt, D. SIAM J. Appl. Math. 11, 1963, p. 431-441
Based on scipy.optimize.leastsq python routine, which is based on MINPACK's lmdif and
lmder
algorithms (fortran based).
The objective function is :func:`residuals_LM`.
The starting point parameters are self.guess.
the Jacobian is given by :func:`LM_Jacobian`.
The fit is performed on all parameters : Paczynski parameters and telescopes fluxes.
:return: a tuple containing (fit_results, covariance_matrix, computation_time)
:rtype: tuple
**WARNING**:
ftol (relative error desired in the sum of square) is set to 10^-6
maxfev (maximum number of function call) is set to 50000
These limits can avoid the fit to properly converge (expected to be rare :))
"""
starting_time = python_time.time()
# use the analytical Jacobian (faster) if no second order are present, else let the
# algorithm find it.
if self.guess == []:
self.guess = self.initial_guess()
n_data = 0
for telescope in self.event.telescopes:
n_data = n_data + telescope.n_data('flux')
n_parameters = len(self.model.model_dictionnary)
if self.model.Jacobian_flag == 'OK':
lmarquardt_fit = scipy.optimize.leastsq(self.residuals_LM, self.guess, maxfev=50000,
Dfun=self.LM_Jacobian, col_deriv=0, full_output=1, ftol=10 ** -8,
xtol=10 ** -10, gtol=10 ** -10)
fit_result = lmarquardt_fit[0].tolist()
fit_result.append(microltoolbox.chichi(self.residuals_LM, lmarquardt_fit[0]))
try:
# Try to extract the covariance matrix from the lmarquard_fit output
covariance_matrix = lmarquardt_fit[1] * fit_result[-1] / (n_data - n_parameters)
except:
covariance_matrix = np.zeros((len(self.model.model_dictionnary),
len(self.model.model_dictionnary)))
else:
lmarquardt_fit = scipy.optimize.least_squares(self.residuals_LM, self.guess, method='lm', x_scale='jac', ftol=10 ** -10,
xtol=10 ** -10, gtol=10 ** -10,
)
fit_result = lmarquardt_fit['x'].tolist()
fit_result.append(microltoolbox.chichi(self.residuals_LM, lmarquardt_fit['x']))
try:
# Try to extract the covariance matrix from the lmarquard_fit output
jacobian = lmarquardt_fit['jac']
covariance_matrix = np.linalg.pinv(np.dot(jacobian.T, jacobian))
except:
covariance_matrix = np.zeros((len(self.model.model_dictionnary),
len(self.model.model_dictionnary)))
computation_time = python_time.time() - starting_time
# import pdb; pdb.set_trace()
print(sys._getframe().f_code.co_name, ' : Levenberg_marquardt fit SUCCESS')
print(fit_result)
return fit_result, covariance_matrix, computation_time
[docs] def residuals_LM(self, fit_process_parameters):
"""The normalized residuals associated to the model and parameters.
:param list fit_process_parameters: the model parameters ingested by the correpsonding
fitting routine.
:return: a numpy array which represents the residuals_i for each telescope,
residuals_i=(data_i-model_i)/sigma_i
:rtype: array_like
The sum of square residuals gives chi^2.
"""
pyLIMA_parameters = self.model.compute_pyLIMA_parameters(fit_process_parameters)
prior = microlpriors.priors_on_models(pyLIMA_parameters, self.model, binary_regime=self.binary_regime)
if prior == np.inf:
datalength = [i.n_data() for i in self.event.telescopes]
return np.array([np.inf]*int(np.sum(datalength)))
residuals = np.array([])
for telescope in self.event.telescopes:
# Find the residuals of telescope observation regarding the parameters and model
residus = self.model_residuals(telescope, pyLIMA_parameters)
residuals = np.append(residuals, residus)
if prior != np.inf:
residuals += prior/len(residuals)
# print python_time.time()-start
return residuals
[docs] def LM_Jacobian(self, fit_process_parameters):
"""Return the analytical Jacobian matrix, if requested by method LM.
Available only for PSPL and FSPL without second_order.
:param list fit_process_parameters: the model parameters ingested by the correpsonding
fitting routine.
:return: a numpy array which represents the jacobian matrix
:rtype: array_like
"""
pyLIMA_parameters = self.model.compute_pyLIMA_parameters(fit_process_parameters)
count = 0
# import pdb;
# pdb.set_trace()
for telescope in self.event.telescopes:
if count == 0:
_jacobi = self.model.model_Jacobian(telescope, pyLIMA_parameters)
else:
_jacobi = np.c_[_jacobi, self.model.model_Jacobian(telescope, pyLIMA_parameters)]
count += 1
# The objective function is : (data-model)/errors
_jacobi = -_jacobi
jacobi = _jacobi[:-2]
# Split the fs and g derivatives in several columns correpsonding to
# each observatories
start_index = 0
dresdfs = _jacobi[-2]
dresdg = _jacobi[-1]
for telescope in self.event.telescopes:
derivative_fs = np.zeros((len(dresdfs)))
derivative_g = np.zeros((len(dresdg)))
index = np.arange(start_index, start_index + len(telescope.lightcurve_flux[:, 0]))
derivative_fs[index] = dresdfs[index]
derivative_g[index] = dresdg[index]
jacobi = np.r_[jacobi, np.array([derivative_fs, derivative_g])]
start_index = index[-1] + 1
return jacobi.T
[docs] def trust_region_reflective(self):
starting_time = python_time.time()
# use the analytical Jacobian (faster) if no second order are present, else let the
# algorithm find it.
if self.guess == []:
self.guess = self.initial_guess()
bounds_min = [i[0] for i in self.model.parameters_boundaries] + [0, -np.inf] * len(self.event.telescopes)
bounds_max = [i[1] for i in self.model.parameters_boundaries] + [np.inf, np.inf] * len(self.event.telescopes)
if self.model.Jacobian_flag == 'OK':
trf_fit = scipy.optimize.least_squares(self.residuals_LM, self.guess, max_nfev=50000,
jac=self.LM_Jacobian, bounds=(bounds_min, bounds_max), ftol=10 ** -6,
xtol=10 ** -10, gtol=10 ** -5)
else:
trf_fit = scipy.optimize.least_squares(self.residuals_LM, self.guess, max_nfev=50000,
bounds=(bounds_min, bounds_max), ftol=10 ** -6,
xtol=10 ** -10, gtol=10 ** -5)
computation_time = python_time.time() - starting_time
fit_result = np.copy(trf_fit['x']).tolist()
fit_result += [2 * trf_fit['cost']]
try:
jacobian = trf_fit['jac']
except:
jacobian = self.LM_Jacobian(fit_result)
try:
covariance_matrix = np.linalg.pinv(np.dot(jacobian.T, jacobian))
except:
covariance_matrix = np.zeros((len(self.model.model_dictionnary),
len(self.model.model_dictionnary)))
n_data = 0
for telescope in self.event.telescopes:
n_data = n_data + telescope.n_data('flux')
n_parameters = len(self.model.model_dictionnary)
covariance_matrix *= fit_result[-1] / (n_data - n_parameters)
print(sys._getframe().f_code.co_name, ' : TRF fit SUCCESS')
print(fit_result)
return fit_result, covariance_matrix, computation_time
[docs] def chichi_telescopes(self, fit_process_parameters):
"""Return a list of chi^2 (float) for individuals telescopes.
:param list fit_process_parameters: the model parameters ingested by the correpsonding
fitting routine.
:returns: the chi^2 for each telescopes
:rtype: list
"""
residuals = self.residuals_LM(fit_process_parameters)
chichi_list = []
start_index = 0
for telescope in self.event.telescopes:
chichi_list.append(
(residuals[start_index:start_index + len(telescope.lightcurve_flux)] ** 2).sum())
start_index += len(telescope.lightcurve_flux)
return chichi_list
[docs] def model_residuals(self, telescope, pyLIMA_parameters):
""" Compute the residuals of a telescope lightcurve according to the model.
:param object telescope: a telescope object. More details in telescopes module.
:param object pyLIMA_parameters: object containing the model parameters, see microlmodels for more details
:return: the residuals in flux, the priors
:rtype: array_like, float
"""
lightcurve = telescope.lightcurve_flux
flux = lightcurve[:, 1]
errflux = lightcurve[:, 2]
microlensing_model = self.model.compute_the_microlensing_model(telescope,pyLIMA_parameters)
residuals = (flux - microlensing_model[0]) / errflux
return residuals
[docs] def all_telescope_residuals(self, pyLIMA_parameters):
""" Compute the residuals of all telescopes according to the model.
:param object pyLIMA_parameters: object containing the model parameters, see microlmodels for more details
:return: the residuals in flux,
:rtype: list, a list of array of residuals in flux
"""
residuals = []
for telescope in self.event.telescopes:
# Find the residuals of telescope observation regarding the parameters and model
residus = self.model_residuals(telescope, pyLIMA_parameters)
# no prior here
residuals.append(residus)
# print python_time.time()-start
return residuals
[docs] def find_fluxes(self, fit_process_parameters, model):
"""Find telescopes flux associated (fs,g) to the model. Used for initial_guess and LM
method.
:param list fit_process_parameters: the model parameters ingested by the correpsonding fitting
routine.
:param object model: a microlmodels which you want to compute the fs,g parameters.
:return: a list of tuple with the (fs,g) telescopes flux parameters.
:rtype: list
"""
telescopes_fluxes = []
pyLIMA_parameters = model.compute_pyLIMA_parameters(fit_process_parameters)
for telescope in self.event.telescopes:
flux = telescope.lightcurve_flux[:, 1]
ml_model, f_source, f_blending = model.compute_the_microlensing_model(telescope, pyLIMA_parameters)
# Prior here
if f_source < 0:
telescopes_fluxes.append(np.min(flux))
telescopes_fluxes.append(0.0)
else:
telescopes_fluxes.append(f_source)
telescopes_fluxes.append(f_blending)
return telescopes_fluxes
[docs] def grids(self):
""" Compute models on a grid. ON CONSTRUCTION.
"""
parameters_on_the_grid = []
for parameter_name in self.fix_parameters_dictionnary:
parameter_range = self.model.parameters_boundaries[self.model.model_dictionnary[parameter_name]]
parameters_on_the_grid.append(
np.linspace(parameter_range[0], parameter_range[1],
self.grid_resolution))
hyper_grid = self.construct_the_hyper_grid(parameters_on_the_grid)
self.new_parameters_boundaries = self.redefine_parameters_boundaries()
if self.pool is not None:
computational_map = self.pool.map
else:
computational_map = map
grid_results = list(
computational_map(emcee.ensemble._function_wrapper(self.optimization_on_grid_pixel, args=[], kwargs={}),
hyper_grid))
return np.array(grid_results)
[docs] def optimization_on_grid_pixel(self, grid_pixel_parameters):
differential_evolution_estimation = scipy.optimize.differential_evolution(
self.chichi_grids,
bounds=self.new_parameters_boundaries, args=tuple(grid_pixel_parameters.tolist()),
mutation=(0.8, 1.2), popsize=10, maxiter=1000,
tol=0.0, atol=0.1, strategy='best1bin',
recombination=0.2, polish=True,
disp=True
)
best_parameters = self.reconstruct_fit_process_parameters(differential_evolution_estimation['x'],
grid_pixel_parameters)
best_parameters += [differential_evolution_estimation['fun']]
print(sys._getframe().f_code.co_name, ' Grid step on ' + str(grid_pixel_parameters.tolist()).strip(
'[]') + ' converge to f(x) = ' + str(differential_evolution_estimation['fun']))
return best_parameters
[docs] def chichi_grids(self, moving_parameters, *fix_parameters):
""" Compute chi^2. ON CONSTRUCTION.
"""
fit_process_parameters = self.reconstruct_fit_process_parameters(moving_parameters, fix_parameters)
pyLIMA_parameters = self.model.compute_pyLIMA_parameters(fit_process_parameters)
chichi = 0.0
for telescope in self.event.telescopes:
# Find the residuals of telescope observation regarding the parameters and model
residus = self.model_residuals(telescope, pyLIMA_parameters)
chichi += (residus ** 2).sum()
return chichi
[docs] def reconstruct_fit_process_parameters(self, moving_parameters, fix_parameters):
""" Reconstruc parameters. ON CONSTRUCTION.
"""
fit_process_parameters = []
for key in list(self.model.model_dictionnary.keys())[:len(self.model.parameters_boundaries)]:
if key in self.moving_parameters_dictionnary:
fit_process_parameters.append(moving_parameters[self.moving_parameters_dictionnary[key]])
else:
fit_process_parameters.append(fix_parameters[self.fix_parameters_dictionnary[key]])
return fit_process_parameters
[docs] def redefine_parameters_boundaries(self):
""" Recompute the parameters boundaries. ON CONSTRUCTION.
"""
parameters_boundaries = []
self.moving_parameters_dictionnary = {}
count = 0
for indice, key in enumerate(list(self.model.model_dictionnary.keys())[:len(self.model.parameters_boundaries)]):
if key not in self.fix_parameters_dictionnary.keys():
parameters_boundaries.append(self.model.parameters_boundaries[indice])
self.moving_parameters_dictionnary[key] = count
count += 1
return parameters_boundaries
[docs] def construct_the_hyper_grid(self, parameters):
"""Define the grid. ON CONSTRUCTION.
"""
params = map(np.asarray, parameters)
grid = np.broadcast_arrays(*[x[(slice(None),) + (None,) * i] for i, x in enumerate(params)])
reformate_grid = np.vstack(grid).reshape(len(parameters), -1).T
return reformate_grid
[docs] def produce_outputs(self):
""" Produce the standard outputs for a fit.
More details in microloutputs module.
"""
outputs = microloutputs.fit_outputs(self)
self.outputs = outputs
[docs] def produce_fit_statistics(self):
""" Produce the standard outputs for a fit.
More details in microloutputs module.
"""
stats_outputs = microloutputs.statistical_outputs(self)
self.stats_outputs = stats_outputs
[docs] def produce_pdf(self, output_directory='./'):
""" ON CONSTRUCTION
"""
microloutputs.pdf_output(self, output_directory)
[docs] def produce_latex_table_results(self, output_directory='./'):
""" ON CONSTRUCTION
"""
microloutputs.latex_output(self, output_directory)