import warnings
from collections import OrderedDict
from copy import deepcopy
from numpy import ndarray, array, delete, zeros, vstack, hstack, nonzero, append, int_, int8, int16, \
int32, int64
from .linear_systems import SteadyStateLinearSystem, VAR_CONTINUOUS, make_irreversible_model
from .optimization import LinearSystemOptimizer, CORSOSolution, GIMMESolution
INT_TYPES = (int, int_, int8, int16, int32, int64)
LARGE_NUMBER = 10e6 - 1
SMALL_NUMBER = 1e-6
BACKPREFIX = 'flux_backwards'
[docs]def make_irreversible_model_raven(S, lb, ub, inverse_reverse_reactions=False):
irrev = array([i for i in range(S.shape[1]) if not (lb[i] < 0)])
rev = array([i for i in range(S.shape[1]) if i not in irrev])
Sr = S[:, rev]
offset = S.shape[1]
rx_mapping = {k: v if k in irrev else [v] for k, v in dict(zip(range(offset), range(offset))).items()}
for i, rx in enumerate(rev):
rx_mapping[rx].append(offset + i)
rx_mapping = OrderedDict([(k, tuple(v)) if isinstance(v, list) else (k, v) for k, v in rx_mapping.items()])
S_new = hstack([S, -Sr])
nlb, nub = zeros(S_new.shape[1]), zeros(S_new.shape[1])
for orig_rx, new_rx in rx_mapping.items():
if isinstance(new_rx, tuple):
nlb[new_rx[0]] = lb[orig_rx] if lb[orig_rx] >= 0 else 0 # not necessary since they're already 0
nub[new_rx[0]] = ub[orig_rx] if ub[orig_rx] >= 0 else 0 # not necessary since they're already 0
nlb[new_rx[1]] = ub[orig_rx] * -1 if (ub[orig_rx] * -1) >= 0 else 0
nub[new_rx[1]] = lb[orig_rx] * -1 if (lb[orig_rx] * -1) >= 0 else 0
else:
nlb[new_rx] = lb[orig_rx]
nub[new_rx] = ub[orig_rx]
return S_new, nlb, nub, rx_mapping
# def make_irreversible_model(S, lb, ub):
# irrev = array([i for i in range(S.shape[1]) if not (lb[i] < 0 and ub[i] > 0)])
# rev = array([i for i in range(S.shape[1]) if i not in irrev])
# Sr = S[:, rev]
# offset = S.shape[1]
# rx_mapping = {k: v if k in irrev else [v] for k, v in dict(zip(range(offset), range(offset))).items()}
# for i, rx in enumerate(rev):
# rx_mapping[rx].append(offset + i)
# rx_mapping = OrderedDict([(k, tuple(v)) if isinstance(v, list) else (k, v) for k, v in rx_mapping.items()])
#
# S_new = hstack([S, -Sr])
# nlb, nub = zeros(S_new.shape[1]), zeros(S_new.shape[1])
# for orig_rx, new_rx in rx_mapping.items():
# if isinstance(new_rx, tuple):
# nub[new_rx[0]] = abs(lb[orig_rx])
# nub[new_rx[1]] = ub[orig_rx]
# else:
# nlb[new_rx], nub[new_rx] = lb[orig_rx], ub[orig_rx]
#
# return S_new, nlb, nub, rx_mapping
# def test_irreversible_conversions():
# S = zeros([3,3])
# lb, ub = ([0,1000],[-1000, 1000],[-1000, 0])
[docs]class ConstraintBasedModel(object):
def __init__(self, S, thermodynamic_constraints, reaction_names=None, metabolite_names=None, optimizer=True,
solver=None):
def __validate_args():
# stoichiometric matrix
if isinstance(S, ndarray):
m, n = S.shape
elif isinstance(S, tuple) or isinstance(S, list):
m, n = len(S), len(S[0])
else:
raise TypeError('S matrix is not an ndarray or list')
thermodynamic_types = set(map(type, thermodynamic_constraints))
allowed_types = {list, tuple, bool}
assert len(thermodynamic_types & allowed_types) == len(thermodynamic_types), 'Invalid bound type found.'
reactions_ok = reaction_names is None or len(reaction_names) == n and isinstance(reaction_names, list)
metabolites_ok = metabolite_names is None or len(metabolite_names) == m and isinstance(metabolite_names,
list)
assert reactions_ok, 'Reaction name dimensions do not match the supplied stoichiometrix matrix'
assert metabolites_ok, 'Metabolite name dimensions do not match the supplied stoichiometrix matrix'
return m, n
self.solver = solver
m, n = __validate_args()
# self.__S = self.__parse_stoichiometric_matrix(S)
self.__S = array(S)
self.bounds = self.__interpret_bounds(thermodynamic_constraints)
self.original_bounds = deepcopy(self.bounds)
self.reaction_names, self.metabolite_names = deepcopy(reaction_names), deepcopy(metabolite_names)
self.__update_decoder_map()
self.c = None
self.model = None
if optimizer:
self.initialize_optimizer()
def __getstate__(self):
return self.__dict__
def __update_decoder_map(self):
self.reaction_decoder_map = self.metabolite_decoder_map = None
if self.reaction_names:
self.reaction_decoder_map = dict(zip(self.reaction_names, list(range(self.__S.shape[1]))))
if self.metabolite_names:
self.metabolite_decoder_map = dict(zip(self.metabolite_names, list(range(self.__S.shape[0]))))
self.map_labels = {'reaction': self.reaction_decoder_map,
'metabolite': self.metabolite_decoder_map}
def __interpret_bounds(self, thermodynamic_constraints):
def interpret_bound(bound_obj):
i, bound = bound_obj
# value = [None, None]
if isinstance(bound, bool):
if bound:
value = 0, LARGE_NUMBER
else:
value = -LARGE_NUMBER, LARGE_NUMBER
else:
value = bound
for i, v in enumerate(value):
if i == 0 and v is None:
value[i] = -LARGE_NUMBER
elif i == 1 and v is None:
value[i] = LARGE_NUMBER
if value[0] > value[1]:
warnings.warn('Lower bound is >= than upper bound for reaction ' + str(i) + '. Reversing...')
value = value[::-1]
return value
return list(map(interpret_bound, enumerate(thermodynamic_constraints)))
[docs] def flux_limits(self, reaction):
self.set_objective({reaction: 1}, True)
min_flux = self.optimize().objective_value()
self.set_objective({reaction: 1}, False)
max_flux = self.optimize().objective_value()
return min_flux, max_flux
[docs] def initialize_optimizer(self):
lb, ub = self.get_bounds_as_list()
self.model = SteadyStateLinearSystem(self.__S, lb, ub, var_names=self.reaction_names, solver=self.solver)
self.model.build_problem()
self.optimizer = LinearSystemOptimizer(self.model, build=False)
[docs] def decode_index(self, index, labels):
if type(index) in INT_TYPES:
return index
elif labels is not None:
return self.map_labels[labels][index]
else:
raise IndexError('Could not find specified index "' + str(index) + '".')
[docs] def get_bounds_as_list(self):
return list(zip(*self.bounds))
[docs] def is_reversible_reaction(self, index):
lb, ub = self.bounds[self.decode_index(index, 'reaction')]
return (lb < 0 and ub > 0)
[docs] def get_stoichiometric_matrix(self, rows=None, columns=None):
row_index = [self.decode_index(i, 'metabolite') for i in rows] if rows else None
col_index = [self.decode_index(i, 'reaction') for i in columns] if columns else None
if rows and columns:
return self.__S[row_index, col_index]
elif rows:
return self.__S[row_index, :]
elif columns:
return self.__S[:, col_index]
else:
return self.__S
[docs] def set_stoichiometric_matrix(self, values, rows=None, columns=None):
row_index = [self.decode_index(i, 'metabolite') for i in rows] if rows else None
col_index = [self.decode_index(i, 'reaction') for i in columns] if columns else None
if rows and columns:
self.__S[row_index, col_index] = values
elif rows:
self.__S[row_index, :] = values
elif columns:
self.__S[:, col_index] = values
else:
self.__S = values
row_index = row_index if row_index is not None else range(self.__S.shape[0])
col_index = col_index if col_index is not None else range(self.__S.shape[1])
if self.model:
constraints = [self.model.model.constraints[i] for i in
row_index] if row_index else self.model.model.constraints
vars = [self.model.model.variables[i] for i in col_index] if row_index else self.model.model.variables
self.model.populate_constraints_from_matrix(values, constraints=constraints, vars=vars)
[docs] def add_reaction(self, arg, bounds, name=None):
assert not name in self.reaction_names, 'Duplicate reaction name found!'
if isinstance(arg, dict):
col = zeros([self.__S.shape[0], 1])
for k, v in arg.items():
col[self.decode_index(k, 'metabolite'), 0] = v
elif isinstance(arg, ndarray):
if len(arg) == len(self.metabolite_names):
col = arg.reshape(self.__S.shape[0], 1)
else:
raise Exception('Numpy argument dimensions should be (', len(self.reaction_names), ',). Got', arg.shape,
'instead')
else:
raise ValueError('Invalid argument type: ', type(arg), '. Please supply an ndarray or dict instead.')
self.__S = hstack([self.__S, col])
self.reaction_names.append(name)
self.bounds.append(bounds)
self.__update_decoder_map()
if self.model:
self.model.add_columns_to_model(col, [name], [bounds[0]], [bounds[1]], VAR_CONTINUOUS)
[docs] def remove_reaction(self, index):
j = self.decode_index(index, 'reaction')
if not isinstance(index, int):
self.reaction_names.pop(j)
self.bounds.pop(j)
self.__S = delete(self.__S, [j], axis=1)
self.__update_decoder_map()
if self.model:
self.model.remove_from_model(j, 'var')
[docs] def make_irreversible(self):
lb, ub = self.get_bounds_as_list()
Sn, nlb, nub, rx_mapping = make_irreversible_model(self.__S, lb, ub)
rev = array([int(v[0]) for k, v in rx_mapping.items() if isinstance(v, (list, tuple))])
revnames = ['_'.join([name, BACKPREFIX]) for name in (array(self.reaction_names)[rev]).tolist()]
rnames = self.reaction_names + revnames
model = ConstraintBasedModel(Sn, list(zip(nlb, nub)), rnames, self.metabolite_names,
optimizer=self.model is not None, solver=self.solver)
return model, rx_mapping
[docs] def get_reaction_bounds(self, index):
return self.bounds[self.decode_index(index, 'reaction')]
[docs] def set_reaction_bounds(self, index, **kwargs):
true_idx = self.decode_index(index, 'reaction')
lb, ub = self.get_reaction_bounds(true_idx)
if 'lb' in kwargs:
lb = kwargs['lb']
if 'ub' in kwargs:
ub = kwargs['ub']
bound = (lb, ub)
self.bounds[true_idx] = bound
if 'temporary' in kwargs and kwargs['temporary'] == False:
self.original_bounds[self.decode_index(index, 'reaction')] = self.get_reaction_bounds(index)
if self.model:
var = self.model.model.variables[true_idx]
self.model.set_variable_bounds([var], [lb], [ub])
[docs] def set_objective(self, coef_dict, minimize=False):
if self.model:
if isinstance(coef_dict, dict):
f = zeros(self.__S.shape[1], )
self.c = f
for k, v in coef_dict.items():
self.c[self.decode_index(k, 'reaction')] = v
self.model.set_objective(self.c, minimize)
elif isinstance(coef_dict, ndarray):
self.model.set_objective(coef_dict, minimize)
else:
raise TypeError('`coef_dict` must either be a dict or an ndarray')
else:
raise Exception('Cannot set an objective on a model without the optimizer flag as True.')
[docs] def optimize(self, coef_dict=None, minimize=False):
cur_obj = self.model.model.objective
if coef_dict != None:
self.set_objective(coef_dict, minimize)
sol = self.optimizer.optimize()
if coef_dict != None:
self.model.model.objective = cur_obj
return sol
[docs] def revert_to_original_bounds(self):
for rx, bounds in zip(self.reaction_names, self.original_bounds):
clb, cub = self.get_reaction_bounds(rx)
lb, ub = bounds
if clb != lb or cub != ub:
self.set_reaction_bounds(rx, lb=lb, ub=ub)
[docs]class CORSOModel(ConstraintBasedModel):
def __init__(self, cbmodel, corso_element_names=('R_PSEUDO_CORSO', 'M_PSEUDO_CORSO'), solver=None):
if not cbmodel.model:
cbmodel.initialize_optimizer()
self.cbmodel = cbmodel
irrev_model, self.mapping = cbmodel.make_irreversible()
S = irrev_model.get_stoichiometric_matrix()
bounds = irrev_model.bounds
self.cost_index_mapping = zeros(S.shape[1], dtype=int_)
self.corso_rx, self.corso_mt = corso_element_names
super().__init__(S, bounds, irrev_model.reaction_names, irrev_model.metabolite_names, solver=solver)
self.add_metabolite(zeros(len(self.reaction_names)), self.corso_mt)
self.add_reaction(zeros(len(self.metabolite_names)), (0, 0), self.corso_rx)
self.original_bounds = deepcopy(self.bounds)
for orx, nrx in self.mapping.items():
if isinstance(nrx, int):
self.cost_index_mapping[nrx] = orx
else:
for nrx_split in nrx:
self.cost_index_mapping[nrx_split] = orx
[docs] def solve_original_model(self, of_dict, minimize=False):
self.cbmodel.set_objective(of_dict, minimize)
sol = self.cbmodel.optimize()
return sol
[docs] def set_costs(self, cost):
true_cost = cost[self.cost_index_mapping]
true_cost = append(true_cost, array([-1]))
self.set_stoichiometric_matrix(true_cost.reshape(1, len(true_cost)), rows=[self.corso_mt])
# def set_reaction_bounds(self, index, **kwargs):
# super().set_reaction_bounds(index, **kwargs)
# self.original_bounds[self.decode_index(index, 'reaction')] = self.get_reaction_bounds(index)
[docs] def set_corso_objective(self):
self.set_objective({self.corso_rx: 1}, True)
[docs] def optimize_corso(self, cost, of_dict, minimize=False, constraint=1, constraintby='val', eps=1e-6, flux1=None):
if flux1 is None:
flux1 = self.solve_original_model(of_dict, minimize)
if abs(flux1.objective_value()) < eps:
return flux1, flux1
f1_x = flux1.x()
if constraintby == 'perc':
# f1_f = flux1.x()[self.cbmodel.c != 0]
f1_f = {idx: f1_x[idx] * (constraint / 100) for idx in of_dict.keys()}
elif constraintby == 'val':
if (flux1.objective_value() < constraint and not minimize) or (
flux1.objective_value() > constraint and minimize):
raise Exception('Objective flux is not sufficient for the the set constraint value.')
else:
f1_f = {idx: constraint for idx in of_dict.keys()}
else:
raise Exception('Invalid constraint options.')
self.set_reaction_bounds(self.corso_rx, lb=0, ub=1e20)
# corso_of_dict = deepcopy(of_dict)
# corso_of_dict[self.corso_rx] = 1
self.set_costs(cost)
for rx in f1_f.keys():
true_idx = self.decode_index(rx, 'reaction')
involved = self.mapping[true_idx]
fluxval = f1_f[rx] # if isinstance(f1_f, ndarray) else f1_f
if isinstance(involved, (int, int_)):
self.set_reaction_bounds(involved, lb=fluxval, ub=fluxval, temporary=True)
else:
self.set_reaction_bounds(involved[0], lb=fluxval, ub=fluxval, temporary=True)
self.set_reaction_bounds(involved[1], lb=0, ub=0, temporary=True)
self.set_objective({self.corso_rx: 1}, True)
sol = self.optimize()
self.revert_to_original_bounds()
return flux1, CORSOSolution(flux1, sol, sum([of_dict[k] * f1_f[k] for k in of_dict.keys()]),
self.cost_index_mapping, self.cbmodel.reaction_names, eps=eps)
[docs]class GIMMEModel(ConstraintBasedModel):
def __init__(self, cbmodel, solver=None):
self.cbmodel = cbmodel
if not self.cbmodel.model:
self.cbmodel.initialize_optimizer()
irrev_model, self.mapping = cbmodel.make_irreversible()
S = irrev_model.get_stoichiometric_matrix()
bounds = irrev_model.bounds
super().__init__(S, bounds, irrev_model.reaction_names, irrev_model.metabolite_names, solver=solver,
optimizer=True)
def __adjust_objective_to_irreversible(self, objective_dict):
obj_dict = {}
for k, v in objective_dict.items():
irrev_map = self.mapping[self.cbmodel.decode_index(k, 'reaction')]
if isinstance(irrev_map, (list, tuple)):
for i in irrev_map:
obj_dict[i] = v
else:
obj_dict[irrev_map] = v
return obj_dict
def __adjust_expression_vector_to_irreversible(self, exp_vector):
exp_vector_n = zeros(len(self.reaction_names), )
for rxn, val in enumerate(exp_vector):
rxmap = self.mapping[rxn]
if isinstance(rxmap, tuple):
exp_vector_n[rxmap[0]] = exp_vector_n[rxmap[1]] = val
else:
exp_vector_n[rxmap] = val
return exp_vector_n
[docs] def optimize_gimme(self, exp_vector, objectives, obj_frac, flux_thres):
N = len(self.cbmodel.reaction_names)
objectives_irr = [self.__adjust_objective_to_irreversible(obj) for obj in objectives]
exp_vector_irr = self.__adjust_expression_vector_to_irreversible(exp_vector)
def find_objective_value(obj):
self.cbmodel.set_objective(obj, False)
return self.cbmodel.optimize().objective_value()
objective_values = list(map(find_objective_value, objectives_irr))
gimme_model_objective = array(
[flux_thres - exp_vector_irr[i] if -1 < exp_vector_irr[i] < flux_thres else 0 for i in range(N)])
objective_lbs = zeros(len(self.reaction_names))
for ov, obj in zip(objective_values, objectives_irr):
for rx, v in obj.items():
objective_lbs[rx] = v * ov * obj_frac
objective_ids = nonzero(objective_lbs)[0]
for id, lb in zip(objective_ids, objective_lbs):
self.set_reaction_bounds(id, lb=lb, temporary=True)
self.set_objective(gimme_model_objective, True)
sol = self.optimize()
self.revert_to_original_bounds()
return GIMMESolution(sol, exp_vector, self.cbmodel.reaction_names, self.mapping)