"""Accelerated FVA using scan LPs and bound scanning.
Standard FVA solves 2*n independent LPs (max and min for each reaction).
This implementation reduces LP count via a two-phase approach:
Phase 1 — Scan LPs (cheap, resolve ~50-70% of bounds):
a. v=0 feasibility: free resolutions when zero flux is feasible
b. min(sum(|x|)): pushes reactions toward zero, resolves lb=0/ub=0 bounds
c. Push-to-bounds: directed objectives push unresolved reactions toward
their variable bounds, with dual simplex warm-start for fast re-solves
Each scan LP solution is processed by bound scanning (vectorized at-bound
check that marks reactions whose flux equals their variable bound).
Phase 2 — Individual LPs for remaining unresolved objectives:
Sequential (warm-started) or parallel (SDPool) dispatch depending on
problem size and thread count. Sequential mode includes co-optimization
scanning: each LP vertex is checked for other reactions at their bounds.
"""
import logging
import time as _time
import numpy as np
from scipy import sparse
from math import nan
from cobra.util.array import create_stoichiometric_matrix
from pandas import DataFrame
from cobra import Configuration
from straindesign.lptools import (
select_solver, idx2c, fva_worker_init, fva_worker_compute,
fva_worker_init_glpk, fva_worker_compute_glpk,
)
from straindesign.solver_interface import MILP_LP
from straindesign.pool import SDPool
from straindesign.parse_constr import parse_constraints, lineqlist2mat
from straindesign.names import CONSTRAINTS, SOLVER, OPTIMAL, UNBOUNDED, GLPK, LP_METHOD_DUAL
from straindesign.networktools import suppress_lp_context
from straindesign.compression import (
compress_cobra_model, CompressionMethod, remove_conservation_relations,
stoichmat_coeff2rational, remove_blocked_reactions,
)
# ---------------------------------------------------------------------------
# Compression helpers
# ---------------------------------------------------------------------------
def _compress_for_fva(model):
"""Copy and compress model for FVA (single-pass coupled compression + conservation removal).
Suppresses optlang LP updates and skips solver deepcopy (not needed —
speedy_fva builds its own MILP_LP objects). Uses a single nullspace
compression pass (no recursive iteration) since FVA only needs the
first-order couplings.
Returns (compressed_model, cmp_maps) where cmp_maps is a list of
{compressed_id: {orig_id: Fraction_factor}} dicts, one per compression round
that actually reduced the reaction count.
"""
cmp_maps = []
with suppress_lp_context(model):
# Fast copy: swap solver with empty stub so deepcopy(solver) is cheap
# (~0.3s vs ~3.3s on iML1515). Safe because speedy_fva builds its own
# MILP_LP and the compression pipeline is solver-independent.
saved_solver = model._solver
model._solver = model.problem.Model()
try:
cmp_model = model.copy()
finally:
model._solver = saved_solver
remove_blocked_reactions(cmp_model)
stoichmat_coeff2rational(cmp_model)
n_before = len(cmp_model.reactions)
# Single-pass coupled compression (NULLSPACE only, no RECURSIVE iteration)
for r in cmp_model.reactions:
r.gene_reaction_rule = ''
result = compress_cobra_model(cmp_model,
methods=[CompressionMethod.NULLSPACE],
in_place=True)
rmap = result.reaction_map
if len(rmap) < n_before:
cmp_maps.append(rmap)
# Coupling can create new dependent rows — remove for clean LU factorization
remove_conservation_relations(cmp_model)
return cmp_model, cmp_maps
def _map_constraints(parsed_constraints, cmp_maps, cmp_reaction_ids):
"""Map parsed constraint dicts through compression mappings.
Each constraint is [coeff_dict, operator, rhs]. For each compression step,
original reaction IDs in coeff_dict are replaced by their compressed ID with
the coefficient scaled by the coupling factor (v_orig = factor * v_compressed).
"""
cmp_ids = set(cmp_reaction_ids)
for rmap in cmp_maps:
for cmp_id, orig_map in rmap.items():
for constraint in parsed_constraints:
coeff_dict = constraint[0]
lumped = [k for k in coeff_dict if k in orig_map]
if lumped:
coeff_dict[cmp_id] = sum(
coeff_dict.pop(k) * float(orig_map[k]) for k in lumped
)
# Remove references to reactions not in compressed model (e.g., blocked)
for constraint in parsed_constraints:
for k in list(constraint[0].keys()):
if k not in cmp_ids:
del constraint[0][k]
return parsed_constraints
def _expand_fva(fva_cmp, cmp_maps, orig_reaction_ids):
"""Expand compressed FVA results back to original reaction space.
Reverses through compression steps, expanding lumped reactions using
v_orig = factor * v_compressed (with min/max swap when factor < 0).
"""
result_min = {}
result_max = {}
for rxn_id in fva_cmp.index:
result_min[rxn_id] = fva_cmp.loc[rxn_id, 'minimum']
result_max[rxn_id] = fva_cmp.loc[rxn_id, 'maximum']
for rmap in reversed(cmp_maps):
for cmp_id, orig_map in rmap.items():
if len(orig_map) <= 1:
continue # singleton — ID unchanged
cmp_min = result_min.pop(cmp_id, 0.0)
cmp_max = result_max.pop(cmp_id, 0.0)
for orig_id, factor in orig_map.items():
f = float(factor)
if f > 0:
result_min[orig_id] = f * cmp_min
result_max[orig_id] = f * cmp_max
else:
result_min[orig_id] = f * cmp_max
result_max[orig_id] = f * cmp_min
# Blocked reactions (removed by remove_blocked_reactions): min=max=0
for rxn_id in orig_reaction_ids:
if rxn_id not in result_min:
result_min[rxn_id] = 0.0
result_max[rxn_id] = 0.0
return DataFrame(
{"minimum": [result_min[r] for r in orig_reaction_ids],
"maximum": [result_max[r] for r in orig_reaction_ids]},
index=orig_reaction_ids,
)
# ---------------------------------------------------------------------------
# Global scan LP helper
# ---------------------------------------------------------------------------
def _build_abssum_lp(S_eq, b_eq, A_ineq, b_ineq, lb, ub, solver, BIG=1000.0):
"""Build LP for min sum(|x|) via variable splitting.
For each reaction j:
- Forward only (lb >= 0): |x_j| = x_j, obj coeff = +1
- Backward only (ub <= 0): |x_j| = -x_j, obj coeff = -1
- Reversible (lb < 0 < ub): split x_j = p_j - n_j (p_j, n_j >= 0)
with auxiliary equality x_j - p_j + n_j = 0, obj = +1 on both
Returns (lp, n_orig) where:
- lp: MILP_LP ready to solve (x[:n_orig] gives the reaction fluxes)
- n_orig: number of original reaction variables
"""
n = len(lb)
tol = 1e-9
# Classify reactions
fwd = lb >= -tol # forward-only or fixed
bwd = ub <= tol # backward-only or fixed
rev = (~fwd) & (~bwd) # truly reversible
n_rev = int(rev.sum())
rev_idx = np.where(rev)[0]
# Extended variable vector: [x_0..x_{n-1}, p_0..p_{k-1}, n_0..n_{k-1}]
n_ext = n + 2 * n_rev
# Objective: min sum(|x|)
c = np.zeros(n_ext)
for j in range(n):
if rev[j]:
pass # handled via p/n below
elif bwd[j]:
c[j] = -1.0 # |x_j| = -x_j
else:
c[j] = 1.0 # |x_j| = x_j (fwd or fixed)
for k, j in enumerate(rev_idx):
c[n + k] = 1.0 # p_k
c[n + n_rev + k] = 1.0 # n_k
# Equality constraints: original S*x = 0 (+ extras) + splitting equalities
# Splitting: x_j - p_k + n_k = 0 for each reversible reaction k
if n_rev > 0:
# Build splitting equality block: n_rev rows x n_ext cols
rows_split = np.arange(n_rev)
# x_j coefficient = 1
cols_x = rev_idx
data_x = np.ones(n_rev)
# p_k coefficient = -1
cols_p = np.arange(n, n + n_rev)
data_p = -np.ones(n_rev)
# n_k coefficient = +1
cols_n = np.arange(n + n_rev, n + 2 * n_rev)
data_n = np.ones(n_rev)
rows_all = np.concatenate([rows_split, rows_split, rows_split])
cols_all = np.concatenate([cols_x, cols_p, cols_n])
data_all = np.concatenate([data_x, data_p, data_n])
A_split = sparse.csr_matrix((data_all, (rows_all, cols_all)),
shape=(n_rev, n_ext))
# Extend original equalities to n_ext columns
S_ext = sparse.hstack([S_eq, sparse.csr_matrix((S_eq.shape[0], 2 * n_rev))])
A_eq_full = sparse.vstack([S_ext, A_split])
b_eq_full = list(b_eq) + [0.0] * n_rev
if A_ineq.shape[0] > 0:
A_ineq_ext = sparse.hstack([A_ineq, sparse.csr_matrix((A_ineq.shape[0], 2 * n_rev))])
else:
A_ineq_ext = sparse.csr_matrix((0, n_ext))
else:
A_eq_full = S_eq
b_eq_full = list(b_eq)
A_ineq_ext = A_ineq
# Bounds
lb_ext = np.zeros(n_ext)
ub_ext = np.zeros(n_ext)
lb_ext[:n] = lb
ub_ext[:n] = ub
# Clamp inf bounds for objective push (doesn't affect feasibility)
for j in range(n):
if np.isinf(lb_ext[j]):
lb_ext[j] = -BIG
if np.isinf(ub_ext[j]):
ub_ext[j] = BIG
# p_k bounds: [0, ub_j] (clamped)
for k, j in enumerate(rev_idx):
lb_ext[n + k] = 0.0
ub_ext[n + k] = min(ub[j], BIG)
# n_k bounds: [0, -lb_j] (clamped)
for k, j in enumerate(rev_idx):
lb_ext[n + n_rev + k] = 0.0
ub_ext[n + n_rev + k] = min(-lb[j], BIG)
lp = MILP_LP(c=c.tolist(), A_ineq=A_ineq_ext, b_ineq=list(b_ineq),
A_eq=A_eq_full, b_eq=b_eq_full,
lb=lb_ext.tolist(), ub=ub_ext.tolist(), solver=solver)
return lp, n
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
[docs]
def speedy_fva(model, **kwargs):
"""Accelerated FVA using global scan LPs and KKT-based optimality propagation.
Returns the same DataFrame as straindesign.lptools.fva.
Parameters
----------
model : cobra.Model
solver : str, optional
constraints : str or list, optional
compress : bool or None, optional (default None)
Compress the model (coupled compression + conservation removal) before
running FVA, then expand results back. None = auto (True if n >= 200).
precheck : bool or None, optional (default None)
Run global scan LPs (min/max sum(|x|)) in Phase 1 to pre-resolve many
objectives without individual LPs. None = auto (always True).
threads : int or None, optional (default None)
Number of parallel workers for Phase 2 dispatch. None = auto
(Configuration().processes if n >= 1000, else 1).
verbose : bool, optional (default False)
Returns
-------
pandas.DataFrame with columns 'minimum' and 'maximum', indexed by reaction ID.
"""
compress = kwargs.pop('compress', None)
precheck = kwargs.pop('precheck', None)
threads = kwargs.pop('threads', None)
orig_reaction_ids = model.reactions.list_attr("id")
n_original = len(orig_reaction_ids)
cmp_maps = []
# Auto-tuning
if compress is None:
compress = n_original >= 200
if precheck is None:
precheck = True
if threads is None:
threads = Configuration().processes if n_original >= 1000 else 1
threads = max(1, int(threads))
t_phase = {}
_tp0 = _time.perf_counter()
if compress:
model, cmp_maps = _compress_for_fva(model)
t_phase['compress'] = _time.perf_counter() - _tp0
reaction_ids = model.reactions.list_attr("id")
n_orig = len(reaction_ids)
has_constraints = CONSTRAINTS in kwargs and kwargs[CONSTRAINTS]
if has_constraints:
from straindesign.networktools import resolve_gene_constraints
kwargs[CONSTRAINTS] = resolve_gene_constraints(model, kwargs[CONSTRAINTS])
kwargs[CONSTRAINTS] = parse_constraints(kwargs[CONSTRAINTS], orig_reaction_ids)
if cmp_maps:
kwargs[CONSTRAINTS] = _map_constraints(
kwargs[CONSTRAINTS], cmp_maps, reaction_ids)
A_ineq_extra, b_ineq_extra, A_eq_extra, b_eq_extra = lineqlist2mat(
kwargs[CONSTRAINTS], reaction_ids)
if SOLVER not in kwargs:
kwargs[SOLVER] = None
solver = select_solver(kwargs[SOLVER], model)
verbose = kwargs.get('verbose', False)
# ------------------------------------------------------------------
# Phase 0: Setup
# ------------------------------------------------------------------
S = sparse.csr_matrix(create_stoichiometric_matrix(model))
m_S = S.shape[0]
b_eq_base = [0.0] * m_S
if has_constraints:
A_eq = sparse.vstack((S, A_eq_extra))
b_eq = b_eq_base + b_eq_extra
A_ineq = A_ineq_extra
b_ineq = b_ineq_extra
else:
A_eq = S
b_eq = b_eq_base
A_ineq = sparse.csr_matrix((0, n_orig))
b_ineq = []
lb = np.array([v.lower_bound for v in model.reactions], dtype=np.float64)
ub = np.array([v.upper_bound for v in model.reactions], dtype=np.float64)
lp = MILP_LP(A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq,
lb=lb.tolist(), ub=ub.tolist(), solver=solver)
_, _, status = lp.solve()
if status not in [OPTIMAL, UNBOUNDED]:
logging.info('FVA problem not feasible.')
return DataFrame(
{"minimum": [nan] * n_orig, "maximum": [nan] * n_orig},
index=reaction_ids,
)
# Initialize tracking
incumbent_max = np.full(n_orig, -np.inf)
incumbent_min = np.full(n_orig, np.inf)
res_max = np.zeros(n_orig, dtype=bool)
res_min = np.zeros(n_orig, dtype=bool)
# Pre-resolve fixed reactions
fixed = np.abs(ub - lb) < 1e-12
res_max[fixed] = True
res_min[fixed] = True
incumbent_max[fixed] = ub[fixed]
incumbent_min[fixed] = lb[fixed]
# Stats
lps_solved = 0
total_bound_resolved = 0
t_solve = 0.0
tol_bound = 1e-9
def _bound_scan(x_local):
nonlocal total_bound_resolved
at_ub = (~res_max) & (np.abs(x_local - ub) < tol_bound)
n_ub = int(at_ub.sum())
if n_ub:
res_max[at_ub] = True
incumbent_max[at_ub] = ub[at_ub]
total_bound_resolved += n_ub
at_lb = (~res_min) & (np.abs(x_local - lb) < tol_bound)
n_lb = int(at_lb.sum())
if n_lb:
res_min[at_lb] = True
incumbent_min[at_lb] = lb[at_lb]
total_bound_resolved += n_lb
t_phase['setup'] = _time.perf_counter() - _tp0 - t_phase['compress']
# ------------------------------------------------------------------
# Phase 1: Global scan LPs + v=0 feasibility
# ------------------------------------------------------------------
_tp1 = _time.perf_counter()
# 1a: v=0 feasibility check — free resolutions, no LP needed
v0_feasible = (not np.any(lb > tol_bound) and not np.any(ub < -tol_bound)
and not has_constraints)
if v0_feasible:
zero_lb = np.abs(lb) < tol_bound
newly_min = zero_lb & (~res_min)
res_min[newly_min] = True
incumbent_min[newly_min] = 0.0
total_bound_resolved += int(newly_min.sum())
zero_ub = np.abs(ub) < tol_bound
newly_max = zero_ub & (~res_max)
res_max[newly_max] = True
incumbent_max[newly_max] = 0.0
total_bound_resolved += int(newly_max.sum())
if precheck:
# 1b: min(sum(|x|)) scan — pushes reactions toward zero
# Effective for resolving lb=0 / ub=0 bounds in one shot.
scan_lp, n_scan = _build_abssum_lp(
A_eq, b_eq, A_ineq, b_ineq, lb, ub, solver)
scan_lp.set_lp_method(LP_METHOD_DUAL)
n_ext = len(scan_lp.c)
t0 = _time.perf_counter()
x_list_scan, _, scan_status = scan_lp.solve()
t_solve += _time.perf_counter() - t0
lps_solved += 1
resolved_absmin = 0
if scan_status == OPTIMAL:
x_scan = np.array(x_list_scan[:n_scan], dtype=np.float64)
before = int(res_max.sum() + res_min.sum())
_bound_scan(x_scan)
np.maximum(incumbent_max, x_scan, out=incumbent_max)
np.minimum(incumbent_min, x_scan, out=incumbent_min)
resolved_absmin = int(res_max.sum() + res_min.sum()) - before
if verbose:
n_done_iter = int(res_max.sum() + res_min.sum())
logging.debug(
f" Phase 1 min|x|: +{resolved_absmin} "
f"({n_done_iter}/{2*n_orig} resolved)")
# 1c: Iterative push-to-bounds — directed per-reaction objectives
# Push unresolved-max reactions toward ub, unresolved-min toward lb.
# Dual simplex warm-start makes re-optimization nearly free.
push_iter = 0
while True:
resolved_this_round = 0
resolved_push_ub = 0
resolved_push_lb = 0
# Push toward ub (resolve max bounds): maximize x_j → c[j] = -1
c_push = np.zeros(n_ext)
any_unresolved = False
for j in range(n_scan):
if not res_max[j]:
c_push[j] = -1.0
any_unresolved = True
if any_unresolved:
scan_lp.set_objective(c_push.tolist())
t0 = _time.perf_counter()
x_list_scan, _, scan_status = scan_lp.solve()
t_solve += _time.perf_counter() - t0
lps_solved += 1
if scan_status == OPTIMAL:
x_scan = np.array(x_list_scan[:n_scan], dtype=np.float64)
before = int(res_max.sum() + res_min.sum())
_bound_scan(x_scan)
np.maximum(incumbent_max, x_scan, out=incumbent_max)
np.minimum(incumbent_min, x_scan, out=incumbent_min)
resolved_push_ub = int(res_max.sum() + res_min.sum()) - before
resolved_this_round += resolved_push_ub
# Push toward lb (resolve min bounds): minimize x_j → c[j] = +1
c_push = np.zeros(n_ext)
any_unresolved = False
for j in range(n_scan):
if not res_min[j]:
c_push[j] = 1.0
any_unresolved = True
if any_unresolved:
scan_lp.set_objective(c_push.tolist())
t0 = _time.perf_counter()
x_list_scan, _, scan_status = scan_lp.solve()
t_solve += _time.perf_counter() - t0
lps_solved += 1
if scan_status == OPTIMAL:
x_scan = np.array(x_list_scan[:n_scan], dtype=np.float64)
before = int(res_max.sum() + res_min.sum())
_bound_scan(x_scan)
np.maximum(incumbent_max, x_scan, out=incumbent_max)
np.minimum(incumbent_min, x_scan, out=incumbent_min)
resolved_push_lb = int(res_max.sum() + res_min.sum()) - before
resolved_this_round += resolved_push_lb
if verbose:
n_done_iter = int(res_max.sum() + res_min.sum())
logging.debug(
f" Phase 1 push {push_iter}: "
f"ub +{resolved_push_ub}, lb +{resolved_push_lb} "
f"({n_done_iter}/{2*n_orig} resolved)")
push_iter += 1
if resolved_this_round < 5:
break
del scan_lp
t_phase['phase1'] = _time.perf_counter() - _tp1
# ------------------------------------------------------------------
# Phase 2: Dispatch remaining objectives — sequential or parallel
# ------------------------------------------------------------------
_tp2 = _time.perf_counter()
n_done = int(res_max.sum() + res_min.sum())
n_remaining = 2 * n_orig - n_done
phase2_entry_count = n_remaining # for stats
if n_remaining >= 1000 and threads > 1:
# Parallel dispatch via SDPool
# Build list of unresolved objective indices (even=max, odd=min)
unresolved = []
for j in range(n_orig):
if not res_max[j]:
unresolved.append(2 * j) # even = max
if not res_min[j]:
unresolved.append(2 * j + 1) # odd = min
x_par = [nan] * (2 * n_orig)
t0 = _time.perf_counter()
if solver == GLPK:
with SDPool(threads, initializer=fva_worker_init_glpk,
initargs=(A_ineq, b_ineq, A_eq, b_eq,
lb.tolist(), ub.tolist())) as pool:
chunk_size = max(1, len(unresolved) // threads)
for i, value in pool.imap_unordered(
fva_worker_compute_glpk, unresolved,
chunksize=chunk_size):
x_par[i] = value
else:
with SDPool(threads, initializer=fva_worker_init,
initargs=(A_ineq, b_ineq, A_eq, b_eq,
lb.tolist(), ub.tolist(), solver)) as pool:
chunk_size = max(1, len(unresolved) // threads)
for i, value in pool.imap_unordered(
fva_worker_compute, unresolved,
chunksize=chunk_size):
x_par[i] = value
t_solve += _time.perf_counter() - t0
lps_solved += len(unresolved)
# NaN retry with fresh LPs
nan_idx = [i for i in unresolved if np.isnan(x_par[i])]
if nan_idx:
_BATCH = 50
while nan_idx:
lp_retry = MILP_LP(A_ineq=A_ineq, b_ineq=b_ineq,
A_eq=A_eq, b_eq=b_eq,
lb=lb.tolist(), ub=ub.tolist(), solver=solver)
prev_retry = 0
for i in nan_idx[:_BATCH]:
C = idx2c(i, prev_retry)
if solver in ('cplex', 'gurobi'):
lp_retry.backend.set_objective_idx(C)
x_par[i] = lp_retry.backend.slim_solve()
else:
lp_retry.set_objective_idx(C)
x_par[i] = lp_retry.slim_solve()
prev_retry = C[0][0]
old_count = len(nan_idx)
nan_idx = [i for i in nan_idx if np.isnan(x_par[i])]
if len(nan_idx) == old_count:
break
lps_solved += len(unresolved) - len(nan_idx) if nan_idx else len(unresolved)
# Collect parallel results
for j in range(n_orig):
i_max = 2 * j
if not res_max[j] and not np.isnan(x_par[i_max]):
res_max[j] = True
incumbent_max[j] = -x_par[i_max]
i_min = 2 * j + 1
if not res_min[j] and not np.isnan(x_par[i_min]):
res_min[j] = True
incumbent_min[j] = x_par[i_min]
elif n_remaining > 0:
# Sequential dispatch — simple loop, no hub-first, no dual check
prev_col = -1
def _rebuild_lp():
nonlocal lp, prev_col
lp = MILP_LP(A_ineq=A_ineq, b_ineq=b_ineq, A_eq=A_eq, b_eq=b_eq,
lb=lb.tolist(), ub=ub.tolist(), solver=solver)
prev_col = -1
_rebuild_lp()
seq_count = 0
for j in range(n_orig):
for direction in (1, -1):
if direction == 1 and res_max[j]:
continue
if direction == -1 and res_min[j]:
continue
# Periodic rebuild to limit warm-start degeneration
if seq_count > 0 and seq_count % 200 == 0:
_rebuild_lp()
sig = -direction
if prev_col < 0 or prev_col == j:
C = [[j, float(sig)]]
else:
C = [[j, float(sig)], [prev_col, 0.0]]
if solver in ('cplex', 'gurobi'):
lp.backend.set_objective_idx(C)
else:
lp.set_objective_idx(C)
prev_col = j
t0 = _time.perf_counter()
x_list, obj_val, status = lp.solve()
t_solve += _time.perf_counter() - t0
lps_solved += 1
seq_count += 1
if status == UNBOUNDED:
if direction == 1:
res_max[j] = True
incumbent_max[j] = np.inf
else:
res_min[j] = True
incumbent_min[j] = -np.inf
continue
elif status != OPTIMAL:
if direction == 1:
res_max[j] = True
else:
res_min[j] = True
continue
# Guard: LP optimum must not be worse than incumbent
if direction == 1:
val, inc = -obj_val, incumbent_max[j]
bad = np.isfinite(inc) and val < inc - 1e-6 * (1 + abs(inc))
else:
val, inc = obj_val, incumbent_min[j]
bad = np.isfinite(inc) and val > inc + 1e-6 * (1 + abs(inc))
if bad:
_rebuild_lp()
C = [[j, float(sig)]]
if solver in ('cplex', 'gurobi'):
lp.backend.set_objective_idx(C)
else:
lp.set_objective_idx(C)
prev_col = j
t0 = _time.perf_counter()
x_list, obj_val, status = lp.solve()
t_solve += _time.perf_counter() - t0
lps_solved += 1
seq_count += 1
if status == UNBOUNDED:
if direction == 1:
res_max[j] = True
incumbent_max[j] = np.inf
else:
res_min[j] = True
incumbent_min[j] = -np.inf
elif status == OPTIMAL:
if direction == 1:
res_max[j] = True
incumbent_max[j] = -obj_val
else:
res_min[j] = True
incumbent_min[j] = obj_val
# Co-optimization scan: check if this vertex also
# resolves other unresolved directions (at-bound check
# on non-zero values that improve the incumbent).
x_arr = np.array(x_list[:n_orig], dtype=np.float64)
np.maximum(incumbent_max, x_arr, out=incumbent_max)
np.minimum(incumbent_min, x_arr, out=incumbent_min)
_bound_scan(x_arr)
# ------------------------------------------------------------------
# Assemble results
# ------------------------------------------------------------------
incumbent_max[np.abs(incumbent_max) < 1e-11] = 0.0
incumbent_min[np.abs(incumbent_min) < 1e-11] = 0.0
fva_result = DataFrame(
{"minimum": incumbent_min, "maximum": incumbent_max},
index=reaction_ids,
)
if verbose:
cmp_msg = ""
if cmp_maps:
cmp_msg = f" (compressed {n_original}→{n_orig} rxns)"
logging.debug(
f" speedy_fva done{cmp_msg}: {lps_solved} LPs, "
f"{total_bound_resolved} bound-resolved, "
f"{2*n_orig} total objectives")
logging.debug(
f" timing: solve={t_solve:.2f}s, threads={threads}")
t_phase['phase2'] = _time.perf_counter() - _tp2
t_phase['total'] = _time.perf_counter() - _tp0
fva_result.attrs['t_phase'] = t_phase
fva_result.attrs['lps_solved'] = lps_solved
fva_result.attrs['bound_resolved'] = total_bound_resolved
fva_result.attrs['phase2_remaining'] = phase2_entry_count
if cmp_maps:
fva_result.attrs['n_original'] = n_original
fva_result.attrs['n_compressed'] = n_orig
# Expand compressed results back to original reaction space
if cmp_maps:
expanded = _expand_fva(fva_result, cmp_maps, orig_reaction_ids)
expanded.attrs = fva_result.attrs
fva_result = expanded
return fva_result