import abc
import warnings
from re import compile
import numpy as np
from boolean.boolean import BooleanAlgebra
from numpy import where
from ..core.models import ConstraintBasedModel
MAX_PRECISION = 1e-10
GPR_GENE_RE = compile("\\b(?!and\\b|or\\b|AND\\b|OR\\b)[([A-z0-9\.]*")
BOOLEAN_ALGEBRA = BooleanAlgebra()
[docs]def normalize_boolean_expression(rule):
try:
expression = BOOLEAN_ALGEBRA.parse(rule, simplify=True)
bool_expression = BOOLEAN_ALGEBRA.normalize(expression, BOOLEAN_ALGEBRA.OR)
return str(bool_expression).replace('&', ' and ').replace('|', ' or ')
except Exception as e:
warnings.warn('Could not normalize this rule: ' + rule)
return rule
[docs]class AbstractObjectReader(object):
"""
An abstract class for reading metabolic model objects from external frameworks, and extracting the data needed for
pathway analysis methods. Also deals with name conversions.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, model):
"""
Parameters
----------
model: A Model instance from the external framework to use. Must be registered in the dict stored as
external_wrappers.model_readers along with its reader.
"""
self.model = model
self.r_ids, self.m_ids = self.get_reaction_and_metabolite_ids()
self.rx_instances = self.get_rx_instances()
self.S = self.get_stoichiometric_matrix()
self.lb, self.ub = tuple(zip(*self.get_model_bounds(False)))
self.irrev_bool = self.get_irreversibilities(False)
self.irrev_index = self.get_irreversibilities(True)
self.bounds_dict = self.get_model_bounds(True)
self.genes = self.get_model_genes()
self.gene_protein_reaction_rules = self.get_model_gprs()
self.gpr_map = None
[docs] @abc.abstractmethod
def get_stoichiometric_matrix(self):
"""
Returns a 2D numpy array with the stoichiometric matrix whose metabolite and reaction indexes match the names
defined in the class variables r_ids and m_ids
"""
return
[docs] @abc.abstractmethod
def get_model_bounds(self, as_dict, separate_list):
"""
Returns the lower and upper bounds for all fluxes in the model. This either comes in the form of an ordered list
with tuples of size 2 (lb,ub) or a dictionary with the same tuples mapped by strings with reaction identifiers.
Parameters
----------
as_dict: A boolean value that controls whether the result is a dictionary mapping str to tuple of size 2
separate: A boolean value that controls whether the result is two numpy.array(), one for lb and the other\n
to ub
"""
return
[docs] @abc.abstractmethod
def get_irreversibilities(self, as_index):
"""
Returns a vector representing irreversible reactions, either as a vector of booleans (each value is a flux,
ordered in the same way as reaction identifiers) or as a vector of reaction indexes.
Parameters
----------
as_dict: A boolean value that controls whether the result is a vector of indexes
"""
return
[docs] @abc.abstractmethod
def get_rx_instances(self):
"""
Returns the reaction instances contained in the model. Varies depending on the framework.
"""
return
[docs] @abc.abstractmethod
def get_model_genes(self):
"""
Returns the identifiers for the genes contained in the model
"""
[docs] @abc.abstractmethod
def get_model_gprs(self, apply_fx=None):
"""
Returns the model gene-protein-reaction rules associated with each reaction
"""
[docs] @abc.abstractmethod
def convert_gprs_to_list(self, rx, apply_fx):
return
[docs] def reaction_id_to_index(self, id):
"""
Returns the numerical index of a reaction when given a string representing its identifier.
Parameters
----------
id: A reaction identifier as a string
"""
return self.r_ids.index(id)
[docs] def get_gene_protein_reaction_rule(self, id):
return self.gene_protein_reaction_rules[id]
[docs] def convert_constraint_ids(self, tup, yield_constraint):
if yield_constraint:
constraint = tuple(list(map(self.reaction_id_to_index, tup[:2])) + list(tup[2:]))
else:
constraint = tuple([self.reaction_id_to_index(tup[0])] + list(tup[1:]))
return constraint
[docs] def decode_k_shortest_solution(self, solarg):
if isinstance(solarg, list):
return [self.__decode_k_shortest_solution(sol) for sol in solarg]
else:
return self.__decode_k_shortest_solution(solarg)
def __decode_k_shortest_solution(self, sol):
## TODO: Make MAX_PRECISION a parameter for linear systems or the KShortestAlgorithm
return {self.r_ids[k]: v for k, v in sol.attribute_value(sol.SIGNED_VALUE_MAP).items() if
abs(v) > MAX_PRECISION}
[docs] def g2rx(self, expression, and_fx=min, or_fx=max, as_vector=False, apply_fx=None):
if self.gpr_map == None:
self.gpr_map = {rx: self.convert_gprs_to_list(rx, apply_fx) for rx in self.r_ids}
gpr_map = self.gpr_map
def aux_apply(fx, it):
args = [k for k in it if k is not None]
return fx(args) if args else None
def eval_gpr(lists):
return aux_apply(or_fx,
[aux_apply(and_fx, [expression[x] for x in gs if x in expression.keys()]) for gs in lists])
exp_map = {rx: eval_gpr(gpr_map[rx]) for rx in gpr_map}
if as_vector:
return [exp_map[k] for k in self.r_ids]
else:
return exp_map
[docs] def get_working_gene_names(self):
current_model_genes = list(self.get_model_genes())
return dict(zip(current_model_genes, map(lambda x: '_' + x, current_model_genes)))
[docs] def to_cobamp_cbm(self, solver=None):
return ConstraintBasedModel(
S=self.get_stoichiometric_matrix(),
thermodynamic_constraints=[tuple(float(k) for k in l) for l in self.get_model_bounds()],
reaction_names=self.r_ids,
metabolite_names=self.m_ids,
optimizer=solver != None and solver,
solver=solver if solver not in (True, False) else None)
[docs]class COBRAModelObjectReader(AbstractObjectReader):
[docs] def get_stoichiometric_matrix(self):
S = np.zeros((len(self.m_ids), len(self.r_ids)))
for i, r_id in enumerate(self.r_ids):
for metab, coef in self.model.reactions.get_by_id(r_id).metabolites.items():
S[self.m_ids.index(metab.id), i] = coef
return S
[docs] def get_model_bounds(self, as_dict=False, separate_list=False):
bounds = [r.bounds for r in self.rx_instances]
if as_dict:
return dict(zip(self.r_ids, bounds))
else:
if separate_list:
return [list(bounds) for bounds in list(zip(*tuple(bounds)))]
else:
return tuple(bounds)
[docs] def get_irreversibilities(self, as_index):
irrev = [not r.reversibility for r in self.rx_instances]
if as_index:
irrev = list(np.where(irrev)[0])
return irrev
[docs] def get_rx_instances(self):
return [self.model.reactions.get_by_id(rx) for rx in self.r_ids]
[docs] def get_model_genes(self):
return set([g.id for g in self.model.genes])
# def get_model_gprs(self, apply_fx=None):
# if not apply_fx:
# return [r.gene_reaction_rule for r in self.model.reactions]
# else:
# return [apply_fx(r.gene_reaction_rule) for r in self.model.reactions]
[docs] def get_model_gprs(self, apply_fx=None, token_to_gene_ratio=20):
## TODO: this function should only retrieve a single gpr. the abstract class should normalize it
def fix_name(gpr_string):
# genes = set(findall(GPR_GENE_RE, gpr_string)) - {''}
matches = [k for k in GPR_GENE_RE.finditer(gpr_string) if k.span()[0] - k.span()[1] != 0]
unique_tokens = set([m.string for m in matches])
for offset, match_obj in enumerate(matches):
final_pos = match_obj.span()[0] + offset
gpr_string = gpr_string[:final_pos] + '_' + gpr_string[final_pos:]
return gpr_string, len(matches), len(unique_tokens), unique_tokens
gpr_list = []
for r in self.model.reactions:
rule, num_matches, num_unique_tokens, unique_tokens = fix_name(r.gene_reaction_rule)
if apply_fx:
rule = apply_fx(rule)
if (num_unique_tokens > 0) and (num_matches // num_unique_tokens) < token_to_gene_ratio:
rule = normalize_boolean_expression(rule)
else:
warnings.warn(
'Will not normalize rules with more than ' + str(token_to_gene_ratio) + ' average tokens per gene')
matches_post = [k for k in GPR_GENE_RE.finditer(rule) if
(k.span()[0] - k.span()[1] != 0) and k.string[k.span()[0]:k.span()[1]][0] == '_']
for offsetp, matchp_obj in enumerate(matches_post):
final_pos = matchp_obj.span()[0] - offsetp
rule = rule[:final_pos] + rule[final_pos + 1:]
gpr_list.append(rule)
# for tok in unique_tokens:
# rule = rule.replace('_'+tok, tok)
return gpr_list
#
# if not apply_fx:
# return [fix_name(r.gene_reaction_rule) for r in self.model.reactions]
# else:
# return [apply_fx(fix_name(r.gene_reaction_rule)) for r in self.model.reactions]
[docs] def convert_gprs_to_list(self, rx, apply_fx):
if apply_fx == None:
apply_fx = lambda x: x
proteins = list(map(lambda x: x.strip().replace('(', '').replace(')', ''),
apply_fx(self.gene_protein_reaction_rules[self.r_ids.index(rx)]).split('or')))
rules = [[s.strip() for s in x.split('and') if s.strip() != ''] for x in proteins]
return rules
[docs]class FramedModelObjectReader(AbstractObjectReader):
[docs] def get_stoichiometric_matrix(self):
return np.array(self.model.stoichiometric_matrix())
[docs] def get_model_bounds(self, as_dict=False, separate_list=False):
bounds = [(r.lb, r.ub) for r in self.rx_instances]
if as_dict:
return dict(zip(self.r_ids, bounds))
else:
if separate_list:
return [bounds for bounds in list(zip(*tuple(bounds)))]
else:
return tuple(bounds)
[docs] def get_irreversibilities(self, as_index):
irrev = [not r.reversible for r in self.rx_instances]
if as_index:
irrev = list(np.where(irrev)[0])
return irrev
[docs] def get_rx_instances(self):
return [self.model.reactions[rx] for rx in self.r_ids]
[docs] def convert_gprs_to_list(self, rx):
proteins = list(map(lambda x: x.strip().replace('(', '').replace(')', ''), rx.gene_reaction_rule.split('or')))
rules = [[s.strip() for s in x.split('and') if s.strip() != ''] for x in proteins]
return rules
[docs]class CobampModelObjectReader(AbstractObjectReader):
[docs] def get_stoichiometric_matrix(self):
return self.model.get_stoichiometric_matrix()
[docs] def get_model_bounds(self, as_dict, separate_list=False):
if as_dict:
return dict(zip(self.r_ids, self.model.bounds))
else:
if separate_list:
return [bounds for bounds in list(zip(*tuple(self.model.bounds)))]
else:
return tuple(self.model.bounds)
[docs] def get_irreversibilities(self, as_index):
irrev = [not self.model.is_reversible_reaction(r) for r in self.r_ids]
if as_index:
irrev = list(np.where(irrev)[0])
return irrev
[docs] def get_rx_instances(self):
return None
# This dict contains the mapping between model instance namespaces (without the class name itself) and the appropriate
# model reader object. Modify this if a new reader is implemented.
model_readers = {
'cobra.core.model': COBRAModelObjectReader,
'framed.model.cbmodel': FramedModelObjectReader,
'cobamp.core.models': CobampModelObjectReader,
'numpy': MatFormatReader
}