#!/usr/bin/env python3
#
# Copyright 2022 Max Planck Insitute Magdeburg
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
"""A collection of functions for the LP-based analysis of metabolic networks"""
from cobra.core import Solution
from cobra.util import create_stoichiometric_matrix
from cobra import Configuration
from scipy import sparse
from scipy.spatial import ConvexHull
from math import log2
from straindesign import MILP_LP, parse_constraints, parse_linexpr, lineqlist2mat, linexpr2dict, \
linexprdict2mat, SDPool, IndicatorConstraints, avail_solvers
from re import search
from straindesign.names import *
from typing import Dict, Tuple
from pandas import DataFrame
from numpy import floor, sign, mod, nan, isnan, unique, inf, isinf, full, linspace, \
prod, array, mean, flip, ceil, floor, arctan2
from contextlib import redirect_stdout, redirect_stderr
from io import StringIO
import matplotlib.pyplot as plt
from matplotlib import use as set_matplotlib_backend
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
import logging
from straindesign.parse_constr import linexpr2mat, linexprdict2str
[docs]
def select_solver(solver=None, model=None) -> str:
"""Select a solver for subsequent MILP/LP computations
This function will determine the solver to be used for subsequend MILP/LP computations. If no
argument is provided, this function will try to determine the currently selected solver from the
COBRA configuration. If unavailable, the solver will be inferred from the packages available at
package initialization and one of the solvers will be picked and retured in the prioritized
order: 'glpk', 'cplex', 'gurobi', 'scip'
One may provide a solver or a model manually. This function then checks if the selected solver
is available, or else, if the solver indicated in the model is available. If yes, this function
returns the name of the solver as a str. If both arguments are specified, the function prefers
'solver' over 'model'.
Example:
solver = select_solver('cplex')
Args:
solver (optional (str)):
A user preferred solver, that should be checked for availability: 'glpk', 'cplex',
'gurobi' or 'scip'.
model (optional (cobra.Model)):
A metabolic model that is an instance of the cobra.Model class. The function will try to
dertermine the selected solver by accessing the field model.solver.
Returns:
(str):
The selected solver name as a str (one of the following: 'glpk', 'cplex', 'gurobi', 'scip').
"""
# first try to use selected solver
if solver:
if solver in avail_solvers:
return solver
else:
logging.warning('Selected solver ' + solver + ' not available. Using ' + list(avail_solvers)[0] + " instead.")
try:
# if no solver was defined, use solver specified in model
if hasattr(model, 'solver') and hasattr(model.solver, 'interface'):
solver = search('(' + '|'.join(avail_solvers) + ')', model.solver.interface.__name__)
if solver is not None:
return solver[0]
else:
logging.warning('Solver specified in model (' + model.solver.interface.__name__ + ') unavailable')
# if no solver specified in model, use solver from cobra configuration
cobra_conf = Configuration()
if hasattr(cobra_conf, 'solver'):
solver = search('(' + '|'.join(avail_solvers) + ')', cobra_conf.solver.__name__)
if solver is not None:
return solver[0]
else:
logging.warning('Solver specified in cobra config (' + cobra_conf.solver.__name__ + ') unavailable')
except:
pass
# if no solver is specified in cobra, fall back to list of available solvers and return the
# first one available.
return list(avail_solvers)[0]
[docs]
def idx2c(i, prev) -> 'list':
"""Helper function for parallel FVA
Builds the objective function for minimizing or maximizing the flux through the reaction
with the index floor(i / 2). If i is even, there is a maximization.
Args:
i (float):
An index between 0 and 2*num_reacs.
prev (optional (str)):
Index of the previously optimized reaction.
Returns:
(list):
An optimization vector.
"""
col = int(floor(i / 2))
sig = sign(mod(i, 2) - 0.5)
C = [[col, sig], [prev, 0.0]]
C_idx = [C[i][0] for i in range(len(C))]
C_idx = unique([C_idx.index(C_idx[i]) for i in range(len(C_idx))])
C = [C[i] for i in C_idx]
return C
def _fva_worker_cleanup():
"""Dispose the global LP and solver environment on worker exit."""
global lp_glob
try:
if lp_glob is not None and hasattr(lp_glob, 'solver'):
with redirect_stdout(StringIO()), redirect_stderr(StringIO()):
if lp_glob.solver == 'gurobi':
lp_glob.backend.dispose()
import gurobipy as gp
gp.disposeDefaultEnv()
elif lp_glob.solver == 'cplex':
lp_glob.backend.end()
lp_glob = None
except Exception:
pass
[docs]
def fva_worker_init(A_ineq, b_ineq, A_eq, b_eq, lb, ub, solver):
"""Helper function for parallel FVA
Initialize the LP that will be solved iteratively. Is executed on workers, not on main thread.
Args:
A_ineq, b_ineq, A_eq, b_eq, lb, ub:
The LP.
solver (str):
Solver to be used.
"""
global lp_glob
# redirect output to empty stream. Perhaps avoids some multithreading issues
with redirect_stdout(StringIO()), redirect_stderr(StringIO()):
lp_glob = MILP_LP(A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq, lb=lb, ub=ub, solver=solver)
if lp_glob.solver == 'cplex':
lp_glob.backend.parameters.threads.set(1)
#lp_glob.backend.parameters.lpmethod.set(1)
elif lp_glob.solver == 'gurobi':
lp_glob.backend.params.Threads = 1
lp_glob.prev = 0
# Register cleanup to properly release solver resources on worker exit
if solver in ('gurobi', 'cplex'):
import atexit
atexit.register(_fva_worker_cleanup)
[docs]
def fva_worker_compute(i) -> Tuple[int, float]:
"""Helper function for parallel FVA
Run a single LP as a step of FVA. Is executed on workers, not on main thread.
Args:
i (int):
Index of the computation step.
"""
global lp_glob
with redirect_stdout(StringIO()), redirect_stderr(StringIO()):
C = idx2c(i, lp_glob.prev)
if lp_glob.solver in ['cplex', 'gurobi']:
lp_glob.backend.set_objective_idx(C)
min_cx = lp_glob.backend.slim_solve()
else:
lp_glob.set_objective_idx(C)
min_cx = lp_glob.slim_solve()
lp_glob.prev = C[0][0]
return i, min_cx
# GLPK needs a workaround, because problems cannot be solved in a different thread
# which apparently happens with the multiprocess
[docs]
def fva_worker_init_glpk(A_ineq, b_ineq, A_eq, b_eq, lb, ub):
"""Helper function for parallel FVA
Initialize the LP for GLPK that will be solved iteratively. Is executed on workers, not on main thread.
Args:
A_ineq, b_ineq, A_eq, b_eq, lb, ub:
The LP.
"""
global lp_glob
lp_glob = {}
lp_glob['A_ineq'] = A_ineq
lp_glob['b_ineq'] = b_ineq
lp_glob['A_eq'] = A_eq
lp_glob['b_eq'] = b_eq
lp_glob['lb'] = lb
lp_glob['ub'] = ub
[docs]
def fva_worker_compute_glpk(i) -> Tuple[int, float]:
"""Helper function for parallel FVA
Run a single LP for GLPK as a step of FVA. Is executed on workers, not on main thread.
Args:
i (int):
Index of the computation step.
"""
global lp_glob
with redirect_stdout(StringIO()), redirect_stderr(StringIO()):
lp_i = MILP_LP(A_ineq=lp_glob['A_ineq'],
b_ineq=lp_glob['b_ineq'],
A_eq=lp_glob['A_eq'],
b_eq=lp_glob['b_eq'],
lb=lp_glob['lb'],
ub=lp_glob['ub'],
solver=GLPK)
col = int(floor(i / 2))
sig = sign(mod(i, 2) - 0.5)
lp_i.set_objective_idx([[col, sig]])
min_cx = lp_i.slim_solve()
return i, min_cx
[docs]
def fva(model, **kwargs) -> DataFrame:
"""Flux Variability Analysis (FVA)
Flux Variability Analysis determines the global flux ranges of reactions by minimizing and
maximizing the flux through all reactions of a given metabolic network. This FVA function
additionally allows the user to narrow down the flux states with additional constraints.
Uses an accelerated two-phase approach: global scan LPs with dual simplex
warm-start resolve ~50% of bounds cheaply, then individual LPs for the rest.
Large models (>= 200 reactions) are automatically compressed via coupled
reaction lumping. Multiprocessing is used when >= 1000 reactions and
cobra.Configuration().processes > 1.
Example:
flux_ranges = fva(model, constraints='EX_o2_e=0', solver='gurobi')
Args:
model (cobra.Model):
A metabolic model that is an instance of the cobra.Model class.
solver (optional (str)):
The solver that should be used for FVA.
constraints (optional (str) or (list of str) or (list of [dict,str,float])): (Default: '')
List of *linear* constraints to be applied on top of the model: signs + or -, scalar
factors for reaction rates, inclusive (in)equalities and a float value on the right hand
side. The parsing of the constraints input allows for some flexibility. Correct (and
identical) inputs are, for instance:
constraints='-EX_o2_e <= 5, ATPM = 20' or
constraints=['-EX_o2_e <= 5', 'ATPM = 20'] or
constraints=[[{'EX_o2_e':-1},'<=',5], [{'ATPM':1},'=',20]]
Returns:
(pandas.DataFrame):
A data frame containing the minimum and maximum attainable flux rates for all reactions.
"""
from straindesign.speedy_fva import speedy_fva
return speedy_fva(model, **kwargs)
[docs]
def fva_legacy(model, **kwargs) -> DataFrame:
"""Legacy FVA implementation (brute-force 2*n LPs, no scan/compression).
Kept as fallback for debugging. Use fva() for production.
"""
reaction_ids = model.reactions.list_attr("id")
numr = len(model.reactions)
if CONSTRAINTS in kwargs and kwargs[CONSTRAINTS]:
from straindesign.networktools import resolve_gene_constraints
kwargs[CONSTRAINTS] = resolve_gene_constraints(model, kwargs[CONSTRAINTS])
kwargs[CONSTRAINTS] = parse_constraints(kwargs[CONSTRAINTS], reaction_ids)
A_ineq, b_ineq, A_eq, b_eq = lineqlist2mat(kwargs[CONSTRAINTS], reaction_ids)
if SOLVER not in kwargs:
kwargs[SOLVER] = None
solver = select_solver(kwargs[SOLVER], model)
A_eq_base = sparse.csr_matrix(create_stoichiometric_matrix(model))
b_eq_base = [0] * len(model.metabolites)
if 'A_eq' in locals():
A_eq = sparse.vstack((A_eq_base, A_eq))
b_eq = b_eq_base + b_eq
else:
A_eq = A_eq_base
b_eq = b_eq_base
if 'A_ineq' not in locals():
A_ineq = sparse.csr_matrix((0, numr))
b_ineq = []
lb = [v.lower_bound for v in model.reactions]
ub = [v.upper_bound for v in model.reactions]
lp = MILP_LP(A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq, lb=lb, ub=ub, solver=solver)
_, _, status = lp.solve()
if status not in [OPTIMAL, UNBOUNDED]:
logging.info('FVA problem not feasible.')
return DataFrame(
{"minimum": [nan] * numr, "maximum": [nan] * numr},
index=reaction_ids,
)
processes = Configuration().processes
processes = min(processes, numr)
x = [nan] * 2 * numr
if processes > 1 and numr > 300:
with SDPool(processes, initializer=fva_worker_init,
initargs=(A_ineq, b_ineq, A_eq, b_eq, lb, ub, solver)) as pool:
chunk_size = len(reaction_ids) // processes
for i, value in pool.imap_unordered(fva_worker_compute, range(2 * numr),
chunksize=chunk_size):
x[i] = value
elif processes > 1 and numr > 500 and solver == GLPK:
with SDPool(processes, initializer=fva_worker_init_glpk,
initargs=(A_ineq, b_ineq, A_eq, b_eq, lb, ub)) as pool:
chunk_size = len(reaction_ids) // processes
for i, value in pool.imap_unordered(fva_worker_compute_glpk, range(2 * numr),
chunksize=chunk_size):
x[i] = value
else:
fva_worker_init(A_ineq, b_ineq, A_eq, b_eq, lb, ub, solver)
for i in range(2 * numr):
_, x[i] = fva_worker_compute(i)
# NaN retry
nan_remaining = [i for i in range(2 * numr) if isnan(x[i])]
if nan_remaining:
logging.warning(f'FVA: {len(nan_remaining)}/{2*numr} LP solves returned NaN, re-solving.')
_BATCH = 50
while nan_remaining:
lp_retry = MILP_LP(A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq,
lb=lb, ub=ub, solver=solver)
prev_retry = 0
for i in nan_remaining[:_BATCH]:
C = idx2c(i, prev_retry)
if solver in ('cplex', 'gurobi'):
lp_retry.backend.set_objective_idx(C)
x[i] = lp_retry.backend.slim_solve()
else:
lp_retry.set_objective_idx(C)
x[i] = lp_retry.slim_solve()
prev_retry = C[0][0]
old_count = len(nan_remaining)
nan_remaining = [i for i in nan_remaining if isnan(x[i])]
if len(nan_remaining) == old_count:
break
if nan_remaining:
for i in list(nan_remaining):
col = int(floor(i / 2))
sig = sign(mod(i, 2) - 0.5)
c_vec = [0.0] * numr
c_vec[col] = sig
lp_last = MILP_LP(c=c_vec, A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq,
lb=lb, ub=ub, solver=solver)
x[i] = lp_last.slim_solve()
nan_remaining = [i for i in nan_remaining if isnan(x[i])]
if nan_remaining:
logging.warning(f'FVA: {len(nan_remaining)} LP solves still NaN after all retries.')
x = [v if abs(v) >= 1e-11 else 0.0 for v in x]
return DataFrame(
{"minimum": [x[i] for i in range(1, 2 * numr, 2)],
"maximum": [-x[i] for i in range(0, 2 * numr, 2)]},
index=reaction_ids,
)
[docs]
def remove_redundant_bounds(model, **kwargs) -> DataFrame:
"""Remove non-binding bounds from a model using FVA.
Runs FVA and relaxes bounds that never bind at steady state:
- If fva_min > lb + tol: set lb = -inf (lower bound is not binding)
- If fva_max < ub - tol: set ub = +inf (upper bound is not binding)
Modifies the model IN-PLACE. Returns the FVA DataFrame.
Args:
model (cobra.Model):
A metabolic model. Modified in-place.
solver (optional (str)):
Solver for FVA.
constraints (optional):
Constraints passed through to fva().
compress (optional (bool)):
Compress before FVA (passed through).
threads (optional (int)):
Parallel threads for FVA (passed through).
tol (optional (float)): (Default: 1e-6)
Tolerance for considering a bound as binding.
Returns:
(pandas.DataFrame):
FVA results with 'minimum' and 'maximum' columns.
"""
tol = kwargs.pop('tol', 1e-6)
fva_result = fva(model, **kwargs)
for rxn in model.reactions:
fva_min = fva_result.loc[rxn.id, 'minimum']
fva_max = fva_result.loc[rxn.id, 'maximum']
if fva_min > rxn.lower_bound + tol:
rxn.lower_bound = -float('inf')
if fva_max < rxn.upper_bound - tol:
rxn.upper_bound = float('inf')
return fva_result
[docs]
def fba(model, **kwargs) -> Solution:
"""Flux Balance Analysis (FBA), parsimonius Flux Balance Analysis (pFBA),
Flux Balance Analysis optimizes a *linear objective function* in a space of steady-state
flux vectors given by a constraint-based metabolic model. FVA is often used to determine
the (stoichiometrically) maximal possible growth rate, or flux rate towards a particular
product. This FBA function allows to us a custom objective function and sense and
allows the user to narrow down the flux states with additional constraints. In addition,
one may use different types of parsimonious FBAs to either reduce the total sum of fluxes
or the total number of active reactions after the primary objective is optimized.
Example:
optim = fba(model, constraints='EX_o2_e=0', solver='gurobi', pfba=1)
Args:
model (cobra.Model):
A metabolic model that is an instance of the cobra.Model class. If no custom objective
function is provided, the model's objective function is retrieved from the fields
model.reactions[i].objective_coefficient.
solver (optional (str)):
The solver that should be used for FBA.
constraints (optional (str) or (list of str) or (list of [dict,str,float])): (Default: '')
List of *linear* constraints to be applied on top of the model: signs + or -, scalar
factors for reaction rates, inclusive (in)equalities and a float value on the right hand
side. The parsing of the constraints input allows for some flexibility. Correct (and
identical) inputs are, for instance:
constraints='-EX_o2_e <= 5, ATPM = 20' or
constraints=['-EX_o2_e <= 5', 'ATPM = 20'] or
constraints=[[{'EX_o2_e':-1},'<=',5], [{'ATPM':1},'=',20]]
obj (optional (str) or (dict)):
As a custom objective function, any linear expression can be used, either provided as a
single string or as a dict. Correct (and identical) inputs are, for instance:
inner_objective='BIOMASS_Ecoli_core_w_GAM'
inner_objective={'BIOMASS_Ecoli_core_w_GAM': 1}
obj_sense (optional (str)): (Default: 'maximize')
The optimization direction can be set either to 'maximize' (or 'max') or 'minimize' (or 'min').
pfba (optional (int)): (Default: 0)
The level of parsimonious FBA that should be applied. 0: no pFBA, only optmize the primary
objective, 1: minimize sum of fluxes after the primary objective is optimized, 2: minimize the
number of active reactions after the primary objective is optimized.
Returns:
(cobra.core.Solution):
A solution object that contains the objective value, an optimal flux vector and the optmization
status.
"""
from straindesign.networktools import resolve_gene_constraints
reaction_ids = model.reactions.list_attr("id")
if CONSTRAINTS in kwargs:
kwargs[CONSTRAINTS] = resolve_gene_constraints(model, kwargs[CONSTRAINTS])
kwargs[CONSTRAINTS] = parse_constraints(kwargs[CONSTRAINTS], reaction_ids)
A_ineq, b_ineq, A_eq, b_eq = lineqlist2mat(kwargs[CONSTRAINTS], reaction_ids)
else:
kwargs[CONSTRAINTS] = []
if 'obj' in kwargs and kwargs['obj'] is not None:
if type(kwargs['obj']) is str:
kwargs['obj'] = linexpr2dict(kwargs['obj'], reaction_ids)
c = linexprdict2mat(kwargs['obj'], reaction_ids).toarray()[0].tolist()
else:
c = [i.objective_coefficient for i in model.reactions]
if ('obj_sense' not in kwargs and model.objective_direction == 'max') or \
('obj_sense' in kwargs and kwargs['obj_sense'] not in ['min','minimize']):
obj_sense = 'maximize'
c = [-i for i in c]
else:
obj_sense = 'minimize'
if 'pfba' in kwargs:
pfba = kwargs['pfba']
else:
pfba = False
if SOLVER not in kwargs:
kwargs[SOLVER] = None
solver = select_solver(kwargs[SOLVER], model)
# prepare vectors and matrices
A_eq_base = create_stoichiometric_matrix(model)
A_eq_base = sparse.csr_matrix(A_eq_base)
b_eq_base = [0.0] * len(model.metabolites)
if 'A_eq' in locals():
A_eq = sparse.vstack((A_eq_base, A_eq))
b_eq = b_eq_base + b_eq
else:
A_eq = A_eq_base
b_eq = b_eq_base
if 'A_ineq' not in locals():
A_ineq = sparse.csr_matrix((0, len(model.reactions)))
b_ineq = []
lb = [v.lower_bound for v in model.reactions]
ub = [v.upper_bound for v in model.reactions]
# build LP
fba_prob = MILP_LP(c=c, A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq, lb=lb, ub=ub, solver=solver)
x, opt_cx, status = fba_prob.solve()
if obj_sense == 'minimize':
opt_cx = -opt_cx
if status == UNBOUNDED:
num_prob = MILP_LP(c=[-v for v in c], A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq, lb=lb, ub=ub, solver=solver)
min_cx = num_prob.slim_solve()
if min_cx <= 0 or isnan(min_cx):
num_prob.add_eq_constraints(c, [-1.0])
else:
num_prob.add_eq_constraints(c, min_cx)
x, _, _ = num_prob.solve()
elif status not in [OPTIMAL, UNBOUNDED]:
status = INFEASIBLE
if pfba and status == OPTIMAL: # for pfba, split all reversible reactions and minimize the total flux
numr = len(c)
if pfba == 2: # pfba mode 2 minimizes the number of active reactions
# fix optimal flux and do fva to get essential reactions and speed up minimization
kwargs_fva = kwargs.copy()
kwargs_fva[CONSTRAINTS].append([{reaction_ids[i]: c[i] for i in range(numr) if c[i] != 0}, '=', opt_cx])
fva_sol = fva(model, **kwargs_fva)
A_ineq_pfba2 = A_ineq.copy()
A_ineq_pfba2.resize(A_ineq.shape[0], 2 * numr)
A_eq_pfba2 = A_eq.copy()
A_eq_pfba2.resize(A_eq.shape[0], 2 * numr)
lb_pfba2 = lb + [0.0] * numr
# only set non-essential reactions as knockable
ub_pfba2 = ub + [0.0 if prod(sign(lim)) > 0 else 1.0 for _, lim in fva_sol.iterrows()]
c_pfba2 = [0.0] * numr + [-1.0] * numr
A_ic = sparse.csr_matrix(([1] * numr, ([j for j in range(numr)], [j for j in range(numr)])), [numr, 2 * numr])
ic = IndicatorConstraints([numr + j for j in range(numr)], A_ic, [0] * numr, 'E' * numr, [1.0] * numr)
pfba2_prob = MILP_LP(c=c_pfba2,
A_ineq=A_ineq_pfba2,
b_ineq=b_ineq,
A_eq=A_eq_pfba2,
b_eq=b_eq,
lb=lb_pfba2,
ub=ub_pfba2,
indic_constr=ic,
vtype='C' * numr + 'B' * numr,
solver=solver)
pfba2_prob.add_eq_constraints(c + [0] * numr, [opt_cx])
y, _, _ = pfba2_prob.solve()
zero_flux = [i for i, j in enumerate(range(numr, 2 * numr)) if y[j]]
lb = [l if i not in zero_flux else 0.0 for i, l in enumerate(lb)]
ub = [u if i not in zero_flux else 0.0 for i, u in enumerate(ub)]
A_ineq_pfba = sparse.hstack((A_ineq, -A_ineq))
A_eq_pfba = sparse.hstack((A_eq, -A_eq))
lb_pfba = [max((0, l)) for l in lb] + [max((0, -u)) for u in ub]
ub_pfba = [max((0, u)) for u in ub] + [max((0, -l)) for l in lb]
c_pfba = c + [-v for v in c]
pfba_prob = MILP_LP(c=[1.0] * 2 * numr,
A_ineq=A_ineq_pfba,
b_ineq=b_ineq,
A_eq=A_eq_pfba,
b_eq=b_eq,
lb=lb_pfba,
ub=ub_pfba,
solver=solver)
pfba_prob.add_eq_constraints(c_pfba, [opt_cx])
x, _, _ = pfba_prob.solve()
x = [x[i] - x[j] for i, j in enumerate(range(numr, 2 * numr))]
x = [v if abs(v) >= 1e-11 else 0.0 for v in x] # cut off for very small absolute values
fluxes = {reaction_ids[i]: x[i] for i in range(len(x))}
sol = Solution(objective_value=-opt_cx, status=status, fluxes=fluxes)
return sol
[docs]
def yopt(model, **kwargs) -> Solution:
"""Yield optmization (YOpt)
Yield optimization optimizes a *fractional objective function* in a space of steady-state
flux vectors given by a constraint-based metabolic model. Yield optimization employs linear
fractional programming, and is often utilized to determine the (stoichiometrically) maximal
possible product yield, that is, the fraction between the product exchange and the substrate
uptake flux. This function requires a custom fractional objective function specified by a
*linear* numerator and denominator terms. Coefficients in the linear numerator or denominator
expression can be used to optimize for carbon recovery, for instance:
objective: (3*pyruvate_ex)/(2*ac_up+6*glc_up)
The user may also specify the optimization sense. In addition, additional constraints can be
specified to narrow down the flux space.
Yield optimization can fail because of several reasons. Here is how the function reacts:
1. The model is infeasible:
The function returns infeasible with no flux vector
2. The denominator is fixed to zero:
The function returns infeasible with no flux vector
3. The numerator is unbounded:
The function returns unbounded with no flux vector
4. The denominator can become zero:
The function returns unbounded, and a flux vector is computed by fixing the
the denominator
Example:
optim = yopt(model, obj_num='2 EX_etoh_e', obj_den='-6 EX_glc__D_e', constraints='EX_o2_e=0')
Args:
model (cobra.Model):
A metabolic model that is an instance of the cobra.Model class.
obj_num ((str) or (dict)):
The numerator of the fractional objective function, provided as a linear expression,
either as a character string or as a dict. E.g.: obj_num='EX_prod_e' or
obj_num={'EX_prod_e': 1}
obj_den ((str) or (dict)):
The denominator of the fractional objective function, provided as a linear expression,
either as a character string or as a dict. E.g.: obj_num='1.0 EX_subst_e' or
obj_num={'EX_subst_e': 1}
obj_sense (optional (str)): (Default: 'maximize')
The optimization direction can be set either to 'maximize' (or 'max') or 'minimize' (or 'min').
solver (optional (str)):
The solver that should be used for YOpt.
constraints (optional (str) or (list of str) or (list of [dict,str,float])): (Default: '')
List of *linear* constraints to be applied on top of the model: signs + or -, scalar
factors for reaction rates, inclusive (in)equalities and a float value on the right hand
side. The parsing of the constraints input allows for some flexibility. Correct (and
identical) inputs are, for instance:
constraints='-EX_o2_e <= 5, ATPM = 20' or
constraints=['-EX_o2_e <= 5', 'ATPM = 20'] or
constraints=[[{'EX_o2_e':-1},'<=',5], [{'ATPM':1},'=',20]]
Returns:
(cobra.core.Solution):
A solution object that contains the objective value, an optimal flux vector and the optmization
status.
"""
reaction_ids = model.reactions.list_attr("id")
if 'obj_num' not in kwargs:
raise Exception('For a yield optimization, the numerator expression must be provided under the keyword "obj_num".')
else:
if type(kwargs['obj_num']) is not dict:
obj_num = linexpr2mat(kwargs['obj_num'], reaction_ids)
else:
obj_num = linexprdict2mat(kwargs['obj_num'], reaction_ids)
if 'obj_den' not in kwargs:
raise Exception('For a yield optimization, the denominator expression must be provided under the keyword "obj_den".')
else:
if type(kwargs['obj_den']) is not dict:
obj_den = linexpr2mat(kwargs['obj_den'], reaction_ids)
else:
obj_den = linexprdict2mat(kwargs['obj_den'], reaction_ids)
if 'obj_sense' not in kwargs or kwargs['obj_sense'] not in ['min', 'minimize']:
obj_sense = 'maximize'
else:
obj_sense = 'minimize'
obj_num = -obj_num
if CONSTRAINTS in kwargs:
kwargs[CONSTRAINTS] = parse_constraints(kwargs[CONSTRAINTS], reaction_ids)
A_ineq, b_ineq, A_eq, b_eq = lineqlist2mat(kwargs[CONSTRAINTS], reaction_ids)
if SOLVER not in kwargs:
kwargs[SOLVER] = None
solver = select_solver(kwargs[SOLVER], model)
# prepare vectors and matrices for base problem
A_eq_base = create_stoichiometric_matrix(model)
A_eq_base = sparse.csr_matrix(A_eq_base)
b_eq_base = [0] * len(model.metabolites)
if 'A_eq' in locals():
A_eq = sparse.vstack((A_eq_base, A_eq), 'csr')
b_eq = b_eq_base + b_eq
else:
A_eq = A_eq_base
b_eq = b_eq_base
if 'A_ineq' not in locals():
A_ineq = sparse.csr_matrix((0, len(model.reactions)))
b_ineq = []
# Integrate upper and lower bounds into A_ineq and b_ineq
real_lb = [i for i, v in enumerate(model.reactions.list_attr('lower_bound')) if not isinf(v)]
real_ub = [i for i, v in enumerate(model.reactions.list_attr('upper_bound')) if not isinf(v)]
sparse_lb = sparse.coo_matrix(([-1] * len(real_lb), (range(len(real_lb)), real_lb)), (len(real_lb), A_ineq.shape[1]))
sparse_ub = sparse.coo_matrix(([1] * len(real_ub), (range(len(real_ub)), real_ub)), (len(real_ub), A_ineq.shape[1]))
A_ineq = sparse.vstack((A_ineq, sparse_lb, sparse_ub), 'csr')
b_ineq = b_ineq + [-model.reactions[i].lower_bound for i in real_lb] + \
[ model.reactions[i].upper_bound for i in real_ub]
# Analyze maximum and minimum value of denominator function to decide whether to fix it to +1 or -1 or abort computation
den_sign = []
den_prob = MILP_LP(c=obj_den.todense().tolist()[0], A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq, solver=solver)
_, min_denx, status_i = den_prob.solve()
# check model feasibility
if status_i not in [OPTIMAL, UNBOUNDED]:
fluxes = {reaction_ids[i]: nan for i in range(len(reaction_ids))}
return Solution(objective_value=nan, status=INFEASIBLE, fluxes=fluxes)
# is minimum of denominator term negative?
if min_denx < 0:
den_sign += [-1]
# is maximum of denominator term positive?
den_prob.set_objective((-obj_den).todense().tolist()[0])
_, max_denx, _ = den_prob.solve()
if max_denx < 0:
den_sign += [1]
# is denominator fixed to zero
if not den_sign:
logging.error('Denominator term can only take the value 0. Yield computation impossible.')
fluxes = {reaction_ids[i]: nan for i in range(len(reaction_ids))}
return Solution(objective_value=nan, status=INFEASIBLE, fluxes=fluxes)
# Create linear fractional problem (LFP)
# A variable is added here to scale the right hand side of the original problem
A_ineq_lfp = sparse.hstack((A_ineq, sparse.csr_matrix([-b for b in b_ineq]).transpose()), 'csr')
b_ineq_lfp = [0 for _ in b_ineq]
A_eq_lfp = sparse.vstack(( sparse.hstack((A_eq,sparse.csr_matrix([-b for b in b_eq]).transpose())),\
sparse.hstack((obj_den,sparse.csr_matrix((1, 1))))),'csr')
opt_cx = inf
for d in den_sign:
b_eq_lfp = [0 for _ in b_eq] + [d]
# build LP
yopt_prob = MILP_LP(c=(-d * obj_num).todense().tolist()[0] + [0],
A_ineq=A_ineq_lfp,
b_ineq=b_ineq_lfp,
A_eq=A_eq_lfp,
b_eq=b_eq_lfp,
solver=solver)
x_i, opt_i, status_i = yopt_prob.solve()
if opt_i < opt_cx:
x = x_i
opt_cx = opt_i
status = status_i
if status is OPTIMAL:
factor = x[-1] # get factor from LFP
if factor == 0:
factor = 1
fluxes = {r: x[i] / factor for i, r in enumerate(reaction_ids)}
if obj_sense == 'maximize':
opt_cx = -opt_cx # correct sign (maximization)
sol = Solution(objective_value=opt_cx, status=status, fluxes=fluxes)
if x[-1] == 0:
sol.scalable = True
logging.info('Solution flux vector may be scaled with an arbitrary factor.')
else:
sol.scalable = False
return sol
elif status is UNBOUNDED:
opt_cx = nan
# check if numerator can be nonzero when denominator is zero
fct = 1 - 2 * (obj_sense == 'maximize')
den_prob.set_objective((fct * obj_num).todense().tolist()[0])
den_prob.add_eq_constraints(obj_den, [0])
x, max_num, status_i = den_prob.solve()
if isinf(max_num) or status_i == INFEASIBLE: # if numerator is still unbounded, generate a fixed solution
num_prob = MILP_LP(c=(-fct * obj_num).todense().tolist()[0], A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq, solver=solver)
min_num = num_prob.slim_solve()
if min_num <= 0:
num_prob.add_eq_constraints((-fct * obj_num).todense().tolist()[0], [1])
else:
num_prob.add_eq_constraints((-fct * obj_num).todense().tolist()[0], min_num)
x, opt_i, status_i = num_prob.solve()
if not isnan(max_num):
if max_num == 0:
num_prob = MILP_LP(c=(-fct * obj_num).todense().tolist()[0],
A_ineq=A_ineq,
b_ineq=b_ineq,
A_eq=A_eq,
b_eq=b_eq,
solver=solver)
num_prob.add_eq_constraints((obj_den).todense().tolist()[0], [0])
x, opt_i, status_i = num_prob.solve()
fluxes = {r: x[i] for i, r in enumerate(reaction_ids)}
logging.warning('Yield is undefined because denominator can become zero. Solution '\
'flux vector maximizes the numerator.')
sol = Solution(objective_value=opt_cx, status=status, fluxes=fluxes)
sol.scalable = False
else:
fluxes = {r: x[i] for i, r in enumerate(reaction_ids)}
logging.warning('Yield is undefined because denominator can become zero. Solution '\
'flux vector maximizes the numerator.')
sol = Solution(objective_value=opt_cx, status=status, fluxes=fluxes)
sol.scalable = False
return sol
else:
if obj_sense == 'maximize':
opt_cx = inf
else:
opt_cx = -inf
fluxes = {r: x[i] for i, r in enumerate(reaction_ids)}
logging.warning('Yield is infinite because the numerator is unbounded.')
sol = Solution(objective_value=opt_cx, status=status, fluxes=fluxes)
sol.scalable = True
return sol
else:
status = INFEASIBLE
def _make_fix_constraint(axes, ax_idx, ax_type, value):
"""Create an equality constraint fixing axis ax_idx to value."""
if ax_type == 'rate':
return [axes[ax_idx][0], '=', value]
else: # yield
merged = dict(axes[ax_idx][0])
for k, v in axes[ax_idx][1].items():
merged[k] = merged.get(k, 0) - v * value
return [merged, '=', 0]
def _optimize_axis(model, ax_idx, axes, ax_type, constraints, solver, sense):
"""Optimize one axis subject to constraints. Returns (value, status)."""
if ax_type == 'rate':
sol = fba(model, obj=axes[ax_idx][0], constraints=constraints, solver=solver, obj_sense=sense)
else: # yield
sol = yopt(model, obj_num=axes[ax_idx][0], obj_den=axes[ax_idx][1],
constraints=constraints, solver=solver, obj_sense=sense)
return sol
def _detect_degeneracy(val_limits, num_axes):
"""Classify solution space dimensionality based on axis ranges."""
degenerate = []
for vmin, vmax in val_limits:
scale = max(1.0, abs(vmin), abs(vmax))
degenerate.append(abs(vmax - vmin) < 1e-8 * scale)
n_degen = sum(degenerate)
if n_degen == num_axes:
return 'point', degenerate
elif n_degen == num_axes - 1:
return 'line', degenerate
elif n_degen == num_axes - 2:
return 'plane', degenerate
return 'full', degenerate
def _trace_polygon_rate_rate(model, axes, constraints, solver):
"""Trace the exact convex polygon boundary for rate-rate 2D plots.
Uses recursive normal-bisection: finds 4 initial extremes, then refines
each edge by optimizing along the outward normal. O(V) LPs for V vertices.
"""
ax0_coeff = axes[0][0]
ax1_coeff = axes[1][0]
def _fba_project(obj_dict):
sol = fba(model, obj=obj_dict, constraints=constraints, solver=solver, obj_sense='maximize')
if sol.status not in [OPTIMAL]:
return None
x0 = sum(c * sol.fluxes.get(r, 0) for r, c in ax0_coeff.items())
x1 = sum(c * sol.fluxes.get(r, 0) for r, c in ax1_coeff.items())
return (ceil_dec(x0, 9), ceil_dec(x1, 9))
# Step 1: Find 4 extremes
extremes = []
for coeff, sense in [(ax0_coeff, 'maximize'), (ax0_coeff, 'minimize'),
(ax1_coeff, 'maximize'), (ax1_coeff, 'minimize')]:
sol = fba(model, obj=coeff, constraints=constraints, solver=solver, obj_sense=sense)
if sol.status == OPTIMAL:
x0 = sum(c * sol.fluxes.get(r, 0) for r, c in ax0_coeff.items())
x1 = sum(c * sol.fluxes.get(r, 0) for r, c in ax1_coeff.items())
extremes.append((ceil_dec(x0, 9), ceil_dec(x1, 9)))
if len(extremes) < 2:
return extremes if extremes else [(0, 0)]
# Step 2: Deduplicate
tol = 1e-8
unique_pts = []
for p in extremes:
if not any(abs(p[0] - q[0]) < tol and abs(p[1] - q[1]) < tol for q in unique_pts):
unique_pts.append(p)
if len(unique_pts) == 1:
return unique_pts
# Step 3: Order counterclockwise via atan2
cx = sum(p[0] for p in unique_pts) / len(unique_pts)
cy = sum(p[1] for p in unique_pts) / len(unique_pts)
unique_pts.sort(key=lambda p: arctan2(p[1] - cy, p[0] - cx))
# Step 4: Recursive edge refinement
diameter = max(
((a[0]-b[0])**2 + (a[1]-b[1])**2)**0.5
for a in unique_pts for b in unique_pts
)
min_edge = 1e-10 * diameter if diameter > 0 else 1e-10
def _refine(vi, vj, depth):
if depth > 50:
return [vi]
edge_len = ((vi[0]-vj[0])**2 + (vi[1]-vj[1])**2)**0.5
if edge_len < min_edge:
return [vi]
# Outward normal (perpendicular to edge, pointing outward from centroid)
dx, dy = vj[0] - vi[0], vj[1] - vi[1]
nx, ny = dy, -dx # rotate 90 degrees
# Ensure normal points outward (away from centroid)
mid_x, mid_y = (vi[0] + vj[0]) / 2, (vi[1] + vj[1]) / 2
if nx * (mid_x - cx) + ny * (mid_y - cy) < 0:
nx, ny = -nx, -ny
# Combine into single LP objective
obj = {}
for r, c in ax0_coeff.items():
obj[r] = obj.get(r, 0) + nx * c
for r, c in ax1_coeff.items():
obj[r] = obj.get(r, 0) + ny * c
p_new = _fba_project(obj)
if p_new is None:
return [vi]
# Check if p_new is outside the edge (beyond tolerance)
# Project p_new onto edge normal direction
dist = nx * (p_new[0] - vi[0]) + ny * (p_new[1] - vi[1])
norm_len = (nx**2 + ny**2)**0.5
if norm_len > 0:
dist /= norm_len
if dist > tol and edge_len > min_edge:
# Check p_new is not a duplicate of vi or vj
if ((abs(p_new[0]-vi[0]) < tol and abs(p_new[1]-vi[1]) < tol) or
(abs(p_new[0]-vj[0]) < tol and abs(p_new[1]-vj[1]) < tol)):
return [vi]
left = _refine(vi, p_new, depth + 1)
right = _refine(p_new, vj, depth + 1)
return left + right
return [vi]
# Refine all edges of the polygon
refined = []
n = len(unique_pts)
for i in range(n):
vi = unique_pts[i]
vj = unique_pts[(i + 1) % n]
refined.extend(_refine(vi, vj, 0))
# Close polygon
if refined and refined[0] != refined[-1]:
refined.append(refined[0])
return refined
def _trace_boundary_adaptive(model, axes, ax_types, constraints, solver, max_depth=15):
"""Trace upper and lower boundaries adaptively for yield or mixed axes.
Returns (upper_boundary, lower_boundary) as sorted lists of (x, y) tuples.
Uses recursive midpoint refinement where linear interpolation error exceeds tolerance.
"""
def _fix_and_opt(x_val, sense):
constr = constraints.copy()
constr.append(_make_fix_constraint(axes, 0, ax_types[0], x_val))
sol = _optimize_axis(model, 1, axes, ax_types[1], constr, solver, sense)
if sol.status in [OPTIMAL]:
return ceil_dec(sol.objective_value, 9) if sense == 'minimize' else floor_dec(sol.objective_value, 9)
return nan
# Step 1: axis-0 range endpoints (already known from val_limits, but we need y values)
x_min, x_max = ceil_dec(
_optimize_axis(model, 0, axes, ax_types[0], constraints, solver, 'minimize').objective_value, 8
), floor_dec(
_optimize_axis(model, 0, axes, ax_types[0], constraints, solver, 'maximize').objective_value, 8
)
# Step 2: y values at endpoints
y_min_at_xmin = _fix_and_opt(x_min, 'minimize')
y_max_at_xmin = _fix_and_opt(x_min, 'maximize')
y_min_at_xmax = _fix_and_opt(x_max, 'minimize')
y_max_at_xmax = _fix_and_opt(x_max, 'maximize')
# Upper boundary
upper = [(x_min, y_max_at_xmin), (x_max, y_max_at_xmax)]
upper = [(x, y) for x, y in upper if not isnan(y)]
# Lower boundary
lower = [(x_min, y_min_at_xmin), (x_max, y_min_at_xmax)]
lower = [(x, y) for x, y in lower if not isnan(y)]
if not upper and not lower:
return [], []
# Compute y_range for tolerance
all_y = [y for _, y in upper + lower]
y_range = max(all_y) - min(all_y) if all_y else 0
abs_tol = max(1e-6, 1e-3 * abs(y_range))
def _refine_boundary(boundary, sense, depth):
if depth > max_depth or len(boundary) < 2:
return boundary
new_boundary = [boundary[0]]
changed = False
for i in range(len(boundary) - 1):
x_left, y_left = boundary[i]
x_right, y_right = boundary[i + 1]
if abs(x_right - x_left) < 1e-10:
new_boundary.append(boundary[i + 1])
continue
x_mid = (x_left + x_right) / 2
y_actual = _fix_and_opt(x_mid, sense)
if isnan(y_actual):
new_boundary.append(boundary[i + 1])
continue
y_interp = (y_left + y_right) / 2
if abs(y_actual - y_interp) > abs_tol:
new_boundary.append((x_mid, y_actual))
changed = True
new_boundary.append(boundary[i + 1])
if changed:
return _refine_boundary(new_boundary, sense, depth + 1)
return new_boundary
upper = _refine_boundary(upper, 'maximize', 0)
lower = _refine_boundary(lower, 'minimize', 0)
return upper, lower
def _trace_polytope_3d_rate(model, axes, constraints, solver):
"""Trace exact 3D convex polytope for all-rate axes via face-normal refinement.
Finds 6 initial extremes, builds convex hull, then iteratively refines
by optimizing along each face's outward normal. Converges when no face
produces a new vertex.
"""
coeffs = [axes[i][0] for i in range(3)]
tol = 1e-8
def _project(sol):
return tuple(
ceil_dec(sum(c * sol.fluxes.get(r, 0) for r, c in coeffs[j].items()), 9)
for j in range(3)
)
def _is_dup(pt, pts):
return any(all(abs(pt[k] - q[k]) < tol for k in range(3)) for q in pts)
# Find 6 extremes (max/min of each axis)
vertices = []
for i in range(3):
for sense in ['maximize', 'minimize']:
sol = fba(model, obj=coeffs[i], constraints=constraints, solver=solver, obj_sense=sense)
if sol.status == OPTIMAL:
pt = _project(sol)
if not _is_dup(pt, vertices):
vertices.append(pt)
if len(vertices) < 4:
return [array(v) for v in vertices], []
# Iterative face-normal refinement
for _ in range(20):
try:
hull = ConvexHull(array(vertices))
except Exception:
break
new_found = False
for eq in hull.equations:
normal = eq[:3] # outward-pointing face normal from ConvexHull
obj = {}
for k in range(3):
for r, c in coeffs[k].items():
obj[r] = obj.get(r, 0) + normal[k] * c
sol = fba(model, obj=obj, constraints=constraints, solver=solver, obj_sense='maximize')
if sol.status != OPTIMAL:
continue
p_new = _project(sol)
if not _is_dup(p_new, vertices):
vertices.append(p_new)
new_found = True
if not new_found:
break
# Final hull — extract triangles and merged polygon faces
try:
hull = ConvexHull(array(vertices))
triang = hull.simplices.tolist()
face_polys = _hull_face_polygons(hull)
except Exception:
triang = []
face_polys = None
return [array(v) for v in vertices], triang, face_polys
def _hull_face_polygons(hull):
"""Merge coplanar ConvexHull simplices into polygon faces.
Returns a list of faces, each face being a list of vertex indices
ordered counterclockwise (viewed from outside).
"""
from collections import defaultdict
# Group simplices by face equation (rounded for coplanarity check)
face_groups = defaultdict(set)
for i, eq in enumerate(hull.equations):
key = tuple(round(v, 5) for v in eq)
for vi in hull.simplices[i]:
face_groups[key].add(vi)
faces = []
for eq_key, vertex_indices in face_groups.items():
verts = list(vertex_indices)
if len(verts) < 3:
continue
normal = array(eq_key[:3])
pts = hull.points[verts]
centroid = pts.mean(axis=0)
# Build orthonormal basis in the face plane
ref = array([1, 0, 0]) if abs(normal[0]) < 0.9 else array([0, 1, 0])
u = ref - normal * normal.dot(ref)
u = u / (u.dot(u) ** 0.5)
v = array([normal[1]*u[2] - normal[2]*u[1],
normal[2]*u[0] - normal[0]*u[2],
normal[0]*u[1] - normal[1]*u[0]])
# Sort by angle in face plane
angles = [arctan2((pt - centroid).dot(v), (pt - centroid).dot(u)) for pt in pts]
order = sorted(range(len(verts)), key=lambda i: angles[i])
faces.append([verts[i] for i in order])
return faces
def _trace_3d_slice_polygon(model, axes, ax_type, val_limits, constraints, solver, points):
"""3D surface for 1-yield + 2-rate: slice along yield axis, trace rate-rate polygon per slice."""
yield_idx = next(i for i, t in enumerate(ax_type) if t == 'yield')
rate_indices = [i for i, t in enumerate(ax_type) if t == 'rate']
y_space = linspace(val_limits[yield_idx][0], val_limits[yield_idx][1], num=points).tolist()
datapoints = []
slice_idx_lists = []
for y_val in y_space:
constr = constraints.copy()
constr.append(_make_fix_constraint(axes, yield_idx, ax_type[yield_idx], y_val))
sub_axes = [axes[rate_indices[0]], axes[rate_indices[1]]]
polygon_2d = _trace_polygon_rate_rate(model, sub_axes, constr, solver)
if not polygon_2d or len(polygon_2d) < 2:
continue
# Remove closing duplicate if present
if len(polygon_2d) > 1 and polygon_2d[0] == polygon_2d[-1]:
polygon_2d = polygon_2d[:-1]
if not polygon_2d:
continue
# Align polygons: rotate to start at max rate_indices[0] value
max_i = max(range(len(polygon_2d)), key=lambda i: (polygon_2d[i][0], polygon_2d[i][1]))
polygon_2d = polygon_2d[max_i:] + polygon_2d[:max_i]
# Convert 2D → 3D
idx_list = []
for pt in polygon_2d:
p3d = [0.0, 0.0, 0.0]
p3d[rate_indices[0]] = pt[0]
p3d[rate_indices[1]] = pt[1]
p3d[yield_idx] = y_val
idx_list.append(len(datapoints))
datapoints.append(array(p3d))
slice_idx_lists.append(idx_list)
if not slice_idx_lists:
return [], [], []
# Stitch adjacent slice polygons (close the ring by appending first index)
triang = []
for s in range(len(slice_idx_lists) - 1):
strip_a = slice_idx_lists[s] + [slice_idx_lists[s][0]]
strip_b = slice_idx_lists[s + 1] + [slice_idx_lists[s + 1][0]]
_triangulate_strips(strip_a, strip_b, datapoints, triang)
# Cap first and last slices (fan triangulation)
first = slice_idx_lists[0]
if len(first) >= 3:
for i in range(1, len(first) - 1):
triang.append([first[0], first[i + 1], first[i]])
last = slice_idx_lists[-1]
if len(last) >= 3:
for i in range(1, len(last) - 1):
triang.append([last[0], last[i], last[i + 1]])
return datapoints, triang, slice_idx_lists
def _triangulate_strips(strip_a, strip_b, datapoints, triang, flip_winding=False):
"""Connect two ordered index strips into triangles (for 3D mesh construction).
Walks both strips simultaneously, advancing whichever side has the shorter
diagonal, creating a triangle strip between two contours.
"""
if len(strip_a) < 1 or len(strip_b) < 1:
return
i, j = 0, 0
while i < len(strip_a) - 1 or j < len(strip_b) - 1:
if i >= len(strip_a) - 1:
tri = [strip_a[i], strip_b[j], strip_b[j + 1]]
j += 1
elif j >= len(strip_b) - 1:
tri = [strip_a[i], strip_b[j], strip_a[i + 1]]
i += 1
else:
# Choose shorter diagonal
pa_next = datapoints[strip_a[i + 1]]
pb_next = datapoints[strip_b[j + 1]]
pa_curr = datapoints[strip_a[i]]
pb_curr = datapoints[strip_b[j]]
d1 = sum((pa_next[k] - pb_curr[k])**2 for k in range(len(pa_next)))
d2 = sum((pa_curr[k] - pb_next[k])**2 for k in range(len(pa_curr)))
if d1 < d2:
tri = [strip_a[i], strip_b[j], strip_a[i + 1]]
i += 1
else:
tri = [strip_a[i], strip_b[j], strip_b[j + 1]]
j += 1
if flip_winding:
tri = [tri[0], tri[2], tri[1]]
triang.append(tri)
[docs]
def plot_flux_space(model, axes, **kwargs) -> Tuple[list, list, list]:
"""Plot projections of the space of steady-state flux vectors onto two or three dimensions.
This function uses LP and matplotlib to generate lower dimensional representations of the
flux space. Custom *linear* or *fractional-linear* expressions can be used for the plot
axis. The most commonly used flux space reprentations are the *production envelope* that
plots the growth rate (x) vs the product synthesis rate (y) and the *yield space plot*
that plots the biomass yield (x) vs the product yiel (y). One may specify additional
constraints to investigate subspaces of the metabolic steady-state flux space.
Example:
plot_flux_space(model,('BIOMASS_Ecoli_core_w_GAM','EX_etoh_e'))
Args:
model (cobra.Model):
A metabolic model that is an instance of the cobra.Model class.
axes ((list of lists) or (list of str)):
A set of linear expressions that specify which reactions/expressions/dimensions should be
used on the axis. Examples: axes=['BIOMASS_Ecoli_core_w_GAM','EX_etoh_e'] or
axes=[['BIOMASS_Ecoli_core_w_GAM','-EX_glc_e'],['EX_etoh_e','-EX_glc_e']] or
axes=[['BIOMASS_Ecoli_core_w_GAM'],['EX_etoh_e','-EX_glc_e']]
solver (optional (str)):
The solver that should be used for scanning the flux space.
constraints (optional (str) or (list of str) or (list of [dict,str,float])): (Default: '')
List of *linear* constraints to be applied on top of the model: signs + or -, scalar
factors for reaction rates, inclusive (in)equalities and a float value on the right hand
side. The parsing of the constraints input allows for some flexibility. Correct (and
identical) inputs are, for instance:
constraints='-EX_o2_e <= 5, ATPM = 20' or
constraints=['-EX_o2_e <= 5', 'ATPM = 20'] or
constraints=[[{'EX_o2_e':-1},'<=',5], [{'ATPM':1},'=',20]]
plt_backend (optional (str)):
The matplotlib backend that should be used for plotting:
interactive backends: GTK3Agg, GTK3Cairo, GTK4Agg, GTK4Cairo, MacOSX, nbAgg, QtAgg, QtCairo,
TkAgg, TkCairo, WebAgg, WX, WXAgg, WXCairo, Qt5Agg, Qt5Cairo
non-interactive backends: agg, cairo, pdf, pgf, ps, svg, template
show (optional (bool)): (Default: True)
Should matplotlib show the plot or should it stop after plot generation. show=False can
be useful if flux spaces should be plotted and saved in a non-interactive environment,
multiple flux spaces should be plotted at once or the plot should be modified before been shown.
cmap (optional (str)): (Default: 'managua')
The matplotlib colormap used for 3D face coloring.
points (optional (int)): (Default: 25 (3D) or 40 (2D))
Controls resolution of non-exact (approximate) plot regions. For rate-only axes, boundary
tracing finds exact vertices and this parameter is ignored. For yield axes, this controls:
- 2D yield plots: max refinement depth = max(5, log2(points))
- 3D with 1 yield axis: number of slices along the yield axis
- 3D with 2+ yield axes: number of grid intervals per axis
Returns:
(Tuple):
(datapoints, triang, plot1). The array of datapoints from which the plot was generated. These
datapoints are optimal values for different optimizations within the flux space. The triang
variable contains information about which datapoints need to be connected in triangles to
render a closed surface. The last variable contains the matplotlib object.
"""
reaction_ids = model.reactions.list_attr("id")
if CONSTRAINTS in kwargs:
kwargs[CONSTRAINTS] = parse_constraints(kwargs[CONSTRAINTS], reaction_ids)
else:
kwargs[CONSTRAINTS] = []
if SOLVER not in kwargs:
kwargs[SOLVER] = None
solver = select_solver(kwargs[SOLVER], model)
if 'plt_backend' in kwargs:
set_matplotlib_backend(kwargs['plt_backend'])
if 'show' not in kwargs:
show = True
else:
show = kwargs['show']
cmap = kwargs.get('cmap', 'managua')
axes = [list(ax) if not isinstance(ax, str) else [ax] for ax in axes] # cast to list of lists
num_axes = len(axes)
if num_axes not in [2, 3]:
raise Exception('Please define 2 or 3 axes as a list of tuples [ax1, ax2, (optional) ax3] with ax1 = (den,num).\n'+\
'"den" and "num" being linear expressions.')
if 'points' in kwargs:
points = kwargs['points']
else:
if num_axes == 2:
points = 40
else:
points = 25
ax_name = ["" for _ in range(num_axes)]
ax_limits = [(nan, nan) for _ in range(num_axes)]
val_limits = [(nan, nan) for _ in range(num_axes)]
ax_type = ["" for _ in range(num_axes)]
for i, ax in enumerate(axes):
if len(ax) == 1:
ax_type[i] = 'rate'
elif len(ax) == 2:
ax_type[i] = 'yield'
if ax_type[i] == 'rate':
ax[0] = parse_linexpr(ax[0], reaction_ids)[0]
ax_name[i] = linexprdict2str(ax[0])
sol_min = fba(model, obj=ax[0], constraints=kwargs[CONSTRAINTS], solver=solver, obj_sense='minimize')
sol_max = fba(model, obj=ax[0], constraints=kwargs[CONSTRAINTS], solver=solver, obj_sense='maximize')
# abort if any of the fluxes are unbounded or undefined
inval = [i + 1 for i, v in enumerate([sol_min, sol_max]) if v.status == UNBOUNDED or v.status == INFEASIBLE]
if any(inval):
raise Exception('One of the specified reactions is unbounded or problem is infeasible. Plot cannot be generated.')
elif ax_type[i] == 'yield':
ax[0] = parse_linexpr(ax[0], reaction_ids)[0]
ax[1] = parse_linexpr(ax[1], reaction_ids)[0]
ax_name[i] = '(' + linexprdict2str(ax[0]) + ') / (' + linexprdict2str(ax[1]) + ')'
sol_min = yopt(model, obj_num=ax[0], obj_den=ax[1], constraints=kwargs[CONSTRAINTS], solver=solver, obj_sense='minimize')
sol_max = yopt(model, obj_num=ax[0], obj_den=ax[1], constraints=kwargs[CONSTRAINTS], solver=solver, obj_sense='maximize')
# abort if any of the yields are unbounded or undefined
inval = [i + 1 for i, v in enumerate([sol_min, sol_max]) if v.status == UNBOUNDED or v.status == INFEASIBLE]
if any(inval):
raise Exception('One of the specified yields is unbounded or undefined or problem is infeasible. Plot cannot be generated.')
val_limits[i] = [ceil_dec(sol_min.objective_value, 8), floor_dec(sol_max.objective_value, 8)]
ax_limits[i] = [min((0, val_limits[i][0])), max((0, val_limits[i][1]))]
# Ensure non-singular axis limits (avoid identical low/high)
if ax_limits[i][0] == ax_limits[i][1]:
pad = max(0.5, abs(ax_limits[i][0]) * 0.1)
ax_limits[i] = [ax_limits[i][0] - pad, ax_limits[i][1] + pad]
# Detect degeneracy
degen, degen_axes = _detect_degeneracy(val_limits, num_axes)
if num_axes == 2:
# === 2D dispatch ===
if degen == 'point':
plot1 = plt.plot([val_limits[0][0]], [val_limits[1][0]], 'o')[0]
plot1.axes.set_xlabel(ax_name[0])
plot1.axes.set_ylabel(ax_name[1])
plot1.axes.set_xlim(ax_limits[0][0] * 1.05, ax_limits[0][1] * 1.05)
plot1.axes.set_ylim(ax_limits[1][0] * 1.05, ax_limits[1][1] * 1.05)
if show:
try:
plt.show()
except UserWarning as e:
if 'FigureCanvasTemplate is non-interactive' in str(e):
logging.warning('warning: Interactive plot not supported in current execution environment.')
return [[val_limits[0][0], val_limits[1][0]]], [], plot1
if degen == 'line':
# Determine which axis is degenerate and trace the other
if degen_axes[0]:
# x is fixed, trace y range
x_pts = [val_limits[0][0], val_limits[0][0]]
y_pts = [val_limits[1][0], val_limits[1][1]]
else:
# y is fixed, trace x range
x_pts = [val_limits[0][0], val_limits[0][1]]
y_pts = [val_limits[1][0], val_limits[1][0]]
plot1 = plt.plot(x_pts, y_pts, linewidth=1.5)[0]
plot1.axes.set_xlabel(ax_name[0])
plot1.axes.set_ylabel(ax_name[1])
plot1.axes.set_xlim(ax_limits[0][0] * 1.05, ax_limits[0][1] * 1.05)
plot1.axes.set_ylim(ax_limits[1][0] * 1.05, ax_limits[1][1] * 1.05)
if show:
try:
plt.show()
except UserWarning as e:
if 'FigureCanvasTemplate is non-interactive' in str(e):
logging.warning('warning: Interactive plot not supported in current execution environment.')
datapoints = [[x, y] for x, y in zip(x_pts, y_pts)]
return datapoints, [], plot1
# Full 2D: choose algorithm based on axis types
if all(t == 'rate' for t in ax_type):
vertices = _trace_polygon_rate_rate(model, axes, kwargs[CONSTRAINTS], solver)
else:
adapt_depth = max(5, int(log2(max(points, 2))))
upper, lower = _trace_boundary_adaptive(
model, axes, ax_type, kwargs[CONSTRAINTS], solver, max_depth=adapt_depth)
# Build polygon from upper (left-to-right) + reversed lower (right-to-left)
if upper and lower:
vertices = upper + list(reversed(lower))
elif upper:
vertices = upper
elif lower:
vertices = lower
else:
vertices = []
# Close polygon if endpoints don't match
if len(vertices) > 1 and vertices[0] != vertices[-1]:
vertices.append(vertices[0])
if not vertices:
raise Exception('Could not trace any boundary. Problem may be infeasible.')
# Check for collinear degeneracy (polygon traced to a line or point)
unique_verts = []
for v in vertices:
if not any(abs(v[0] - u[0]) < 1e-10 and abs(v[1] - u[1]) < 1e-10 for u in unique_verts):
unique_verts.append(v)
is_collinear = False
if len(unique_verts) <= 2:
is_collinear = True
elif len(unique_verts) >= 3:
# Check if all points lie on a single line
x0, y0 = unique_verts[0]
dx, dy = unique_verts[1][0] - x0, unique_verts[1][1] - y0
span = max(abs(dx), abs(dy), 1e-12)
is_collinear = all(
abs((v[0] - x0) * dy - (v[1] - y0) * dx) / span < 1e-6
for v in unique_verts[2:])
if is_collinear and len(unique_verts) <= 1:
# Collapsed to a point
plot1 = plt.plot([unique_verts[0][0]], [unique_verts[0][1]], 'o')[0]
plot1.axes.set_xlabel(ax_name[0])
plot1.axes.set_ylabel(ax_name[1])
plot1.axes.set_xlim(ax_limits[0][0] * 1.05, ax_limits[0][1] * 1.05)
plot1.axes.set_ylim(ax_limits[1][0] * 1.05, ax_limits[1][1] * 1.05)
if show:
try:
plt.show()
except UserWarning as e:
if 'FigureCanvasTemplate is non-interactive' in str(e):
logging.warning('warning: Interactive plot not supported in current execution environment.')
return [list(unique_verts[0])], [], plot1
if is_collinear:
# Collapsed to a line — sort along the line direction and draw
x0, y0 = unique_verts[0]
dx, dy = unique_verts[-1][0] - x0, unique_verts[-1][1] - y0
unique_verts.sort(key=lambda v: (v[0] - x0) * dx + (v[1] - y0) * dy)
x_pts = [v[0] for v in unique_verts]
y_pts = [v[1] for v in unique_verts]
plot1 = plt.plot(x_pts, y_pts, linewidth=1.5)[0]
plot1.axes.set_xlabel(ax_name[0])
plot1.axes.set_ylabel(ax_name[1])
plot1.axes.set_xlim(ax_limits[0][0] * 1.05, ax_limits[0][1] * 1.05)
plot1.axes.set_ylim(ax_limits[1][0] * 1.05, ax_limits[1][1] * 1.05)
if show:
try:
plt.show()
except UserWarning as e:
if 'FigureCanvasTemplate is non-interactive' in str(e):
logging.warning('warning: Interactive plot not supported in current execution environment.')
datapoints = [[x, y] for x, y in zip(x_pts, y_pts)]
return datapoints, [], plot1
# Build datapoints and triangulation for return value
datapoints = [[v[0], v[1]] for v in vertices]
# Fan triangulation (for return compatibility)
n_v = len(vertices)
if n_v > 2 and vertices[0] == vertices[-1]:
n_v -= 1 # don't count the closing duplicate
triang = [[0, i, i + 1] for i in range(1, n_v - 1)] if n_v >= 3 else []
# Plot
x = [v[0] for v in vertices]
y = [v[1] for v in vertices]
plot1 = plt.fill(x, y, linewidth=0.5)[0]
plot1.axes.set_xlabel(ax_name[0])
plot1.axes.set_ylabel(ax_name[1])
plot1.axes.set_xlim(ax_limits[0][0] * 1.05, ax_limits[0][1] * 1.05)
plot1.axes.set_ylim(ax_limits[1][0] * 1.05, ax_limits[1][1] * 1.05)
if any(t == 'yield' for t in ax_type):
plot1.axes.set_title('approximate', fontsize=8, color='gray')
if show:
try:
plt.show()
except UserWarning as e:
if 'FigureCanvasTemplate is non-interactive' in str(e):
logging.warning('warning: Interactive plot not supported in current execution environment.')
return datapoints, triang, plot1
elif num_axes == 3:
# === 3D dispatch ===
if degen == 'point':
ax3 = plt.figure().add_subplot(projection='3d')
ax3.scatter([val_limits[0][0]], [val_limits[1][0]], [val_limits[2][0]], s=50)
ax3.set_xlabel(ax_name[0])
ax3.set_ylabel(ax_name[1])
ax3.set_zlabel(ax_name[2])
ax3.set_xlim(ax_limits[0])
ax3.set_ylim(ax_limits[1])
ax3.set_zlim(ax_limits[2])
if show:
try:
plt.show()
except UserWarning as e:
if 'FigureCanvasTemplate is non-interactive' in str(e):
logging.warning('warning: Interactive plot not supported in current execution environment.')
return [[val_limits[0][0], val_limits[1][0], val_limits[2][0]]], [], ax3
if degen == 'line':
# Find the non-degenerate axis and trace it
free_ax = [i for i, d in enumerate(degen_axes) if not d][0]
pts_3d = []
for val in [val_limits[free_ax][0], val_limits[free_ax][1]]:
pt = [val_limits[i][0] for i in range(3)]
pt[free_ax] = val
pts_3d.append(pt)
ax3 = plt.figure().add_subplot(projection='3d')
ax3.plot3D([p[0] for p in pts_3d], [p[1] for p in pts_3d], [p[2] for p in pts_3d], linewidth=1.5)
ax3.set_xlabel(ax_name[0])
ax3.set_ylabel(ax_name[1])
ax3.set_zlabel(ax_name[2])
ax3.set_xlim(ax_limits[0])
ax3.set_ylim(ax_limits[1])
ax3.set_zlim(ax_limits[2])
if show:
try:
plt.show()
except UserWarning as e:
if 'FigureCanvasTemplate is non-interactive' in str(e):
logging.warning('warning: Interactive plot not supported in current execution environment.')
return pts_3d, [], ax3
if degen == 'plane':
# One axis is degenerate — trace 2D boundary in the free plane
free_axes = [i for i, d in enumerate(degen_axes) if not d]
fixed_axis = [i for i, d in enumerate(degen_axes) if d][0]
fixed_val = val_limits[fixed_axis][0]
constr_plane = kwargs[CONSTRAINTS].copy()
constr_plane.append(_make_fix_constraint(axes, fixed_axis, ax_type[fixed_axis], fixed_val))
sub_axes = [axes[free_axes[0]], axes[free_axes[1]]]
sub_types = [ax_type[free_axes[0]], ax_type[free_axes[1]]]
if all(t == 'rate' for t in sub_types):
polygon_2d = _trace_polygon_rate_rate(model, sub_axes, constr_plane, solver)
else:
upper, lower = _trace_boundary_adaptive(model, sub_axes, sub_types, constr_plane, solver)
polygon_2d = (upper + list(reversed(lower))) if upper and lower else (upper or lower)
if len(polygon_2d) > 1 and polygon_2d[0] != polygon_2d[-1]:
polygon_2d.append(polygon_2d[0])
datapoints = []
for pt in polygon_2d:
p3d = [0.0, 0.0, 0.0]
p3d[free_axes[0]] = pt[0]
p3d[free_axes[1]] = pt[1]
p3d[fixed_axis] = fixed_val
datapoints.append(array(p3d))
n_pts = len(datapoints)
if n_pts > 1 and all(abs(datapoints[0][k] - datapoints[-1][k]) < 1e-8 for k in range(3)):
n_pts -= 1
triang = [[0, i, i + 1] for i in range(1, n_pts - 1)] if n_pts >= 3 else []
face_polys = None
else:
# 'full' — dispatch based on number of yield axes
n_yields = sum(1 for t in ax_type if t == 'yield')
face_polys = None # set by polytope path only
slice_outlines = None # set by slice path only
if n_yields == 0:
# All rate: exact 3D polytope via face-normal refinement
datapoints, triang, face_polys = _trace_polytope_3d_rate(model, axes, kwargs[CONSTRAINTS], solver)
elif n_yields == 1:
# 1 yield + 2 rate: slice along yield, trace rate-rate polygon per slice
datapoints, triang, slice_outlines = _trace_3d_slice_polygon(
model, axes, ax_type, val_limits, kwargs[CONSTRAINTS], solver, points)
else:
# 2+ yields: grid-based scanning (fallback)
x_space = linspace(val_limits[0][0], val_limits[0][1], num=points).tolist()
slices = []
for x_val in x_space:
constr_x = kwargs[CONSTRAINTS].copy()
constr_x.append(_make_fix_constraint(axes, 0, ax_type[0], x_val))
sol_y_min = _optimize_axis(model, 1, axes, ax_type[1], constr_x, solver, 'minimize')
sol_y_max = _optimize_axis(model, 1, axes, ax_type[1], constr_x, solver, 'maximize')
if sol_y_min.status not in [OPTIMAL] or sol_y_max.status not in [OPTIMAL]:
continue
y_lo = ceil_dec(sol_y_min.objective_value, 9)
y_hi = floor_dec(sol_y_max.objective_value, 9)
if isnan(y_lo) or isnan(y_hi):
continue
if abs(y_hi - y_lo) < 1e-10:
y_space = [y_lo]
else:
n_pts = max(3, int(points * abs(y_hi - y_lo) / max(1e-10,
max(abs(val_limits[1][1] - val_limits[1][0]), 1e-10))))
n_pts = min(n_pts, points)
y_space = linspace(y_lo, y_hi, n_pts).tolist()
upper_slice = []
lower_slice = []
for y_val in y_space:
constr_xy = constr_x.copy()
constr_xy.append(_make_fix_constraint(axes, 1, ax_type[1], y_val))
sol_z_min = _optimize_axis(model, 2, axes, ax_type[2], constr_xy, solver, 'minimize')
sol_z_max = _optimize_axis(model, 2, axes, ax_type[2], constr_xy, solver, 'maximize')
z_lo = ceil_dec(sol_z_min.objective_value, 9) if sol_z_min.status in [OPTIMAL] else nan
z_hi = floor_dec(sol_z_max.objective_value, 9) if sol_z_max.status in [OPTIMAL] else nan
if not isnan(z_lo) and not isnan(z_hi):
upper_slice.append((y_val, z_hi))
lower_slice.append((y_val, z_lo))
if upper_slice:
slices.append((x_val, upper_slice, lower_slice))
if not slices:
raise Exception('No feasible slices found. Problem may be infeasible.')
datapoints = []
datapoints_top = []
datapoints_bottom = []
for x_val, upper_slice, lower_slice in slices:
top_ids = []
bot_ids = []
for (y_val, z_hi), (_, z_lo) in zip(upper_slice, lower_slice):
top_ids.append(len(datapoints))
datapoints.append(array([x_val, y_val, z_hi]))
bot_ids.append(len(datapoints))
datapoints.append(array([x_val, y_val, z_lo]))
datapoints_top.append(top_ids)
datapoints_bottom.append(bot_ids)
triang = []
for s in range(len(slices) - 1):
_triangulate_strips(datapoints_top[s], datapoints_top[s + 1], datapoints, triang)
_triangulate_strips(datapoints_bottom[s], datapoints_bottom[s + 1], datapoints, triang,
flip_winding=True)
front_top = [t[0] for t in datapoints_top]
front_bot = [b[0] for b in datapoints_bottom]
_triangulate_strips(front_top, front_bot, datapoints, triang, flip_winding=True)
back_top = [t[-1] for t in datapoints_top]
back_bot = [b[-1] for b in datapoints_bottom]
_triangulate_strips(back_top, back_bot, datapoints, triang)
_triangulate_strips(datapoints_top[0], datapoints_bottom[0], datapoints, triang)
_triangulate_strips(datapoints_top[-1], datapoints_bottom[-1], datapoints, triang,
flip_winding=True)
if not datapoints:
raise Exception('No feasible points found. Problem may be infeasible.')
# 3D plot
x = [d[0] for d in datapoints]
y = [d[1] for d in datapoints]
z = [d[2] for d in datapoints]
ax3 = plt.figure().add_subplot(projection='3d')
ax3.dist = 10
ax3.azim = 30
ax3.elev = 10
ax3.set_xlim(ax_limits[0])
ax3.set_ylim(ax_limits[1])
ax3.set_zlim(ax_limits[2])
ax3.set_xlabel(ax_name[0])
ax3.set_ylabel(ax_name[1])
ax3.set_zlabel(ax_name[2])
if any(t == 'yield' for t in ax_type):
ax3.set_title('approximate', fontsize=8, color='gray')
def _normal_color(face_pts):
"""Compute color value from face normal direction."""
pts = array(face_pts)
if len(pts) < 3:
return 0.5
e1 = pts[1] - pts[0]
e2 = pts[2] - pts[0]
normal = array([e1[1]*e2[2] - e1[2]*e2[1],
e1[2]*e2[0] - e1[0]*e2[2],
e1[0]*e2[1] - e1[1]*e2[0]])
length = (normal.dot(normal)) ** 0.5
if length > 0:
normal = normal / length
# Map normal direction to scalar: use spherical angles
return arctan2(normal[1], normal[0]) + 1.5 * normal[2]
if face_polys is not None and face_polys:
# Polytope: render merged polygon faces with Poly3DCollection
poly_verts = [[datapoints[i].tolist() for i in face] for face in face_polys]
color_vals = [_normal_color(verts) for verts in poly_verts]
mn, mx = min(color_vals), max(color_vals)
rng = mx - mn if mx > mn else 1
face_colors = plt.get_cmap(cmap)([(c - mn) / rng for c in color_vals])
face_colors[:, 3] = 1.0
collection = Poly3DCollection(poly_verts, facecolors=face_colors,
edgecolors='black', linewidths=1.0)
ax3.add_collection3d(collection)
plot1 = collection
elif triang:
# Triangle mesh: render edgeless faces, color by normal
tri_verts = [[datapoints[i].tolist() for i in t] for t in triang]
color_vals = [_normal_color(verts) for verts in tri_verts]
mn, mx = min(color_vals), max(color_vals)
rng = mx - mn if mx > mn else 1
face_colors = plt.get_cmap(cmap)([(c - mn) / rng for c in color_vals])
face_colors[:, 3] = 1.0
collection = Poly3DCollection(tri_verts, facecolors=face_colors,
edgecolors='none', linewidths=0)
ax3.add_collection3d(collection)
# Draw slice polygon outlines (exact contour at each yield level)
if slice_outlines:
for idx_list in slice_outlines:
pts = array([datapoints[i] for i in idx_list])
pts = array(list(pts) + [pts[0]]) # close the loop
ax3.plot(pts[:, 0], pts[:, 1], pts[:, 2],
color='gray', linewidth=0.4)
plot1 = collection
else:
plot1 = ax3.scatter(x, y, z, s=20)
if show:
try:
plt.show()
except UserWarning as e:
if 'FigureCanvasTemplate is non-interactive' in str(e):
logging.warning('warning: Interactive plot not supported in current execution environment.')
return datapoints, triang, plot1
[docs]
def ceil_dec(v, n):
"""Round up v to n decimals"""
return ceil(v * (10**n)) / (10**n)
[docs]
def floor_dec(v, n):
"""Round down v to n decimals"""
return floor(v * (10**n)) / (10**n)