#!/usr/bin/env python3
#
# Copyright 2022 Max Planck Institute Magdeburg
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
"""Functions for metabolic network compression and extension with GPR rules"""
import numpy as np
from scipy import sparse
from sympy.core.numbers import One
from sympy import Rational, nsimplify, parse_expr, to_dnf
from typing import Dict, List
from re import search
import jpype
from cobra import Model, Metabolite, Reaction, Configuration
from cobra.util.array import create_stoichiometric_matrix
import straindesign.efmtool as efm
from straindesign import fva, select_solver, parse_constraints, avail_solvers
from straindesign.names import *
import logging
def remove_irrelevant_genes(model, essential_reacs, gkis, gkos):
    """Remove genes that do not affect the flux space of the model.

    Used in preprocessing of strain design computations. Genes are removed when
    they only affect blocked or essential reactions, or when they are themselves
    essential (and hence must not be knocked out). GPR rules are simplified
    accordingly. The model is modified in place.

    Example:
        gkos = remove_irrelevant_genes(model, essential_reacs, gkis, gkos)

    Args:
        model (cobra.Model):
            A metabolic model that is an instance of the cobra.Model class containing GPR rules
        essential_reacs (list of str):
            A list of identifiers of essential reactions.
        gkis, gkos (dict):
            Dictionaries that contain the costs for gene knock-ins and knockouts, e.g.,
            gkos={'adhE': 1.0, 'ldhA': 1.0, ...}. gkos is modified in place.

    Returns:
        (dict):
            An updated dictionary of the knockout costs in which irrelevant genes are removed.
    """
    # 1) Remove gpr rules from blocked reactions
    blocked_reactions = [reac.id for reac in model.reactions if reac.bounds == (0, 0)]
    for rid in blocked_reactions:
        model.reactions.get_by_id(rid).gene_reaction_rule = ''
    # iterate in reverse order to avoid skipping entries while removing genes
    for g in model.genes[::-1]:
        if not g.reactions:
            model.genes.remove(g)
    protected_genes = set()
    # 1. Protect genes that only occur in essential reactions
    for g in model.genes:
        if not g.reactions or {r.id for r in g.reactions}.issubset(essential_reacs):
            protected_genes.add(g)
    # 2. Protect genes that are essential to essential reactions: if setting a
    #    gene to False renders the reaction's GPR rule unsatisfiable, the gene
    #    is essential for that reaction.
    for r in [model.reactions.get_by_id(s) for s in essential_reacs]:
        for g in r.genes:
            exp = r.gene_reaction_rule.replace(' or ', ' | ').replace(' and ', ' & ')
            exp = to_dnf(parse_expr(exp, {g.id: False}), force=True)
            if exp == False:
                protected_genes.add(g)
    # 3. Remove essential genes, and knockouts without impact from gko_costs
    #    NOTE(review): only gene IDs are popped here; gkos entries keyed by gene
    #    *name* are left untouched — confirm this is intended.
    for pg in protected_genes:
        if pg.id in gkos:
            gkos.pop(pg.id)
    # 4. Add all non-knockable genes to the protected list
    for g in model.genes:
        if (g.id not in gkos) and (g.name not in gkos):  # support names or ids in gkos
            protected_genes.add(g)
    # genes with kiCosts are kept
    gki_ids = [g.id for g in model.genes if (g.id in gkis) or (g.name in gkis)]  # support names or ids in gkis
    protected_genes = protected_genes.difference({model.genes.get_by_id(g) for g in gki_ids})
    protected_genes_dict = {pg.id: True for pg in protected_genes}
    # 5. Simplify gpr rules (protected genes are substituted by True) and
    #    discard rules that are always satisfied
    for r in model.reactions:
        if r.gene_reaction_rule:
            exp = r.gene_reaction_rule.replace(' or ', ' | ').replace(' and ', ' & ')
            exp = to_dnf(parse_expr(exp, protected_genes_dict), simplify=True, force=True)
            if exp == True:
                model.reactions.get_by_id(r.id).gene_reaction_rule = ''
            elif exp == False:
                logging.error('Something went wrong during gpr rule simplification.')
            else:
                model.reactions.get_by_id(r.id).gene_reaction_rule = str(exp).replace(' & ', ' and ').replace(' | ', ' or ')
    # 6. Remove obsolete genes and protected genes
    for g in model.genes[::-1]:
        if not g.reactions or g in protected_genes:
            model.genes.remove(g)
    return gkos
def extend_model_gpr(model, use_names=False):
    """Integrate GPR-rules into a metabolic model as pseudo metabolites and reactions.

    COBRA models often have gene-protein-reaction (GPR) rules associated with each
    reaction. These can be integrated into the metabolic network structure through
    pseudo reactions and metabolites without changing the metabolic flux space.
    After integration, gene-pseudoreactions can be fixed to a flux of zero to
    simulate gene knockouts. Gene pseudoreactions are referenced either by the gene
    name or the gene identifier (user selected).

    This function requires all GPR-rules to be provided in DNF (disjunctive normal
    form), e.g. ``g1 and g2 or g1 and g3``, NOT ``g1 and (g2 or g3)``. Brackets are
    allowed but not required. Reversible reactions associated with GPR-rules are
    split during integration. The model is modified in place.

    Example:
        reac_map = extend_model_gpr(model)

    Args:
        model (cobra.Model):
            A metabolic model that is an instance of the cobra.Model class containing GPR rules
            in DNF
        use_names (bool): (Default: False)
            If True, gene pseudoreactions carry the gene name as reaction identifier,
            otherwise the gene identifier. Off by default because many models do not
            provide gene names.

    Returns:
        (dict):
            Maps each original reaction id to the new reaction id(s) and directions, e.g.
            {'Reaction1': {'Reaction1': 1, 'Reaction1_reverse_a59c': -1}}
    """
    # Check if reaction names and gene names/IDs overlap. If yes, throw Error
    reac_ids = {r.id for r in model.reactions}
    if (not use_names) and any([g.id in reac_ids for g in model.genes]):
        raise Exception("GPR rule integration requires distinct identifiers for reactions and "+\
                        "genes.\nThe following identifiers seem to be both, reaction IDs and gene "+\
                        "IDs:\n"+str([g.id for g in model.genes if g.id in reac_ids])+\
                        "\nTo prevent this problem, call extend_model_gpr with the use_names=False "+\
                        "option or refer\nto gene names instead of IDs in your strain design "+\
                        "computation if they are available.\nYou may also rename the conflicting "+\
                        "genes with cobra.manipulation.modify.rename_genes(model, {'g_old': 'g_new'})")
    elif use_names and any([g.name in reac_ids for g in model.genes]):
        raise Exception("GPR rule integration requires distinct identifiers for reactions and "+\
                        "genes.\nThe following identifiers seem to be both, reaction IDs and gene "+\
                        "names:\n"+str([g.name for g in model.genes if g.name in reac_ids])+\
                        "\nTo prevent this problem, call extend_model_gpr with the use_names=False "+\
                        "option or refer \nto gene IDs instead of names in your strain design "+\
                        "computation. You may also rename\nthe conflicting genes, e.g., with "+\
                        "model.genes.get_by_id('ADK1').name = 'adk1'")
    # GLPK and Gurobi limit identifier lengths; generated ids are trimmed below this.
    MAX_NAME_LEN = 230

    def warning_name_too_long(id, p=""):
        # Warn that a generated identifier had to be trimmed.
        logging.warning(" GPR rule integration automatically generates new reactions and metabolites."+\
                        "\nOne of the generated reaction names is beyond or close to the limit of 255 "+\
                        "characters\npermitted by GLPK and Gurobi. The name of the newly generated "+\
                        "reaction or metabolite: \n "+id+",\ngenerated from reaction or metabolite:\n "+\
                        p+"\n"+"was therefore trimmed to:\n "+id[0:MAX_NAME_LEN]+".\nThis trimming is "+\
                        "usually safe, no guarantee is given. To avoid this message,\nuse the CPLEX "+\
                        "solver or consider simplifying GPR rules or gene names in your model.")

    def truncate(id):
        # Shorten an id, appending a hash suffix to keep it (likely) unique.
        return id[0:MAX_NAME_LEN - 16] + hex(abs(hash(id)))[2:]

    solver = search('(' + '|'.join(avail_solvers) + ')', model.solver.interface.__name__)[0]
    # Split reversible reactions with GPR rules into forward and backward parts
    reac_map = {}
    rev_reac = set()
    del_reac = set()
    for r in model.reactions:
        reac_map.update({r.id: {}})
        if not r.gene_reaction_rule:
            reac_map[r.id].update({r.id: 1.0})
            continue
        if r.gene_reaction_rule and r.bounds[0] < 0:
            r_rev = (r * -1)
            if r.gene_reaction_rule and r.bounds[1] > 0:
                r_rev.id = r.id + '_reverse_' + hex(hash(r))[8:]
            r_rev.lower_bound = np.max([0, r_rev.lower_bound])
            reac_map[r.id].update({r_rev.id: -1.0})
            if len(r_rev.id) > MAX_NAME_LEN and solver in {GUROBI, GLPK}:
                warning_name_too_long(r_rev.id, r.id)
                r_rev.id = truncate(r_rev.id)
            rev_reac.add(r_rev)
        if r.gene_reaction_rule and r.bounds[1] > 0:
            reac_map[r.id].update({r.id: 1.0})
            r._lower_bound = np.max([0, r._lower_bound])
        else:
            del_reac.add(r)
    model.remove_reactions(del_reac)
    model.add_reactions(rev_reac)
    # All reaction rules are provided in dnf.
    for r in model.reactions:
        if r.gene_reaction_rule:  # if reaction has a gpr rule
            dt = [s.strip() for s in r.gene_reaction_rule.split(' or ')]  # disjunct terms
            for i, p in enumerate(dt.copy()):
                ct = [s.strip() for s in p.replace('(', '').replace(')', '').split(' and ')]  # conjunct terms
                for j, g in enumerate(ct.copy()):
                    gene_met_id = 'g_' + g
                    # Check name length of new metabolite
                    if len(gene_met_id) > MAX_NAME_LEN and solver in {GUROBI, GLPK}:
                        if truncate(gene_met_id) not in [m.id for m in model.metabolites]:
                            warning_name_too_long(gene_met_id, r.id)
                        gene_met_id = truncate(gene_met_id)
                    # if gene is not in model, add gene pseudoreaction and metabolite
                    if gene_met_id not in model.metabolites.list_attr('id'):
                        model.add_metabolites(Metabolite(gene_met_id))
                        gene = model.genes.get_by_id(g)
                        w = Reaction(gene.id)
                        if use_names:  # if gene name is available and used in gki_cost and gko_cost
                            w.id = gene.name
                        # Check name length of new reaction
                        if len(w.id) > MAX_NAME_LEN and solver in {GUROBI, GLPK}:
                            warning_name_too_long(w.id, r.id)
                            w.id = truncate(w.id)
                        model.add_reactions([w])
                        w.reaction = '--> ' + gene_met_id
                        w._upper_bound = np.inf
                    ct[j] = gene_met_id
                if len(ct) > 1:
                    ct_met_id = "_and_".join(ct)
                    # Check name length of new metabolite
                    if len(ct_met_id) > MAX_NAME_LEN and solver in {GUROBI, GLPK}:
                        if truncate(ct_met_id) not in [m.id for m in model.metabolites]:
                            warning_name_too_long(ct_met_id, r.id)
                        ct_met_id = truncate(ct_met_id)
                    if ct_met_id not in model.metabolites.list_attr('id'):
                        # if conjunct term is not in model, add pseudoreaction and metabolite
                        model.add_metabolites(Metabolite(ct_met_id))
                        w = Reaction("R_" + ct_met_id)
                        # Check name length of new reaction
                        if len(w.id) > MAX_NAME_LEN and solver in {GUROBI, GLPK}:
                            warning_name_too_long(w.id, r.id)
                            w.id = truncate(w.id)
                        model.add_reactions([w])
                        w.reaction = ' + '.join(ct) + '--> ' + ct_met_id
                        w._upper_bound = np.inf
                    dt[i] = ct_met_id
                else:
                    dt[i] = gene_met_id
            if len(dt) > 1:
                dt_met_id = "_or_".join(dt)
                # Check name length of new metabolite
                if len(dt_met_id) > MAX_NAME_LEN and solver in {GUROBI, GLPK}:
                    if truncate(dt_met_id) not in [m.id for m in model.metabolites]:
                        warning_name_too_long(dt_met_id, r.id)
                    dt_met_id = truncate(dt_met_id)
                if dt_met_id not in model.metabolites.list_attr('id'):
                    model.add_metabolites(Metabolite(dt_met_id))
                    # one pseudoreaction per disjunct term (OR: any one suffices)
                    for k, d in enumerate(dt):
                        w = Reaction("R" + str(k) + "_" + dt_met_id)
                        # Check name length of new reaction
                        if len(w.id) > MAX_NAME_LEN and solver in {GUROBI, GLPK}:
                            warning_name_too_long(w.id, r.id)
                            w.id = truncate(w.id)
                        model.add_reactions([w])
                        w.reaction = d + ' --> ' + dt_met_id
                        w._upper_bound = np.inf
            else:
                dt_met_id = dt[0]
            # couple the reaction to its GPR pseudometabolite
            r.add_metabolites({model.metabolites.get_by_id(dt_met_id): -1.0})
    return reac_map
def extend_model_regulatory(model, reg_itv):
    """Extend a metabolic model to account for regulatory constraints.

    Emulates regulatory interventions in a network. These can either be added
    permanently or linked to a pseudoreaction whose bounds can be fixed to zero
    to activate the regulatory constraint.

    For instance, to introduce the constraint ``2*r_1 + 3*r_2 <= 4`` and make it
    'toggleable', one adds a pseudometabolite m and 2 reactions, r_bnd to account
    for the bound/rhs and r_ctl to control whether the intervention is active:
    dm/dt = 2*r_1 + 3*r_2 - r_bnd + r_ctl = 0, -inf <= r_bnd <= 4, -inf <= r_ctl <= inf.
    When r_ctl is fixed to zero, the constraint is enforced; otherwise it is
    non-binding. If a constraint is added permanently (cost NaN), r_ctl is omitted.
    The model is modified in place.

    Example:
        reg_itv_costs = extend_model_regulatory(model, {'1 PDH + 1 PFL <= 5': 1, '-EX_o2_e <= 2': 1.5})

    Args:
        model (cobra.Model):
            A metabolic model that is an instance of the cobra.Model class
        reg_itv (dict or list of str or str):
            Regulatory constraints to add. A string or list of strings adds
            constraints permanently; a dict {constraint_str: cost} adds them in a
            controllable manner. Constraints are parsed from strings, so ensure
            correct reaction identifiers are used. Examples:
            reg_itv = '-EX_o2_e <= 2'
            reg_itv = ['1 PDH + 1 PFL <= 5', '-EX_o2_e <= 2']
            reg_itv = {'1 PDH + 1 PFL <= 5': 1, '-EX_o2_e <= 2': 1.5}

    Returns:
        (dict):
            The cost of adding each controllable constraint, keyed by the id of the
            control pseudoreaction, e.g. {'p1_PDH_p1_PFK_le_5': 1, 'n1EX_o2_e_le_2': 1.5}
    """
    keywords = set(model.reactions.list_attr('id'))
    if '' in keywords:
        keywords.remove('')
    if not isinstance(reg_itv, dict):
        if not isinstance(reg_itv, str) and not isinstance(reg_itv, list):
            raise Exception('reg_itv must be a string, a list or a dictionary.'\
                            'See function description for details.')
        # NaN cost marks a permanent constraint (no control reaction is added)
        if isinstance(reg_itv, str):
            reg_itv = {reg_itv: np.nan}
        elif isinstance(reg_itv, list):
            reg_itv = {r: np.nan for r in reg_itv}
    regcost = {}
    for k, v in reg_itv.copy().items():
        # generate name for regulatory pseudoreaction
        try:
            constr = parse_constraints(k, keywords)[0]
        except Exception:
            raise Exception('Regulatory constraints could not be parsed. Please revise.')
        reacs_dict = constr[0]
        eqsign = constr[1]
        rhs = constr[2]
        reg_name = ''
        for l, w in reacs_dict.items():
            if w < 0:
                reg_name += 'n' + str(w) + '_' + l
            else:
                reg_name += 'p' + str(w) + '_' + l
            reg_name += '_'
        if eqsign == '=':
            reg_name += 'eq_'
        elif eqsign == '<=':
            reg_name += 'le_'
        elif eqsign == '>=':
            reg_name += 'ge_'
        reg_name += str(rhs).replace('-', 'n').replace('.', 'p')
        reg_itv.pop(k)
        reg_itv.update({reg_name: {'str': k, 'cost': v}})
        reg_pseudomet_name = 'met_' + reg_name
        # add pseudometabolite
        m = Metabolite(reg_pseudomet_name)
        model.add_metabolites(m)
        # add pseudometabolite to stoichiometries
        for l, w in reacs_dict.items():
            r = model.reactions.get_by_id(l)
            r.add_metabolites({m: w})
        # add pseudoreaction that defines the bound
        r_bnd = Reaction("bnd_" + reg_name)
        model.add_reactions([r_bnd])
        r_bnd.reaction = reg_pseudomet_name + ' --> '
        if eqsign == '=':
            # equality: fix both bounds to the rhs
            r_bnd._upper_bound = rhs
            r_bnd._lower_bound = rhs
        elif eqsign == '<=':
            r_bnd._lower_bound = -np.inf
            r_bnd._upper_bound = rhs
        elif eqsign == '>=':
            r_bnd._upper_bound = np.inf
            r_bnd._lower_bound = rhs
        # add knockable pseudoreaction and add it to the kocost list
        if not np.isnan(v):
            r_ctl = Reaction(reg_name)
            model.add_reactions([r_ctl])
            r_ctl.reaction = '--> ' + reg_pseudomet_name
            r_ctl._upper_bound = np.inf
            r_ctl._lower_bound = -np.inf
            regcost.update({reg_name: v})
    return regcost
def compress_model(model, no_par_compress_reacs=None):
    """Compress a metabolic model with a number of different techniques.

    Removes blocked reactions and conservation relations, then alternates between
    lumping dependent (compress_model_efmtool) and parallel
    (compress_model_parallel) reactions until no further reduction is achieved.
    The model is modified in place.

    Example:
        compression_map = compress_model(model, {'EX_etoh_e', 'PFL'})

    Args:
        model (cobra.Model):
            A metabolic model that is an instance of the cobra.Model class
        no_par_compress_reacs (set or list of str): (Default: None)
            Reaction identifiers that must not be lumped with other parallel
            reactions. Critical for strain design computations. (There is currently
            no way to exempt reactions from the efmtool/dependency compression.)

    Returns:
        (list of dict):
            A list of compression maps, one per compression step. Each map associates
            a compressed reaction id with the original reaction ids and the rational
            factor with which each was lumped; it carries all information needed to
            reverse the compression steps successively.
    """
    # avoid a mutable default argument: the set is mutated below
    if no_par_compress_reacs is None:
        no_par_compress_reacs = set()
    # Remove conservation relations.
    logging.info(' Removing blocked reactions.')
    remove_blocked_reactions(model)
    logging.info(' Translating stoichiometric coefficients to rationals.')
    stoichmat_coeff2rational(model)
    logging.info(' Removing conservation relations.')
    remove_conservation_relations(model)
    parallel = False  # alternate: efmtool (dependency) pass, then parallel pass
    run = 1
    cmp_mapReac = []
    numr = len(model.reactions)
    while True:
        if not parallel:
            logging.info(' Compression ' + str(run) + ': Applying compression from EFM-tool module.')
            reac_map_exp = compress_model_efmtool(model)
            # carry the "do not lump in parallel" marks over to the lumped reactions
            for new_reac, old_reac_val in reac_map_exp.items():
                old_reacs_no_compress = [r for r in no_par_compress_reacs if r in old_reac_val]
                if old_reacs_no_compress:
                    for r in old_reacs_no_compress:
                        no_par_compress_reacs.remove(r)
                    no_par_compress_reacs.add(new_reac)
        else:
            logging.info(' Compression ' + str(run) + ': Lumping parallel reactions.')
            reac_map_exp = compress_model_parallel(model, no_par_compress_reacs)
        remove_conservation_relations(model)
        if numr > len(reac_map_exp):
            logging.info(' Reduced to ' + str(len(reac_map_exp)) + ' reactions.')
            # store the information for decompression:
            # reac_map_exp: {cmp_rid: {orig_rid1: factor1, orig_rid2: factor2}},
            # parallel: linear (False) or parallel (True) compression
            cmp_mapReac += [{
                "reac_map_exp": reac_map_exp,
                "parallel": parallel,
            }]
            parallel = not parallel
            run += 1
            numr = len(reac_map_exp)
        else:
            logging.info(' Last step could not reduce size further (' + str(numr) + ' reactions).')
            logging.info(' Network compression completed. (' + str(run - 1) + ' compression iterations)')
            logging.info(' Translating stoichiometric coefficients back to float.')
            break
    stoichmat_coeff2float(model)
    return cmp_mapReac
def compress_modules(sd_modules, cmp_mapReac):
    """Compress strain design modules to match with a compressed model.

    When a strain design task was specified with modules and the original model
    was compressed, the modules (SDModule objects) must be refitted to the
    compressed model. Reaction coefficients of lumped reactions are combined
    accordingly. The modules are modified in place.

    Example:
        sd_modules = compress_modules(sd_modules, cmp_mapReac)

    Args:
        sd_modules (list of SDModule):
            A list of strain design modules
        cmp_mapReac (list of dicts):
            Compression map obtained from cmp_mapReac = compress_model(model)

    Returns:
        (list of SDModule):
            A list of strain design modules for the compressed network
    """
    sd_modules = modules_coeff2rational(sd_modules)
    for cmp in cmp_mapReac:
        reac_map_exp = cmp["reac_map_exp"]
        parallel = cmp["parallel"]
        # only dependency (non-parallel) lumping changes linear coefficients
        if not parallel:
            for new_reac, old_reac_val in reac_map_exp.items():
                for i, m in enumerate(sd_modules):
                    for p in [CONSTRAINTS, INNER_OBJECTIVE, OUTER_OBJECTIVE, PROD_ID, MIN_GCP]:
                        if p in m and m[p] is not None:
                            param = m[p]
                            if p == CONSTRAINTS:
                                for j, c in enumerate(m[p]):
                                    if np.any([k in old_reac_val for k in c[0].keys()]):
                                        lumped_reacs = [k for k in c[0].keys() if k in old_reac_val]
                                        # replace lumped reactions with the compressed one, summing scaled coefficients
                                        c[0][new_reac] = np.sum([c[0].pop(k) * old_reac_val[k] for k in lumped_reacs])
                            if p in [INNER_OBJECTIVE, OUTER_OBJECTIVE, PROD_ID]:
                                if np.any([k in old_reac_val for k in param.keys()]):
                                    lumped_reacs = [k for k in param.keys() if k in old_reac_val]
                                    m[p][new_reac] = np.sum([param.pop(k) * old_reac_val[k] for k in lumped_reacs if k in old_reac_val])
    sd_modules = modules_coeff2float(sd_modules)
    return sd_modules
def compress_ki_ko_cost(kocost, kicost, cmp_mapReac):
    """Compress knockout/addition cost vectors to match with a compressed model.

    When knockout/addition cost vectors have been specified (as dicts) and the
    original metabolic model was compressed, the cost vectors must be updated so
    that resulting costs are calculated correctly:

    - KO cost of a lump: sequential (dependent) -> cheapest KO; parallel -> sum of KOs.
    - KI cost of a lump: sequential -> sum of KIs; parallel -> cheapest KI.
    - Mixed lumps: a KI among sequentially lumped reactions makes the lump a KI
      candidate; a KO among parallel lumped reactions makes the lump a KO candidate.

    Example:
        kocost, kicost, cmp_mapReac = compress_ki_ko_cost(kocost, kicost, cmp_mapReac)

    Args:
        kocost, kicost (dict):
            Knockout and addition cost vectors
        cmp_mapReac (list of dicts):
            Compression map obtained from cmp_mapReac = compress_model(model).
            Updated in place with the pre-compression costs of each step.

    Returns:
        (Tuple):
            Updated vectors of KO costs and KI costs and an updated compression map that
            contains information on how to expand strain designs and correctly distinguish
            between knockouts and additions.
    """
    for cmp in cmp_mapReac:
        reac_map_exp = cmp["reac_map_exp"]
        parallel = cmp["parallel"]
        # remember the costs valid *before* this compression step (needed for expansion)
        cmp.update({KOCOST: kocost, KICOST: kicost})
        if kocost:
            ko_cost_new = {}
            for r in reac_map_exp:
                if np.any([s in kocost for s in reac_map_exp[r]]):
                    if not parallel and not np.any([s in kicost for s in reac_map_exp[r]]):
                        ko_cost_new[r] = np.min([kocost[s] for s in reac_map_exp[r] if s in kocost])
                    elif parallel:
                        ko_cost_new[r] = np.sum([kocost[s] for s in reac_map_exp[r] if s in kocost])
            kocost = ko_cost_new
        if kicost:
            ki_cost_new = {}
            for r in reac_map_exp:
                if np.any([s in kicost for s in reac_map_exp[r]]):
                    if not parallel:
                        ki_cost_new[r] = np.sum([kicost[s] for s in reac_map_exp[r] if s in kicost])
                    elif parallel and not np.any([s in kocost for s in reac_map_exp[r]]):
                        ki_cost_new[r] = np.min([kicost[s] for s in reac_map_exp[r] if s in kicost])
            kicost = ki_cost_new
    return kocost, kicost, cmp_mapReac
def expand_sd(sd, cmp_mapReac):
    """Expand computed strain designs from a compressed to a full model.

    Applies the compression steps in reverse order. For each lumped reaction that
    occurs in a solution, the intervention is translated back to the original
    reactions: a KO of a parallel lump requires knocking out all original KO
    candidates (one expanded solution); a KO of a sequential lump can be realized
    by any single original KO (one solution per candidate). KIs behave dually.

    Example:
        expanded_sd = expand_sd(compressed_sds, cmp_mapReac)

    Args:
        sd (SDSolutions):
            Solutions of a strain design computation that refer to a compressed model
        cmp_mapReac (list of dicts):
            Compression map obtained from cmp_mapReac = compress_model(model) and updated
            with kocost, kicost, cmp_mapReac = compress_ki_ko_cost(kocost, kicost, cmp_mapReac)

    Returns:
        (SDSolutions):
            Strain design solutions that refer to the uncompressed model
    """
    # expand mcs by applying the compression steps in the reverse order
    cmp_map = cmp_mapReac[::-1]
    for exp in cmp_map:
        reac_map_exp = exp["reac_map_exp"]  # expansion map
        ko_cost = exp[KOCOST]
        ki_cost = exp[KICOST]
        par_reac_cmp = exp["parallel"]  # if parallel or sequential reactions were lumped
        for r_cmp, r_orig in reac_map_exp.items():
            if len(r_orig) > 1:
                for m in sd.copy():
                    if r_cmp in m:
                        val = m[r_cmp]
                        del m[r_cmp]
                        if val < 0:  # case: KO
                            if par_reac_cmp:
                                # parallel lump: all KO candidates must be knocked out
                                new_m = m.copy()
                                for d in r_orig:
                                    if d in ko_cost:
                                        new_m[d] = val
                                sd += [new_m]
                            else:
                                # sequential lump: any single KO suffices
                                for d in r_orig:
                                    if d in ko_cost:
                                        new_m = m.copy()
                                        new_m[d] = val
                                        sd += [new_m]
                        elif val > 0:  # case: KI
                            if par_reac_cmp:
                                # parallel lump: any single KI suffices
                                for d in r_orig:
                                    if d in ki_cost:
                                        new_m = m.copy()
                                        new_m[d] = val
                                        # other reactions do not need to be knocked in
                                        for f in [e for e in r_orig if (e in ki_cost) and e != d]:
                                            new_m[f] = 0.0
                                        sd += [new_m]
                            else:
                                # sequential lump: all KI candidates must be added
                                new_m = m.copy()
                                for d in r_orig:
                                    if d in ki_cost:
                                        new_m[d] = val
                                sd += [new_m]
                        elif val == 0:  # case: KI that was not introduced
                            new_m = m.copy()  # assume that none of the expanded
                            for d in r_orig:  # reactions are inserted, neither
                                if d in ki_cost:  # parallel, nor sequential
                                    new_m[d] = val
                            sd += [new_m]
                        sd.remove(m)
    return sd
def filter_sd_maxcost(sd, max_cost, kocost, kicost):
    """Filter out strain designs that exceed the maximum allowed intervention costs.

    Each strain design is a dict {reac_id: val} where val < 0 marks a knockout
    (priced by kocost), val > 0 an addition (priced by kicost) and val == 0 a
    free/not-introduced addition (cost 0). Designs within the budget are returned
    sorted by ascending total cost. If max_cost is falsy, sd is returned unchanged.

    Args:
        sd (SDSolutions):
            Strain design solutions (list of dicts)
        max_cost (float):
            Maximum allowed total intervention cost (None/0 disables filtering)
        kocost, kicost (dict):
            Knockout and addition cost vectors

    Returns:
        (SDSolutions):
            Strain design solutions complying with the intervention costs limit
    """
    # eliminate mcs that are too expensive
    if max_cost:
        costs = [np.sum([kocost[k] if v < 0 else (kicost[k] if v > 0 else 0) for k, v in m.items()]) for m in sd]
        # Pair each design with ITS OWN cost before filtering. (Filtering sd first
        # and then zipping against the unfiltered cost list would misalign designs
        # and costs and produce a wrong sort order.)
        kept = [(m, c) for m, c in zip(sd, costs) if c <= max_cost + 1e-8]
        # sort strain designs by intervention costs
        kept.sort(key=lambda mc: mc[1])
        sd = [m for m, _ in kept]
    return sd
# compression function (mostly copied from efmtool)
def compress_model_parallel(model, protected_rxns=None):
    """Compress model by lumping parallel reactions.

    Reactions with identical (scaled) stoichiometry, equivalent reversibility
    pattern and homogeneous bounds are merged into one. Reactions with
    inhomogeneous bounds and protected reactions are excluded from lumping.
    The model is modified in place.

    Example:
        cmp_mapReac = compress_model_parallel(model)

    Args:
        model (cobra.Model):
            A metabolic model that is an instance of the cobra.Model class
        protected_rxns (set or list of str): (Default: None)
            Reaction identifiers that must not be lumped.

    Returns:
        (dict):
            Lumping information, e.g.
            {'reaction_lumped1': {'reaction_orig1': 1, 'reaction_orig2': 1}, ...}
    """
    if protected_rxns is None:
        protected_rxns = set()
    old_num_reac = len(model.reactions)
    old_objective = [r.objective_coefficient for r in model.reactions]
    old_reac_ids = [r.id for r in model.reactions]
    stoichmat_T = create_stoichiometric_matrix(model, 'lil').transpose()
    # scale each row by its first nonzero coefficient so that parallel reactions
    # with proportional stoichiometries become identical rows
    factor = [d[0] if d else 1.0 for d in stoichmat_T.data]
    A = (sparse.diags(factor) @ stoichmat_T)
    lb = [r.lower_bound for r in model.reactions]
    ub = [r.upper_bound for r in model.reactions]
    # extra columns encode direction (fwd/rev feasibility) and inhomogeneous bounds,
    # so that only reactions agreeing in these properties can be lumped
    fwd = sparse.lil_matrix([1. if (np.isinf(u) and f > 0 or np.isinf(l) and f < 0) else 0. for f, l, u in zip(factor, lb, ub)]).transpose()
    rev = sparse.lil_matrix([1. if (np.isinf(l) and f > 0 or np.isinf(u) and f < 0) else 0. for f, l, u in zip(factor, lb, ub)]).transpose()
    inh = sparse.lil_matrix([i+1 if not ((np.isinf(ub[i]) or ub[i] == 0) and (np.isinf(lb[i]) or lb[i] == 0)) \
                             else 0 for i in range(len(model.reactions))]).transpose()
    A = sparse.hstack((A, fwd, rev, inh), 'csr')
    # find equivalent/parallel reactions
    subset_list = []
    prev_found = set()  # set for O(1) membership tests
    protected = [True if r.id in protected_rxns else False for r in model.reactions]
    # hash each row once; identical rows <=> identical hashes (up to collisions)
    hashes = [hash((tuple(A[i].indices), tuple(A[i].data))) for i in range(A.shape[0])]
    for i in range(A.shape[0]):
        if i in prev_found:  # if reaction was already found to be identical to another one, skip.
            continue
        if protected[i]:  # if protected, add 1:1 relationship, skip.
            subset_list += [[i]]
            continue
        subset_i = [i]
        for j in range(i + 1, A.shape[0]):  # otherwise, identify parallel reactions
            if not protected[j] and j not in prev_found:
                if hashes[i] == hashes[j]:
                    subset_i += [j]
                    prev_found.add(j)
        if subset_i:
            subset_list += [subset_i]
    # lump parallel reactions (delete redundant)
    del_rxns = [False] * len(model.reactions)
    for rxn_idx in subset_list:
        for i in range(1, len(rxn_idx)):
            if len(model.reactions[rxn_idx[0]].id) + len(
                    model.reactions[rxn_idx[i]].id) < 220 and model.reactions[rxn_idx[0]].id[-3:] != '...':
                model.reactions[rxn_idx[0]].id += '*' + model.reactions[rxn_idx[i]].id  # combine names
            elif not model.reactions[rxn_idx[0]].id[-3:] == '...':
                model.reactions[rxn_idx[0]].id += '...'
            del_rxns[rxn_idx[i]] = True
    del_rxns = np.where(del_rxns)[0]
    for i in range(len(del_rxns) - 1, -1, -1):  # delete in reversed index order to keep indices valid
        model.reactions[del_rxns[i]].remove_from_model(remove_orphans=True)
    # create compression map
    rational_map = {}
    subT = np.zeros((old_num_reac, len(model.reactions)))
    for i in range(subT.shape[1]):
        for j in subset_list[i]:
            subT[j, i] = 1
        # associate the new reaction with its original reactions and scaling factors
        rational_map.update({model.reactions[i].id: {old_reac_ids[j]: One() for j in subset_list[i]}})
    new_objective = old_objective @ subT
    for r, c in zip(model.reactions, new_objective):
        r.objective_coefficient = c
    return rational_map
def remove_blocked_reactions(model) -> List:
    """Remove blocked reactions (both bounds zero) from a network.

    Args:
        model (cobra.Model): model to prune in place.

    Returns:
        (List): the removed reaction objects.
    """
    blocked_reactions = [reac for reac in model.reactions if reac.bounds == (0, 0)]
    model.remove_reactions(blocked_reactions)
    return blocked_reactions
def remove_ext_mets(model):
    """Remove (unbalanced) external metabolites from the compartment External_Species.

    Also removes reactions that are left without any metabolites afterwards.
    The model is modified in place.
    """
    external_mets = [i for i, cpts in zip(model.metabolites, model.metabolites.list_attr("compartment")) if cpts == 'External_Species']
    model.remove_metabolites(external_mets)
    # drop reactions whose stoichiometry column became empty
    stoich_mat = create_stoichiometric_matrix(model)
    obsolete_reacs = [reac for reac, b_rempty in zip(model.reactions, np.any(stoich_mat, 0)) if not b_rempty]
    model.remove_reactions(obsolete_reacs)
def remove_conservation_relations(model):
    """Remove conservation relations in a model.

    Keeps only a basic (linearly independent) set of metabolite rows, which
    reduces the number of metabolites while maintaining the original flux space.
    This is a compression technique. The model is modified in place.
    """
    stoich_mat = create_stoichiometric_matrix(model, array_type='lil')
    # metabolites whose rows are linearly independent (exact rational arithmetic)
    basic_metabolites = efm.basic_columns_rat(stoich_mat.transpose().toarray(), tolerance=0)
    dependent_metabolites = [model.metabolites[i].id for i in set(range(len(model.metabolites))) - set(basic_metabolites)]
    for m in dependent_metabolites:
        model.metabolites.get_by_id(m).remove_from_model()
# replace all stoichiometric coefficients with rationals.
def stoichmat_coeff2rational(model):
    """Convert coefficients of the stoichiometric matrix to rational numbers.

    Integers become sympy.Rational directly; floats are converted with
    sympy.nsimplify. Raises TypeError for coefficients that are neither
    int/float nor already Rational. Only the model-side coefficients are
    changed, not the solver's.
    """
    num_reac = len(model.reactions)
    for i in range(num_reac):
        for k, v in model.reactions[i]._metabolites.items():
            if isinstance(v, float) or isinstance(v, int):
                if isinstance(v, int):
                    v = Rational(v)  # for simplicity and actually slightly faster (?)
                else:
                    rational_conversion = 'base10'  # This was formerly in the function parameters
                    v = nsimplify(v, rational=True, rational_conversion=rational_conversion)
                model.reactions[i]._metabolites[k] = v  # only changes coefficient in the model, not in the solver
            elif not isinstance(v, Rational):
                raise TypeError
# replace all stoichiometric coefficients with ints and floats
def stoichmat_coeff2float(model):
    """Convert coefficients of the stoichiometric matrix back to floats.

    Raises Exception for coefficients that are not int, float or sympy Rational.
    """
    num_reac = len(model.reactions)
    for i in range(num_reac):
        for k, v in model.reactions[i]._metabolites.items():
            if isinstance(v, float) or isinstance(v, int) or isinstance(v, Rational):
                model.reactions[i]._metabolites[k] = float(v)
            else:
                raise Exception('unknown data type')
def modules_coeff2rational(sd_modules):
    """Convert coefficients occurring in SDModule objects to rational numbers.

    Walks constraints, objectives and product-id entries of each module and
    applies sympy.nsimplify in place.
    """
    for i, module in enumerate(sd_modules):
        for param in [CONSTRAINTS, INNER_OBJECTIVE, OUTER_OBJECTIVE, PROD_ID]:
            if param in module and module[param] is not None:
                if param == CONSTRAINTS:
                    for constr in module[CONSTRAINTS]:
                        for reac in constr[0].keys():
                            constr[0][reac] = nsimplify(constr[0][reac])
                if param in [INNER_OBJECTIVE, OUTER_OBJECTIVE, PROD_ID]:
                    for reac in module[param].keys():
                        module[param][reac] = nsimplify(module[param][reac])
    return sd_modules
def modules_coeff2float(sd_modules):
    """Convert coefficients occurring in SDModule objects to floats.

    Inverse of modules_coeff2rational; modifies the modules in place.
    """
    for i, module in enumerate(sd_modules):
        for param in [CONSTRAINTS, INNER_OBJECTIVE, OUTER_OBJECTIVE, PROD_ID]:
            if param in module and module[param] is not None:
                if param == CONSTRAINTS:
                    for constr in module[CONSTRAINTS]:
                        for reac in constr[0].keys():
                            constr[0][reac] = float(constr[0][reac])
                if param in [INNER_OBJECTIVE, OUTER_OBJECTIVE, PROD_ID]:
                    for reac in module[param].keys():
                        module[param][reac] = float(module[param][reac])
    return sd_modules
def remove_dummy_bounds(model):
    """Replace COBRA standard bounds with +/-inf.

    Retrieve the standard bounds from the COBRApy Configuration and replace model
    bounds of the same (or larger) magnitude with +/-inf. The model is modified
    in place.
    """
    cobra_conf = Configuration()
    bound_thres = max((abs(cobra_conf.lower_bound), abs(cobra_conf.upper_bound)))
    if any([any([abs(b) >= bound_thres for b in r.bounds]) for r in model.reactions]):
        logging.warning(' Removing reaction bounds when larger than the cobra-threshold of ' + str(round(bound_thres)) + '.')
        for i in range(len(model.reactions)):
            if model.reactions[i].lower_bound <= -bound_thres:
                model.reactions[i].lower_bound = -np.inf
            if model.reactions[i].upper_bound >= bound_thres:
                model.reactions[i].upper_bound = np.inf
def bound_blocked_or_irrevers_fva(model, solver=None):
    """Use FVA to determine flux ranges and update the model bounds accordingly.

    If the flux range of a reaction is narrower than its bounds, those bounds are
    non-binding (other reactions constrain the flux) and are relaxed to +/-inf.
    If FVA shows a flux bound of zero, the corresponding model bound is tightened
    to zero to reduce model complexity. The model is modified in place.

    Args:
        model (cobra.Model): model to update in place.
        solver (str): (Default: None) solver name passed to select_solver.
    """
    # FVAs to identify blocked and irreversible reactions, as well as non-bounding bounds
    flux_limits = fva(model)
    if select_solver(solver) in [SCIP, GLPK]:
        tol = 1e-10  # use tolerance for tightening problem bounds
    else:
        tol = 0.0
    for (reac_id, limits) in flux_limits.iterrows():
        r = model.reactions.get_by_id(reac_id)
        # modify _lower_bound and _upper_bound to make changes permanent
        if r.lower_bound < 0.0 and limits.minimum - tol > r.lower_bound:
            # lower bound is non-binding -> relax; if flux is non-negative, fix at 0
            r._lower_bound = -np.inf
            if limits.minimum >= tol:
                r._lower_bound = max([0.0, r._lower_bound])
        if r.upper_bound > 0.0 and limits.maximum + tol < r.upper_bound:
            # upper bound is non-binding -> relax; if flux is non-positive, fix at 0
            r._upper_bound = np.inf
            if limits.maximum <= -tol:
                r._upper_bound = min([0.0, r._upper_bound])