import string, random, shutil
from numpy import nan, array, int_, float_, abs, zeros, max
import pandas as pd
from collections import OrderedDict
[docs]def random_string_generator(N):
"""
Parameters
----------
N : an integer
Returns a random string of uppercase character and digits of length N
-------
"""
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(N))
[docs]class Solution(object):
"""
Class representing a solution to a given linear optimization problem. Includes an internal dictionary for additional
information to be included.
"""
def __init__(self, value_map, status, **kwargs):
"""
Parameters
----------
value_map: A dictionary mapping variable indexes with their values as determined by the solver
status: An object (preferrably str or int) containing the solution status
kwargs: Any additional information to be included in the attribute_dict variable that can be accessed by
the <self.attribute_value> function.
"""
self.__var_names = None
self.__value_map = value_map
self.__status = status
self.__attribute_dict = {k:v for k,v in kwargs.items() if k != 'names'}
if 'names' in kwargs:
self.__var_names = kwargs['names']
if 'objective_value' in kwargs:
self.__obj_value = kwargs['objective_value']
def __getitem__(self, item):
if hasattr(item, '__iter__') and not isinstance(item, str):
return {k: self.__value_map[k] for k in item}
elif isinstance(item, str):
return self.__value_map[item]
else:
raise TypeError('\'item\' is not a sequence or string.')
[docs] def to_series(self):
if self.__var_names != None:
return pd.Series({self.__var_names[i]:self.__value_map[i] for i in range(len(self.__var_names))})
else:
return pd.Series(self.var_values())
[docs] def set_attribute(self, key, value):
"""
Sets the value of a given <key> as <value>.
Parameters
----------
key - A string
value - Any object to be associated with the supplied key
-------
"""
self.__attribute_dict[key] = value
[docs] def var_values(self):
"""
Returns a dict mapping reaction indices with the variable values.
-------
"""
return self.__value_map
[docs] def status(self):
"""
Returns the status of this solution, as supplied by the solver.
-------
"""
return self.__status
[docs] def attribute_value(self, attribute_name):
"""
Parameters
----------
attribute_name: A dictionary key (preferrably str)
Returns the value associated with the supplied key
-------
"""
return self.__attribute_dict[attribute_name]
[docs] def attribute_names(self):
"""
Returns all keys present in the attribute dictionary.
-------
"""
return self.__attribute_dict.keys()
[docs] def objective_value(self):
"""
Returns the objective value for this solution
"""
return self.__obj_value
[docs] def x(self):
'''
Returns a ndarray with the solution values in order (from the variables)
'''
return array(list(self.__value_map.values()))
[docs]class LinearSystemOptimizer(object):
"""
Class with methods to solve a <LinearSystem> as a linear optimization problem.
"""
def __init__(self, linear_system, hard_fail=False, build=True):
"""
Parameters
----------
linear_system: A <LinearSystem> instance.
hard_fail: A boolean flag indicating whether an Exception is raised when the optimization fails
"""
if build:
linear_system.build_problem()
self.solver = linear_system.solver
self.model = linear_system.get_model()
self.hard_fail = hard_fail
[docs] def optimize(self):
"""
Internal function to instantiate the solver and return a solution to the optimization problem
Parameters
----------
objective: A List[Tuple[coef,name]], where coef is an objective coefficient and name is the name of the variable
to be optimized.
minimize: A boolean that, when True, defines the problem as a minimization
Returns a <Solution> instance
-------
"""
names = self.model._get_variables_names()
value_map = OrderedDict([(v, nan) for v in names])
status = None
ov = nan
# self.model.configuration.tolerances.feasibility = 1e-9 # TODO this is for a test, to delete later
# self.model.configuration.tolerances.optimality = 1e-6 # TODO this is for a test, to delete later
# tINIT test parameters
# self.model.problem.Params.MIPGap = 1e-9
# self.model.configuration.tolerances.feasibility = 1e-8
# self.model.configuration.tolerances.optimality = 1e-8
# self.model.configuration.verbosity = 3
try:
self.model.optimize()
values = self.model._get_primal_values()
value_map = OrderedDict([(k, v) for k, v in zip(names, values)])
status = self.model.status
ov = self.model.objective.value
except Exception as e:
frozen_exception = e
if status or not self.hard_fail:
return Solution(value_map, self.model.status, objective_value=ov)
else:
raise frozen_exception
[docs] def populate(self, limit=None):
intf_dict = {
'CPLEX': self.__populate_cplex,
'GUROBI': self.__populate_gurobi
}
if self.solver in ['CPLEX', 'GUROBI']:
return intf_dict[self.solver](limit)
else:
raise ValueError('The provided solver does not have an implemented populate function. Choose from' +
''.join(list(intf_dict.keys())))
def __populate_cplex(self, limit=None):
instance = self.model.problem
if not limit:
instance.parameters.mip.pool.capacity = instance.parameters.mip.pool.capacity.max()
else:
instance.parameters.mip.pool.capacity = limit
vnames = instance.variables.get_names()
mnames = self.model._get_variables_names()
solutions = []
try:
instance.populate_solution_pool()
pool_intf = instance.solution.pool
nsols = pool_intf.get_num()
for s in range(nsols):
vmap = {k: v for k, v in zip(vnames, pool_intf.get_values(s))}
ord_vmap = OrderedDict([(k, vmap[k]) for k in mnames])
sol = Solution(ord_vmap, 'optimal', objective_value=pool_intf.get_objective_value(s))
# TODO: get status dict from optlang and use it accordingly
solutions.append(sol)
except Exception as e:
print(e)
return solutions
def __populate_gurobi(self, limit=None):
instance = self.model.problem
solutions = []
instance.params.PoolSolutions = limit
instance.params.SolutionNumber = 0
try:
instance.optimize()
mnames = self.model._get_variables_names()
if instance.SolCount > 0:
for n in range(instance.SolCount):
instance.params.SolutionNumber = n
ord_vmap = OrderedDict([(k, instance.getVarByName(k).Xn) for k in mnames])
sol = Solution(ord_vmap, 'optimal', objective_value=instance.PoolObjVal)
solutions.append(sol)
except Exception as e:
print(e)
finally:
instance.params.SolutionNumber = 0
return solutions
[docs]class CORSOSolution(Solution):
def __init__(self, sol_max, sol_min, f, index_map, var_names, eps=1e-8):
x = sol_min.x()
rev = index_map[max(index_map) + 1:]
nx = x[:max(index_map) + 1]
nx[rev] = x[rev] - sol_min.x()[max(index_map) + 1:-1]
nx[abs(nx) < eps] = 0
nvalmap = OrderedDict([(k, v) for k, v in zip(var_names, nx)])
super().__init__(nvalmap, [sol_max.status(), sol_min.status()], objective_value=f)
[docs]class GIMMESolution(Solution):
def __init__(self, sol, exp_vector, var_names, mapping=None):
self.exp_vector = exp_vector
gimme_solution = sol.x()
if mapping:
gimme_solution = [max(gimme_solution[array(new)]) if isinstance(new, (tuple, list)) else gimme_solution[new]
for orig, new
in mapping.items()]
super().__init__(
value_map=OrderedDict([(k, v) for k, v in zip(var_names, gimme_solution)]),
status=sol.status(),
objective_value=sol.objective_value()
)
[docs] def get_reaction_activity(self, flux_threshold):
gimme_fluxes = array([kv[1] for i, kv in enumerate(self.var_values().items())])
activity = zeros(gimme_fluxes.shape)
ones = (self.exp_vector > flux_threshold) | (self.exp_vector == -1)
twos = gimme_fluxes > 0
activity[ones] = 1
activity[twos & ~ones] = 2
return activity
[docs]class KShortestSolution(Solution):
"""
A Solution subclass that also contains attributes suitable for elementary flux modes such as non-cancellation sums
of split reactions and reaction activity.
"""
SIGNED_INDICATOR_SUM = 'signed_indicator_map'
SIGNED_VALUE_MAP = 'signed_value_map'
def __init__(self, value_map, status, indicator_map, dvar_mapping, dvars, **kwargs):
"""
Parameters
----------
value_map: A dictionary mapping variable names with values
status: See <Solution>
indicator_map: A dictionary mapping indicators with
dvar_mapping: A mapping between reaction indices and solver variables (Tuple[str] or str)
kwargs: See <Solution>
"""
signed_value_map = {
i: value_map[dvars[varlist[0]]] - value_map[dvars[varlist[1]]] if isinstance(varlist, (tuple, list)) else
value_map[dvars[varlist]] for
i, varlist in dvar_mapping.items()}
signed_indicator_map = {
i: value_map[indicator_map[dvars[varlist[0]]]] - value_map[indicator_map[dvars[varlist[1]]]] if isinstance(
varlist,
(tuple, list)) else
value_map[indicator_map[dvars[varlist]]] for
i, varlist in dvar_mapping.items()}
super().__init__(value_map, status, **kwargs)
self.set_attribute(self.SIGNED_VALUE_MAP, signed_value_map)
self.set_attribute(self.SIGNED_INDICATOR_SUM, signed_indicator_map)
[docs] def get_active_indicator_varids(self):
"""
Returns the indices of active indicator variables (maps with variables on the original stoichiometric matrix)
-------
"""
return [k for k, v in self.attribute_value(self.SIGNED_INDICATOR_SUM).items() if abs(v) > 1e-10]