import numpy as np
from nemf import caller
from nemf import worker
from nemf.interaction_functions import *
from copy import deepcopy
import yaml
# Model_Classes
class model_class:
    """ Represents an ODE-based compartment model read from a yaml file.

    Parses and validates the configuration found at *model_path* and
    stores deep copies of its 'compartment', 'interactions' and
    'configuration' sections as attributes for later manipulation,
    so the pristine parsed config in ``init_sys_config`` is preserved.

    Parameters
    ----------
    model_path : string
        path to the yaml file describing the model
    ref_data_path : string (optional)
        path to the file containing reference data used in fitting
    """

    # initialization methods
    # they are only used when a new model_class is created
    def __init__(self, model_path, ref_data_path=None):
        # parse the yaml file and check it for obvious errors
        self.init_sys_config = worker.initialize_ode_system(model_path)
        self.sanity_check_input()
        # makes a copy of the system config for further usage
        self.compartment = deepcopy(self.init_sys_config['compartment'])
        self.interactions = deepcopy(self.init_sys_config['interactions'])
        self.configuration = deepcopy(self.init_sys_config['configuration'])
        # imports reference data
        self.load_reference_data(ref_data_path)
        # imports sinks and sources
        if ('sinks' in self.configuration) and ('sources' in self.configuration):
            self.fetch_index_of_source_and_sink()
        # imports alternative interaction names and adds them to the namespace
        if 'alternative_interaction_names' in self.configuration:
            self.init_alternative_interaction_names()
        # imports constraints used in the fitting process
        if 'constraints_path' in self.configuration:
            self.load_constraints(self.configuration['constraints_path'])
[docs] def load_reference_data(self,ref_data_path=None,**kwargs):
""" Loads reference data used in model optimization from file
Either, the path to the reference data is provided in the yaml
configuration file or passed to this function.
Latter overwrites the path in the configuration file.
Parameters
----------
ref_data_path : string (optional)
path to the file containing the reference data
"""
if ref_data_path != None:
ref_data,ref_headers = \
worker.import_reference_data(ref_data_path,**kwargs)
if len(np.shape(ref_data)) == 1:
ref_data = np.reshape(ref_data,(1,len(ref_data)))
self.reference_data = ref_data
self.reference_headers = ref_headers
print('Reference data set has been added.')
elif 'ref_data_path' in self.configuration:
ref_data_path = self.configuration['ref_data_path']
ref_data,ref_headers = worker.import_reference_data(ref_data_path)
if len(np.shape(ref_data)) == 1:
ref_data = np.reshape(ref_data,(1,len(ref_data)))
self.reference_data = ref_data
self.reference_headers = ref_headers
print('Reference data set has been added.')
else:
self.reference_data = None
self.reference_headers = None
print('No reference data has been provided')
def prep_ref_data(self):
""" prepares the reference data set for use in the objective funtion
Returns
-------
da : numpy.array (DataArray)
first column contains the posix time stamps for each observation
the remaining column contain the data point for the corrosponding
compartments
compart_of_interest : list of integers
contains the indices of the compartments in the model which are also
represented in the reference data
"""
ref_data = self.reference_data
ref_names = self.reference_headers
# datetime stamp column of the reference data
ref_timestamps = ref_data[:,0]
t_eval = list(ref_timestamps)
t_eval = np.reshape(t_eval,(len(t_eval),1))
# clean integers from data names (those can't be compartments)
for ii,item in enumerate(ref_names):
delete_columns = []
if type(item) == int:
delete_columns.append(ii)
ref_data = np.delete(ref_data,delete_columns,axis=1)
ref_names = \
[item for ii,item in enumerate(ref_names) if ii not in delete_columns]
# fetching the indexes of those compartments that are also modelled
ref_idx = self.fetch_index_of_compartment(ref_names)
# fetching the columns out of the ref. data set are also modelled
columns_of_interest = \
[ii for (ii,item) in enumerate(ref_idx) if type(item) == int]
# fetching indices of compartments that are present in the ref. data set
compart_of_interest = \
[item for (ii,item) in enumerate(ref_idx) if type(item) == int]
# reference data sliced down to the columns that contain the reference data
# of the compartments that are modelled
ref_data = ref_data[:,columns_of_interest]
da = np.concatenate((t_eval,ref_data),axis=1)
return da, compart_of_interest
def load_constraints(self,path):
""" Loads constraints from python file
Parameters
----------
path : string
path to python file
"""
self.configuration['constraints'] = \
worker.import_constraints(path)
## Export
def export_to_yaml(self,path='current_model.yml'):
""" Writes model compartments and interaction configuration to yaml-file
Parameters
----------
path : string (optional)
Path to output file. Writes into 'current_model.yml if no path is
given.
"""
# reduces the model to the minimum
export_model = {}
export_model['compartment'] = deepcopy(self.compartment)
export_model['interactions'] = deepcopy(self.interactions)
# transforms numpy objects to regular python types.
# this is necessary as the numpy object are dumped with all the extra
# unnecessary class information.
compart = export_model['compartment']
for item in compart:
compart[item]['value'] = float(compart[item]['value'])
inter = export_model['interactions']
for item in list(inter):
for ii,fkt in enumerate(list(inter[item])):
for jj,val in enumerate(fkt['parameters']):
if type(val) != int:
inter[item][ii]['parameters'][jj] = float(val)
# writes dict to file
with open(path, 'w') as yaml_file:
dump = yaml.dump(export_model,
default_flow_style = False,sort_keys=False)
yaml_file.write(dump)
def sanity_check_input(self):
"""
checks for obvious errors in the configuration file.
passing this test doesn't guarante a correct configuration.
"""
unit = self.init_sys_config
# checks if compartment is well defined
name = "Model configuration "
worker.assert_if_exists_non_empty(
'compartment',unit,reference='compartment')
assert (len(list(unit['compartment'])) > 1), \
name + "only contains a single compartment"
## check if all compartment are well defined
for item in list(unit['compartment']):
worker.assert_if_non_empty(
item,unit['compartment'],item,reference='state')
worker.assert_if_exists('optimise',unit['compartment'][item],item)
worker.assert_if_exists_non_empty('value',unit['compartment'][item],
item,'value')
worker.assert_if_exists('optimise',unit['compartment'][item],item)
# checks if interactions is well defined
worker.assert_if_exists_non_empty('interactions',unit,'interactions')
## check if all interactions are well defined
for item in list(unit['interactions']):
for edge in unit['interactions'][item]:
assert edge != None, \
name + "interaction {} is empty".format(item)
worker.assert_if_exists_non_empty('fkt',edge,item,)
worker.assert_if_exists('parameters',edge,item)
for interaction in unit['interactions']:
for fkt in unit['interactions'][interaction]:
allowed_items = ['fkt', 'parameters', 'optimise']
for item in list(fkt):
assert item in allowed_items, \
f"Invalid key: {item} in interaction {interaction}"
# checks if configuration is well defined
worker.assert_if_exists_non_empty('configuration', unit)
required_elements = ['time_evo_max']
for element in required_elements:
worker.assert_if_exists_non_empty(element,unit['configuration'])
def init_alternative_interaction_names(self):
""" add renamed copies defined in the yaml config to globals() """
alt_names = self.configuration['alternative_interaction_names']
for new_func in list(alt_names):
orig_func = globals()[alt_names[new_func]]
globals()[new_func] = orig_func
def initialize_log(self,maxiter):
max_iter = maxiter + 1
fit_parameter = self.fetch_to_optimize_args()[0][1]
param_log = np.full((max_iter,len(fit_parameter)), np.nan)
cost_log = np.full( (max_iter), np.nan )
log_dict = {'parameters': param_log,
'cost': cost_log,
'iter_idx': 0}
self.log = log_dict
# Logging
def construct_callback(self,method='SLSQP',debug=False):
model = self
if method == 'trust-constr':
def callback(xk, opt):# -> bool
if debug: print(f'xk: \n{xk}')
model.to_log(xk,cost=opt.fun)
else:
def callback(xk):# -> bool
if debug: print(f'xk: \n{xk}')
model.to_log(xk)
return callback
def to_log(self,parameters,cost=None):
#current monte sample
idx = self.log['iter_idx']
self.log['parameters'][idx] = parameters
self.log['cost'][idx] = cost
self.log['iter_idx'] += 1
# Parsing
def to_grad_method(self):
""" fetches the parameters necessary for the gradient descent method
Returns: free_parameters, constraints """
free_parameters = []
constraints = []
labels = []
for ii in self.compartment:
if self.compartment[ii]['optimise'] is not None:
labels.append('{}'.format(ii))
value = self.compartment[ii]['value']
lower_bound = self.compartment[ii]['optimise']['lower']
upper_bound = self.compartment[ii]['optimise']['upper']
free_parameters.append(value)
constraints.append([lower_bound,upper_bound])
for ii in self.interactions:
#function
for item in self.interactions[ii]:
#parameters
if 'optimise' in item:
if item['optimise'] is not None:
for jj,elements in enumerate(item['optimise']):
labels.append('{},fkt: {} #{}'.format(
ii,item['fkt'],elements['parameter_no']))
value = item['parameters'][jj]
lower_bound = elements['lower']
upper_bound = elements['upper']
free_parameters.append(value)
constraints.append([lower_bound,upper_bound])
free_parameters = np.array(free_parameters)
constraints = np.array(constraints)
return free_parameters, constraints, labels
def update_system_with_parameters(self, parameters):
values = list(parameters)
for ii in self.compartment:
if self.compartment[ii]['optimise'] is not None:
self.compartment[ii]['value'] = values.pop(0)
for ii in self.interactions:
#function
for item in self.interactions[ii]:
#parameters
if 'optimise' in item:
if item['optimise'] is not None:
for element in item['optimise']:
item['parameters'][element['parameter_no']-1] = \
values.pop(0)
# Fetching
# they retrieve some information from the model and output them in a certain
# form. They mostly look through dicts to extract a subset of them
def fetch_constraints(self):
# placeholder for constraints generator
if 'constraints' in self.configuration:
return self.configuration['constraints']
else:
return None
def fetch_index_of_interaction(self):
""" gets the indices in the interaction matrix """
## separate row & column
interactions = list(self.interactions)
compartments = list(self.compartment)
interaction_index = interactions.copy()
for index,item in enumerate(interactions):
interaction_index[index] = item.split(':')
## parse them with the index
for index, item in enumerate(interaction_index):
interaction_index[index][0] = compartments.index(item[0])
interaction_index[index][1] = compartments.index(item[1])
return interaction_index
def fetch_index_of_source_and_sink(self):
if self.configuration['sources'] == None:
sources = None
idx_sources = []
else:
sources = list(self.configuration['sources'])
idx_sources = sources.copy()
for ii, item in enumerate(idx_sources):
idx_sources[ii] = list(self.compartment).index(item)
if self.configuration['sinks'] == None:
sinks = None
idx_sinks = []
else:
sinks = list(self.configuration['sinks'])
idx_sinks = sinks.copy()
for ii, item in enumerate(idx_sinks):
idx_sinks[ii] = list(self.compartment).index(item)
self.configuration['idx_sources'] = idx_sources
self.configuration['idx_sinks'] = idx_sinks
def fetch_index_of_compartment(self,parameters):
# add something to make sure all stanges to parameters stay internal
compartments = list(self.compartment)
for nn,entry in enumerate(parameters):
if (type(entry) == str) & (entry in list(self.compartment)):
#print(entry, compartments.index(entry))
parameters[nn] = compartments.index(entry)
return parameters
def fetch_to_optimize_args(self):
""" fetches the parameters necessary for the gradient descent method
Returns: free_parameters, constraints """
labels = []
idx_state = []; val_state = []; bnd_state = []
for ii,entry in enumerate(self.compartment):
if self.compartment[entry]['optimise'] is not None:
labels.append('{}'.format(entry))
idx_state.append(ii)
val_state.append(self.compartment[entry]['value'])
lower_bound = self.compartment[entry]['optimise']['lower']
upper_bound = self.compartment[entry]['optimise']['upper']
bnd_state.append([lower_bound,upper_bound])
idx_args = []; val_args = []; bnd_args = []
for ii,interaction in enumerate(self.interactions):
for jj,function in enumerate(self.interactions[interaction]):
if 'optimise' in function:
if function['optimise'] is not None:
for kk,parameter in enumerate(function['optimise']):
labels.append('{},fkt: {} #{}'.format(
interaction,function['fkt'],
parameter['parameter_no']))
current_idx_args = \
[ii,jj,parameter['parameter_no']-1]
current_val_args = \
self.fetch_arg_by_idx(current_idx_args)
lower_bound = parameter['lower']
upper_bound = parameter['upper']
idx_args.append(current_idx_args)
val_args.append(current_val_args)
bnd_args.append([lower_bound,upper_bound])
fit_indices = [idx_state,idx_args]
fit_param = val_state + val_args
bnd_param = bnd_state + bnd_args
return [fit_indices,fit_param,bnd_param], labels
def fetch_states(self):
states = []
compartment = self.compartment
for item in compartment:
states.append(compartment[item]['value'])
return states
def fetch_args(self):
args = []
for interactions in self.interactions:
args_edge = []
for edges in self.interactions[interactions]:
indexed_args = self.fetch_index_of_compartment(edges['parameters'])
args_edge.append(indexed_args)
args.append(args_edge)
return args
def fetch_param(self):
states = self.fetch_states()
args = self.fetch_args()
return [states,args]
def fetch_arg_by_idx(self,index):
args = self.fetch_args()
idx = index
arg = args[idx[0]][idx[1]][idx[2]]
return arg
    def de_constructor(self):
        """ Builds and returns the right-hand side of the ODE system.

        Returns
        -------
        differential_equation : callable
            function with signature f(t, x, args) returning dx/dt as a
            1d numpy array of length n_compartments
        """
        # the benefit of constructing it like this is that:
        # * we are able to get the signature f(x,args)
        # * all non-(x,args) related objects are only evaluated once.
        # however, this for looping is still super inefficient and a more
        # vectorized object should be intended
        # args is expected to have the same shape as set_of_functions
        set_of_function = self.interactions
        idx_interactions = self.fetch_index_of_interaction()
        n_compartments = len(self.compartment)
        def differential_equation(t,x,args): #(t,x) later
            # y[kk, ll] accumulates the flow into compartment kk taken
            # out of compartment ll; the matrix is rebuilt on every call
            y = np.zeros((n_compartments,n_compartments))
            for ii,functions in enumerate(set_of_function):
                interaction = set_of_function[functions]
                for jj,edge in enumerate(interaction):
                    kk,ll = idx_interactions[ii]
                    # the interaction function is looked up by name in the
                    # module namespace; negative flows are clipped to zero
                    flow = max(0,globals()[edge['fkt']](x,kk,*args[ii][jj]))
                    # flows into kk (outof ll)
                    y[kk,ll] += flow
                    # flow outof ll (into kk)
                    y[ll,kk] -= flow
            # net rate of change per compartment
            return np.sum(y,axis=1)
        return differential_equation
def import_interaction_functions(func):
    """ Adds the functions from 'func' to the globals in models.

    Parameters
    ----------
    func : list
        List containing the functions that will be added to globals
    """
    for item in func:
        name = item.__name__
        # an existing function of the same name is silently replaced,
        # so at least warn the user about the overwrite
        if name in globals():
            print('Warning! A function with the same name as interaction '
                + f"function '{name}' is already known in globals!\n"
                + 'Function will be overwritten!')
        globals()[name] = item