"""
Metabolic network compression using nullspace-based coupling detection.
This module provides network compression for COBRA models. The main function
is compress_cobra_model() which compresses a model and returns transformation
matrices for converting between original and compressed flux spaces.
API:
>>> from straindesign.compression import compress_cobra_model
>>> result = compress_cobra_model(model)
>>> compressed_model = result.compressed_model
Note: The model should be preprocessed first (rational coefficients,
conservation relations removed). Use networktools.compress_model() for
automatic preprocessing.
"""
import ast
import copy
import logging
import numpy as np
from enum import Enum
from functools import reduce
from math import gcd, lcm, isinf
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union, Any
from fractions import Fraction
from scipy import sparse
from scipy.sparse import csr_matrix, csc_matrix
from sympy import Rational, Symbol as SympySymbol, And as SympyAnd, Or as SympyOr
from cobra import Configuration
from cobra.util.array import create_stoichiometric_matrix
LOG = logging.getLogger(__name__)
# LP suppression utilities live in networktools. Imports are deferred to
# function bodies to avoid the circular dependency (networktools re-exports
# compression symbols).
# =============================================================================
# Utility Functions
# =============================================================================
[docs]
def float_to_rational(val, max_precision: int = 6, max_denom: int = 100) -> Fraction:
"""Convert float to Fraction with bounded denominators."""
if isinstance(val, Fraction):
return val
import numbers
if isinstance(val, numbers.Rational):
return Fraction(val.numerator, val.denominator)
# Handle sympy.Float and other numeric types that Fraction() doesn't accept directly
if not isinstance(val, (int, float)):
val = float(val)
if val == 0:
return Fraction(0)
if val == int(val):
return Fraction(int(val))
small_frac = Fraction(val).limit_denominator(max_denom)
if round(float(small_frac), max_precision) == round(val, max_precision):
return small_frac
denom = 10**max_precision
numer = round(val * denom)
return Fraction(numer, denom)
[docs]
def detect_max_precision(model) -> int:
"""Detect maximum decimal precision needed for model coefficients."""
max_decimals = 0
for rxn in model.reactions:
for met, coeff in rxn.metabolites.items():
if isinstance(coeff, float) and coeff != 0:
s = f"{abs(coeff):.15g}"
if '.' in s:
decimals = len(s.split('.')[1].rstrip('0'))
max_decimals = max(max_decimals, decimals)
return min(12, max(3, max_decimals))
def _lcm_list(numbers: List[int]) -> int:
"""Compute LCM of a list of integers."""
return reduce(lcm, numbers, 1) if numbers else 1
# =============================================================================
# Rational Matrix with Sparse Storage
# =============================================================================
[docs]
class RationalMatrix:
"""Sparse rational matrix using dual int sparse storage (numerators + denominators)."""
def __init__(self, rows: int, cols: int):
self._rows = rows
self._cols = cols
self._num_sparse: Optional[csr_matrix] = None # Numerators
self._den_sparse: Optional[csr_matrix] = None # Denominators
self._csc_cache: Optional[csc_matrix] = None
self._batch_mode = False
def _invalidate_cache(self):
if not self._batch_mode:
self._csc_cache = None
# -------------------------------------------------------------------------
# Construction
# -------------------------------------------------------------------------
@classmethod
def _from_sparse(cls,
num_sparse: csr_matrix,
den_sparse: csr_matrix,
scaled: Optional[csr_matrix] = None,
common_denom: Optional[int] = None) -> 'RationalMatrix':
"""Create RationalMatrix from sparse components."""
rows, cols = num_sparse.shape
result = cls(rows, cols)
result._num_sparse = num_sparse.copy()
result._den_sparse = den_sparse.copy()
return result
@classmethod
[docs]
def identity(cls, size: int) -> 'RationalMatrix':
"""Create identity matrix."""
row_idx = list(range(size))
col_idx = list(range(size))
num_data = [1] * size
den_data = [1] * size
num_sparse = csr_matrix((num_data, (row_idx, col_idx)), shape=(size, size), dtype=np.int64)
den_sparse = csr_matrix((den_data, (row_idx, col_idx)), shape=(size, size), dtype=np.int64)
return cls._from_sparse(num_sparse, den_sparse)
@classmethod
[docs]
def from_numpy(cls, arr: np.ndarray, max_precision: int = 6, max_denom: int = 100) -> 'RationalMatrix':
"""Create RationalMatrix from numpy array."""
rows, cols = arr.shape
row_idx, col_idx, num_data, den_data = [], [], [], []
for r in range(rows):
for c in range(cols):
val = arr[r, c]
if val != 0:
frac = float_to_rational(val, max_precision, max_denom)
row_idx.append(r)
col_idx.append(c)
num_data.append(frac.numerator)
den_data.append(frac.denominator)
num_sparse = csr_matrix((num_data, (row_idx, col_idx)), shape=(rows, cols), dtype=np.int64)
den_sparse = csr_matrix((den_data, (row_idx, col_idx)), shape=(rows, cols), dtype=np.int64)
return cls._from_sparse(num_sparse, den_sparse)
@classmethod
[docs]
def from_cobra_model(cls, model, max_precision: int = 6, max_denom: int = 100) -> 'RationalMatrix':
"""Create RationalMatrix from COBRA model stoichiometry."""
num_mets = len(model.metabolites)
num_rxns = len(model.reactions)
met_index = {m.id: i for i, m in enumerate(model.metabolites)}
row_idx, col_idx, num_data, den_data = [], [], [], []
for j, rxn in enumerate(model.reactions):
for met, coeff in rxn.metabolites.items():
if coeff != 0:
i = met_index[met.id]
if isinstance(coeff, Fraction):
frac = coeff
elif hasattr(coeff, 'p') and hasattr(coeff, 'q'):
frac = Fraction(int(coeff.p), int(coeff.q))
elif hasattr(coeff, 'numerator'):
frac = Fraction(coeff.numerator, coeff.denominator)
else:
frac = float_to_rational(coeff, max_precision, max_denom)
row_idx.append(i)
col_idx.append(j)
num_data.append(frac.numerator)
den_data.append(frac.denominator)
num_sparse = csr_matrix((num_data, (row_idx, col_idx)), shape=(num_mets, num_rxns), dtype=np.int64)
den_sparse = csr_matrix((den_data, (row_idx, col_idx)), shape=(num_mets, num_rxns), dtype=np.int64)
return cls._from_sparse(num_sparse, den_sparse)
@classmethod
def _build_from_sparse_data(cls, row_indices: List[int], col_indices: List[int], numerators: List[int], denominators: List[int],
num_rows: int, num_cols: int) -> 'RationalMatrix':
"""Build RationalMatrix from sparse coordinate data."""
num_sparse = csr_matrix((numerators, (row_indices, col_indices)), shape=(num_rows, num_cols), dtype=np.int64)
den_sparse = csr_matrix((denominators, (row_indices, col_indices)), shape=(num_rows, num_cols), dtype=np.int64)
return cls._from_sparse(num_sparse, den_sparse)
# -------------------------------------------------------------------------
# Size queries
# -------------------------------------------------------------------------
def get_row_count(self) -> int:
return self._rows
def get_column_count(self) -> int:
return self._cols
# -------------------------------------------------------------------------
# Iteration
# -------------------------------------------------------------------------
[docs]
def iter_column_fractions(self, col: int) -> Iterator[Tuple[int, Fraction]]:
"""Iterate over non-zero entries in column as (row, Fraction) pairs."""
if self._csc_cache is None:
self._csc_cache = self._num_sparse.tocsc()
self._den_csc_cache = self._den_sparse.tocsc()
start = self._csc_cache.indptr[col]
end = self._csc_cache.indptr[col + 1]
for idx in range(start, end):
row = self._csc_cache.indices[idx]
num = int(self._csc_cache.data[idx])
den = int(self._den_csc_cache.data[idx])
if num != 0:
yield row, Fraction(num, den)
[docs]
def get_signum(self, row: int, col: int) -> int:
"""Return sign of element: -1, 0, or 1."""
num = self._num_sparse[row, col]
if num > 0:
return 1
elif num < 0:
return -1
return 0
# -------------------------------------------------------------------------
# Batch edit mode
# -------------------------------------------------------------------------
[docs]
def begin_batch_edit(self):
"""Enter batch edit mode - delays cache invalidation."""
self._batch_mode = True
self._num_sparse = self._num_sparse.tolil()
self._den_sparse = self._den_sparse.tolil()
[docs]
def end_batch_edit(self):
"""Exit batch edit mode - converts back to CSR and invalidates cache."""
self._batch_mode = False
self._num_sparse = self._num_sparse.tocsr()
self._den_sparse = self._den_sparse.tocsr()
self._csc_cache = None
# -------------------------------------------------------------------------
# Matrix operations
# -------------------------------------------------------------------------
[docs]
def clone(self) -> 'RationalMatrix':
"""Create a deep copy."""
return RationalMatrix._from_sparse(self._num_sparse.copy(), self._den_sparse.copy())
[docs]
def submatrix(self, rows: int, cols: int) -> 'RationalMatrix':
"""Extract top-left submatrix of given dimensions."""
num_sub = self._num_sparse[:rows, :cols].copy()
den_sub = self._den_sparse[:rows, :cols].copy()
return RationalMatrix._from_sparse(num_sub, den_sub)
[docs]
def remove_rows(self, keep_indices: List[int]) -> None:
"""Keep only the specified rows."""
keep = np.array(keep_indices, dtype=np.intp)
self._num_sparse = self._num_sparse[keep, :]
self._den_sparse = self._den_sparse[keep, :]
self._rows = len(keep_indices)
self._invalidate_cache()
[docs]
def remove_columns(self, keep_indices: List[int]) -> None:
"""Keep only the specified columns."""
keep = np.array(keep_indices, dtype=np.intp)
self._num_sparse = self._num_sparse[:, keep]
self._den_sparse = self._den_sparse[:, keep]
self._cols = len(keep_indices)
self._invalidate_cache()
[docs]
def add_scaled_column(self, dst_col: int, src_col: int, scalar_num: int, scalar_den: int) -> None:
"""Add scalar * column[src] to column[dst]. dst[i] += (num/den) * src[i]"""
if scalar_num == 0:
return
num_lil = self._num_sparse
den_lil = self._den_sparse
# Get source column entries
if hasattr(num_lil, 'rows'): # LIL format
src_rows = num_lil.rows[src_col] if num_lil.format == 'csc' else None
# Convert to CSC for efficient column access
num_csc = num_lil.tocsc() if num_lil.format != 'csc' else num_lil
den_csc = den_lil.tocsc() if den_lil.format != 'csc' else den_lil
start = num_csc.indptr[src_col]
end = num_csc.indptr[src_col + 1]
for idx in range(start, end):
row = num_csc.indices[idx]
src_num = int(num_csc.data[idx])
src_den = int(den_csc.data[idx])
if src_num == 0:
continue
# Get current dst value
dst_num = int(num_lil[row, dst_col])
dst_den = int(den_lil[row, dst_col]) if dst_num != 0 else 1
# Compute: dst + (scalar_num/scalar_den) * (src_num/src_den)
# = dst_num/dst_den + (scalar_num * src_num)/(scalar_den * src_den)
add_num = scalar_num * src_num
add_den = scalar_den * src_den
if dst_num == 0:
new_num, new_den = add_num, add_den
else:
# Common denominator addition
new_num = dst_num * add_den + add_num * dst_den
new_den = dst_den * add_den
# Reduce
if new_num != 0:
g = gcd(abs(new_num), new_den)
new_num //= g
new_den //= g
num_lil[row, dst_col] = new_num
den_lil[row, dst_col] = new_den if new_num != 0 else 0
self._invalidate_cache()
# -------------------------------------------------------------------------
# Conversion
# -------------------------------------------------------------------------
[docs]
def to_numpy(self) -> np.ndarray:
"""Convert to numpy float array."""
result = np.zeros((self._rows, self._cols), dtype=np.float64)
coo_num = self._num_sparse.tocoo()
coo_den = self._den_sparse.tocoo()
for r, c, num, den in zip(coo_num.row, coo_num.col, coo_num.data, coo_den.data):
if num != 0:
result[r, c] = float(num) / float(den)
return result
[docs]
def to_sparse_csr(self) -> Tuple[csr_matrix, int]:
"""Return sparse representation and common denominator.
Returns numerator matrix scaled by LCM of denominators, plus the LCM.
"""
# Compute LCM of all denominators
dens = self._den_sparse.data
if len(dens) == 0:
return csr_matrix((self._rows, self._cols), dtype=np.int64), 1
common_denom = _lcm_list([int(d) for d in dens if d != 0])
# Scale numerators
coo_num = self._num_sparse.tocoo()
coo_den = self._den_sparse.tocoo()
scaled_data = []
for num, den in zip(coo_num.data, coo_den.data):
if num != 0:
scaled_data.append(int(num) * (common_denom // int(den)))
else:
scaled_data.append(0)
scaled = csr_matrix((scaled_data, (coo_num.row, coo_num.col)), shape=(self._rows, self._cols), dtype=np.int64)
return scaled, common_denom
[docs]
def to_sparse_pattern(self) -> Tuple[csr_matrix, Dict[int, Dict[int, Fraction]]]:
"""Return sparsity pattern as CSR and row-wise Fraction data.
For pattern analysis (coupled reaction detection) without integer overflow.
Returns:
pattern: CSR matrix with 1s at non-zero positions (for indptr/indices)
row_data: {row: {col: Fraction}} for actual values
"""
# Build pattern matrix (just 1s for structure)
coo_num = self._num_sparse.tocoo()
pattern_data = [1] * len(coo_num.data)
pattern = csr_matrix((pattern_data, (coo_num.row, coo_num.col)), shape=(self._rows, self._cols), dtype=np.int8)
# Build row-wise Fraction data
coo_den = self._den_sparse.tocoo()
row_data: Dict[int, Dict[int, Fraction]] = {}
for r, c, num, den in zip(coo_num.row, coo_num.col, coo_num.data, coo_den.data):
num_int = int(num)
den_int = int(den) if den != 0 else 1
if num_int != 0:
if r not in row_data:
row_data[r] = {}
row_data[r][c] = Fraction(num_int, den_int)
return pattern, row_data
def __repr__(self) -> str:
return f"RationalMatrix({self._rows}x{self._cols})"
# =============================================================================
# Sparse Integer RREF for Nullspace Computation
# =============================================================================
def _rref_integer_sparse(rm: RationalMatrix) -> Tuple[Dict[int, Dict[int, int]], int, List[int]]:
"""Compute integer RREF using dict-of-dicts for sparse row operations.
Columns are pre-sorted by nnz ascending so that sparse (likely pivot) columns
are processed first, reducing pivot-search time and fill-in during elimination.
Rows are pre-sorted by nnz ascending, and pivot rows are selected by Markowitz
criterion (sparsest row first, tie-break by smallest absolute value).
Results are translated back to the original column ordering before return.
Uses GCD pre-scaling before row operations and post-reduction to control
coefficient growth. No denominator tracking needed — rows are arbitrarily
scalable for nullspace computation.
Args:
rm: Input RationalMatrix
Returns:
(rref_data, rank, pivot_columns)
rref_data: {rref_row: {orig_col: value}} in original column space
pivot_columns: pivot column indices in original column space
"""
rows = rm.get_row_count()
cols = rm.get_column_count()
# --- Column sorting: sparse columns first ---
# col_order[sorted_pos] = original_col
nnz_per_col = np.diff(rm._num_sparse.tocsc().indptr)
col_order = np.argsort(nnz_per_col, kind='stable').tolist()
col_inverse = [0] * cols
for sorted_pos, orig_col in enumerate(col_order):
col_inverse[orig_col] = sorted_pos
# Convert to integer matrix (scale each row by LCM of denominators)
# Store column indices in sorted space
num_csr = rm._num_sparse.tocsr()
den_csr = rm._den_sparse.tocsr()
data: Dict[int, Dict[int, int]] = {}
for r in range(rows):
start, end = num_csr.indptr[r], num_csr.indptr[r + 1]
if start == end:
continue
# Compute row LCM of denominators
row_dens = [int(den_csr.data[i]) for i in range(start, end) if den_csr.data[i] != 0]
row_lcm = reduce(lcm, row_dens, 1) if row_dens else 1
# Scale numerators; store using sorted column index
row_data = {}
for i in range(start, end):
num = int(num_csr.data[i])
den = int(den_csr.data[i]) if den_csr.data[i] != 0 else 1
orig_col = int(num_csr.indices[i])
scaled = num * (row_lcm // den)
if scaled != 0:
row_data[col_inverse[orig_col]] = scaled
if row_data:
data[r] = row_data
# --- Row sorting: sparse rows first (better initial pivot candidates) ---
if data:
sorted_row_keys = sorted(data.keys(), key=lambda r: len(data[r]))
data = {new_r: data[old_r] for new_r, old_r in enumerate(sorted_row_keys)}
# Gaussian elimination with partial pivoting (all indices in sorted column space)
pivot_cols_sorted = []
pivot_row = 0
for pivot_col in range(cols):
if pivot_row >= rows:
break
# Find pivot: sparsest row first (Markowitz), then smallest abs value
best_row, best_val = -1, 0
best_nnz, best_abs = float('inf'), float('inf')
for r, row_data in data.items():
if r < pivot_row:
continue
if pivot_col in row_data:
v = row_data[pivot_col]
rnnz = len(row_data)
if rnnz < best_nnz or (rnnz == best_nnz and abs(v) < best_abs):
best_row, best_val = r, v
best_nnz, best_abs = rnnz, abs(v)
if best_row < 0:
continue
# Swap rows if needed
if best_row != pivot_row:
if pivot_row in data:
if best_row in data:
data[pivot_row], data[best_row] = data[best_row], data[pivot_row]
else:
data[best_row] = data.pop(pivot_row)
elif best_row in data:
data[pivot_row] = data.pop(best_row)
pivot_row_data = data.get(pivot_row)
if pivot_row_data is None:
continue
pivot_val = pivot_row_data.get(pivot_col, 0)
if pivot_val == 0:
continue
pivot_cols_sorted.append(pivot_col)
# Collect rows to eliminate (snapshot keys before modification)
elim_targets = [(r, row_data[pivot_col]) for r, row_data in data.items() if r != pivot_row and pivot_col in row_data]
# Eliminate pivot column from all other rows
for elim_row, elim_val in elim_targets:
elim_row_data = data[elim_row]
# Pre-scale by GCD to minimize coefficient growth.
# math.gcd handles negatives and returns a positive value (Python 3.5+).
g = gcd(pivot_val, elim_val)
pv_scaled = pivot_val // g
ev_scaled = elim_val // g
# new[c] = elim[c] * pv_scaled - ev_scaled * pivot[c]
new_row = {}
for c, p_val in pivot_row_data.items():
new_val = elim_row_data.get(c, 0) * pv_scaled - ev_scaled * p_val
if new_val != 0:
new_row[c] = new_val
for c, e_val in elim_row_data.items():
if c not in pivot_row_data:
new_val = e_val * pv_scaled
if new_val != 0:
new_row[c] = new_val
# Update row and reduce by GCD.
# gcd(*values) uses one C-level call instead of len(row) Python calls.
if new_row:
data[elim_row] = new_row
row_gcd = gcd(*new_row.values())
if row_gcd > 1:
for c in new_row:
new_row[c] //= row_gcd
else:
del data[elim_row]
pivot_row += 1
# Final GCD reduction of all rows
for row_data in data.values():
row_gcd = gcd(*row_data.values())
if row_gcd > 1:
for c in row_data:
row_data[c] //= row_gcd
# Translate results back to original column space
original_data = {r: {col_order[sc]: v for sc, v in rd.items()} for r, rd in data.items()}
pivot_cols_original = [col_order[p] for p in pivot_cols_sorted]
return original_data, len(pivot_cols_original), pivot_cols_original
def _nullspace_sparse(matrix: RationalMatrix) -> RationalMatrix:
"""Compute nullspace using integer RREF with row scaling.
No denominators needed - each row in RREF is scaled so pivot divides all elements.
The nullspace is extracted by reading off the relationships between pivot and free columns.
"""
cols = matrix.get_column_count()
# Compute integer RREF
rref_data, rank, pivot_cols = _rref_integer_sparse(matrix)
if rank == cols:
return RationalMatrix(cols, 0)
# Free columns
pivot_set = set(pivot_cols)
free_cols = [c for c in range(cols) if c not in pivot_set]
nullity = len(free_cols)
if nullity == 0:
return RationalMatrix(cols, 0)
# Build kernel matrix
# For each free column f, the nullspace vector has:
# - Entry 1 at position f (the free variable)
# - Entry -rref[i,f]/rref[i,pivot_i] at each pivot position
row_indices, col_indices = [], []
numerators, denominators = [], []
for k, free_col in enumerate(free_cols):
# Identity entry for free variable
row_indices.append(free_col)
col_indices.append(k)
numerators.append(1)
denominators.append(1)
# Entries from RREF for pivot variables
for i, pivot_col in enumerate(pivot_cols):
row_data = rref_data.get(i, {})
val_at_free = row_data.get(free_col, 0)
if val_at_free != 0:
pivot_val = row_data.get(pivot_col, 1) # Should always exist
# Nullspace entry: -val_at_free / pivot_val
g = gcd(abs(val_at_free), abs(pivot_val))
num = -val_at_free // g
den = pivot_val // g
# Ensure positive denominator
if den < 0:
num, den = -num, -den
row_indices.append(pivot_col)
col_indices.append(k)
numerators.append(num)
denominators.append(den)
return RationalMatrix._build_from_sparse_data(row_indices, col_indices, numerators, denominators, cols, nullity)
# =============================================================================
# Linear Algebra Functions
# =============================================================================
[docs]
def nullspace(matrix: RationalMatrix) -> RationalMatrix:
"""Compute right nullspace (kernel). Returns K where matrix @ K = 0.
Uses integer RREF with column/row pre-sorting and GCD reduction.
All arithmetic is Python arbitrary-precision integers — no overflow possible.
Args:
matrix: Input RationalMatrix
Returns:
Kernel matrix K where matrix @ K = 0
"""
return _nullspace_sparse(matrix)
[docs]
def basic_columns(matrix: RationalMatrix) -> List[int]:
"""Find indices of basic (pivot) columns using integer RREF."""
_, _, pivot_cols = _rref_integer_sparse(matrix)
return pivot_cols
[docs]
def basic_columns_from_numpy(mx: np.ndarray) -> List[int]:
"""Find basic columns from a numpy array."""
return basic_columns(RationalMatrix.from_numpy(mx))
# =============================================================================
# Configuration
# =============================================================================
[docs]
class CompressionMethod(Enum):
"""Compression methods for metabolic network compression."""
NULLSPACE = "Nullspace" # Nullspace-based compression
RECURSIVE = "Recursive" # Iterate until no more compression possible
@classmethod
def all(cls) -> List['CompressionMethod']:
return list(cls)
@classmethod
def none(cls) -> List['CompressionMethod']:
return []
@classmethod
[docs]
def standard(cls) -> List['CompressionMethod']:
"""Standard compression methods (recommended)."""
return [cls.NULLSPACE, cls.RECURSIVE]
# =============================================================================
# Statistics
# =============================================================================
[docs]
class CompressionStatistics:
"""Tracks compression statistics for logging."""
def __init__(self):
self.iteration_count = 0
self.zero_flux_count = 0
self.coupled_count = 0
self.unused_metabolite_count = 0
def inc_compression_iteration(self) -> int:
self.iteration_count += 1
return self.iteration_count
def get_compression_iteration(self) -> int:
return self.iteration_count
def inc_zero_flux_reactions(self) -> None:
self.zero_flux_count += 1
def inc_coupled_reactions_count(self, count: int) -> None:
self.coupled_count += count
def inc_unused_metabolite(self) -> None:
self.unused_metabolite_count += 1
def write_to_log(self) -> None:
LOG.info(f"Compression complete: {self.iteration_count} iterations, "
f"{self.zero_flux_count} zero-flux, "
f"{self.coupled_count} coupled, {self.unused_metabolite_count} unused metabolites")
def __repr__(self):
return (f"CompressionStatistics(iterations={self.iteration_count}, "
f"zero_flux={self.zero_flux_count}, "
f"coupled={self.coupled_count})")
# =============================================================================
# Compression Record
# =============================================================================
[docs]
class CompressionRecord:
"""
Compression result with transformation matrices.
Contains: pre @ stoich @ post == cmp
For EFM expansion: efm_original = post @ efm_compressed
"""
def __init__(self,
pre: RationalMatrix,
cmp: RationalMatrix,
post: RationalMatrix,
meta_names: List[str],
stats: Optional[CompressionStatistics] = None):
self.pre = pre # metabolite transformation
self.cmp = cmp # compressed stoich
self.post = post # reaction transformation
self.meta_names = list(meta_names) # compressed metabolite names (row order in cmp)
self.stats = stats
# =============================================================================
# Working State (Internal)
# =============================================================================
class _Size:
"""Mutable counter for active matrix dimensions during compression."""
def __init__(self, metas: int, reacs: int):
self.metas = metas
self.reacs = reacs
class _WorkRecord:
"""Mutable state during compression algorithm."""
def __init__(self, stoich: RationalMatrix, meta_names: List[str], reac_names: List[str],
bounds: Optional[List[Tuple[float, float]]] = None):
rows, cols = stoich.get_row_count(), stoich.get_column_count()
self.pre = RationalMatrix.identity(rows)
self.cmp = stoich.clone()
self.post = RationalMatrix.identity(cols)
self.meta_names = list(meta_names)
self.reac_names = list(reac_names)
self.bounds = list(bounds) if bounds else [(-float('inf'), float('inf'))] * cols
self.size = _Size(rows, cols)
# Store original dimensions for get_truncated()
self._orig_metas = rows
self._orig_reacs = cols
self.stats = CompressionStatistics()
self.stats.inc_compression_iteration()
def remove_reactions(self, suppressed: Optional[Set[str]]) -> bool:
"""Remove reactions by name - uses batch removal."""
if not suppressed:
return False
indices = set()
for name in suppressed:
try:
idx = self.reac_names.index(name)
if idx < self.size.reacs:
indices.add(idx)
except ValueError:
continue
if not indices:
return False
return self.remove_reactions_by_indices(indices)
def remove_reactions_by_indices(self, indices: Set[int]) -> bool:
"""Remove reactions by index - uses batch removal for efficiency."""
if not indices:
return False
# Filter to only valid indices
valid_indices = {idx for idx in indices if idx < self.size.reacs}
if not valid_indices:
return False
# Compute indices to keep
keep_indices = [i for i in range(self.size.reacs) if i not in valid_indices]
if not keep_indices:
self.size.reacs = 0
return True
# Use batch column removal
self._batch_remove_columns(keep_indices)
return True
def _batch_remove_columns(self, keep_indices: list) -> None:
"""Batch remove columns by keeping only specified indices."""
LOG.debug(f"_batch_remove_columns: keeping {len(keep_indices)} of {self.size.reacs}")
LOG.debug(f" Before: cmp={self.cmp.get_column_count()}, post={self.post.get_column_count()}")
# Remove columns from post and cmp matrices
self.post.remove_columns(keep_indices)
self.cmp.remove_columns(keep_indices)
LOG.debug(f" After: cmp={self.cmp.get_column_count()}, post={self.post.get_column_count()}")
# Reindex names and bounds
new_names = [self.reac_names[i] for i in keep_indices]
new_bounds = [self.bounds[i] for i in keep_indices]
for i, name in enumerate(new_names):
self.reac_names[i] = name
self.bounds = new_bounds
self.size.reacs = len(keep_indices)
LOG.debug(f" size.reacs={self.size.reacs}")
def _batch_remove_rows(self, keep_indices: list) -> None:
"""Batch remove rows (metabolites) by keeping only specified indices."""
LOG.debug(f"_batch_remove_rows: keeping {len(keep_indices)} of {self.size.metas}")
# Remove rows from pre and cmp matrices
self.pre.remove_rows(keep_indices)
self.cmp.remove_rows(keep_indices)
# Reindex metabolite names
new_names = [self.meta_names[i] for i in keep_indices]
for i, name in enumerate(new_names):
self.meta_names[i] = name
self.size.metas = len(keep_indices)
LOG.debug(f" size.metas={self.size.metas}")
def remove_metabolites_by_indices(self, indices: Set[int]) -> bool:
"""Remove metabolites by index - uses batch removal for efficiency."""
if not indices:
return False
# Filter to only valid indices
valid_indices = {idx for idx in indices if idx < self.size.metas}
if not valid_indices:
return False
# Compute indices to keep
keep_indices = [i for i in range(self.size.metas) if i not in valid_indices]
if not keep_indices:
self.size.metas = 0
return True
# Use batch row removal
self._batch_remove_rows(keep_indices)
return True
def remove_unused_metabolites(self) -> bool:
"""Remove metabolites with all-zero rows - uses batch removal.
Uses CSR indptr for O(m) zero-row detection (avoids element-by-element
sparse indexing which is O(m*r) with large per-call Python overhead).
"""
mc, rc = self.size.metas, self.size.reacs
# Slice active submatrix and convert to CSR for indptr access.
# tocsr() is a no-op if already CSR (returns self), so this is safe
# regardless of current format.
num_csr = self.cmp._num_sparse[:mc, :rc].tocsr()
zero_rows = np.where(np.diff(num_csr.indptr) == 0)[0]
unused_indices = set(zero_rows.tolist())
for _ in unused_indices:
self.stats.inc_unused_metabolite()
if unused_indices:
self.remove_metabolites_by_indices(unused_indices)
return True
return False
def get_truncated(self) -> CompressionRecord:
"""Create final CompressionRecord with truncated matrices."""
mc, rc = self.size.metas, self.size.reacs
# Use stored original dimensions (batch removal changes matrix sizes)
m = self._orig_metas
r = self._orig_reacs
# Create truncated matrices using efficient submatrix extraction
# pre: mc compressed metabolites × m original metabolites
# cmp: mc compressed metabolites × rc compressed reactions
# post: r original reactions × rc compressed reactions
pre_trunc = self.pre.submatrix(mc, m)
cmp_trunc = self.cmp.submatrix(mc, rc)
post_trunc = self.post.submatrix(r, rc)
meta_names_trunc = self.meta_names[:mc]
return CompressionRecord(pre_trunc, cmp_trunc, post_trunc, meta_names_trunc, self.stats)
# =============================================================================
# Core Algorithm
# =============================================================================
[docs]
class StoichMatrixCompressor:
"""Nullspace-based metabolic network compression."""
def __init__(self, *methods: CompressionMethod):
self._methods = list(methods) if methods else CompressionMethod.standard()
[docs]
def compress(self,
stoich: RationalMatrix,
meta_names: List[str],
reac_names: List[str],
suppressed: Optional[Set[str]] = None,
bounds: Optional[List[Tuple[float, float]]] = None) -> CompressionRecord:
"""Compress network, return transformation matrices.
Single-pass approach: each iteration computes the nullspace once,
then removes zero-flux reactions AND merges coupled groups from
the same kernel. Re-iterates only when contradicting groups were
removed (which may expose new couplings).
"""
work = _WorkRecord(stoich, meta_names, reac_names, bounds)
do_nullspace = CompressionMethod.NULLSPACE in self._methods
do_recursive = CompressionMethod.RECURSIVE in self._methods
work.remove_reactions(suppressed)
if do_nullspace:
while True:
work.stats.inc_compression_iteration()
work.remove_unused_metabolites()
found = self._nullspace_compress(work)
if not (found and do_recursive):
break
work.remove_unused_metabolites()
work.stats.write_to_log()
return work.get_truncated()
def _nullspace_compress(self, work: _WorkRecord) -> bool:
"""One pass of nullspace compression. Returns True if contradicting
groups were removed (triggering re-iteration).
Computes the nullspace once, then from the same kernel:
- Identifies zero-flux reactions (absent from kernel)
- Identifies and merges coupled reaction groups (proportional kernel rows)
- Removes zero-flux + coupled slaves + contradicting groups in one batch
"""
# Build active submatrix for nullspace computation
active = work.cmp.submatrix(work.size.metas, work.size.reacs)
kernel = nullspace(active)
LOG.debug(
f"Nullspace: {active.get_row_count()}x{active.get_column_count()} -> kernel {kernel.get_row_count()}x{kernel.get_column_count()}"
)
# Get kernel pattern (CSR for structure) and values (Fractions for exact ratios)
kernel_pattern, kernel_values = kernel.to_sparse_pattern()
# Find zero-flux reactions and merge coupled groups in one pass
return self._handle_compress(work, kernel_pattern, kernel_values)
def _find_zero_flux(self, work: _WorkRecord, kernel_sparse) -> set:
"""Find reactions with all-zero kernel rows (indices in current work)."""
zero_flux = set()
for reac in range(work.size.reacs):
if kernel_sparse.indptr[reac] == kernel_sparse.indptr[reac + 1]:
zero_flux.add(reac)
work.stats.inc_zero_flux_reactions()
return zero_flux
def _find_coupled_groups(self, kernel_pattern, kernel_values, num_reacs):
"""Find groups of coupled reactions from kernel sparsity pattern.
Args:
kernel_pattern: CSR matrix with 1s at non-zero positions (for indptr/indices)
kernel_values: {row: {col: Fraction}} for actual values
Returns (groups, ratios) where:
- groups: list of lists, each containing indices of coupled reactions
- ratios: list where ratios[i] is the coupling ratio for reaction i
"""
# Group reactions by zero-pattern in kernel
patterns = {}
for reac in range(num_reacs):
start = kernel_pattern.indptr[reac]
end = kernel_pattern.indptr[reac + 1]
pattern = tuple(kernel_pattern.indices[start:end])
patterns.setdefault(pattern, []).append(reac)
potential_groups = [idxs for idxs in patterns.values() if len(idxs) > 1]
groups = []
ratios = [None] * num_reacs
for potential in potential_groups:
ref_reac = potential[0]
ref_start = kernel_pattern.indptr[ref_reac]
ref_end = kernel_pattern.indptr[ref_reac + 1]
nonzero_cols = list(kernel_pattern.indices[ref_start:ref_end])
if not nonzero_cols:
continue
for i, reac_a in enumerate(potential):
if ratios[reac_a] is not None:
continue
group = None
# Get values for reaction a
a_row = kernel_values.get(reac_a, {})
for j in range(i + 1, len(potential)):
reac_b = potential[j]
if ratios[reac_b] is not None:
continue
# Get values for reaction b
b_row = kernel_values.get(reac_b, {})
# Compute ratio from first non-zero column
first_col = nonzero_cols[0]
a_val = a_row.get(first_col, Fraction(0))
b_val = b_row.get(first_col, Fraction(0))
if b_val == 0:
continue
# ratio = a_val / b_val
ratio = a_val / b_val
# Check if ratio is consistent across all columns
is_consistent = True
for col in nonzero_cols[1:]:
a_v = a_row.get(col, Fraction(0))
b_v = b_row.get(col, Fraction(0))
if b_v == 0 or a_v / b_v != ratio:
is_consistent = False
break
if is_consistent:
ratios[reac_b] = ratio
if group is None:
group = [reac_a]
group.append(reac_b)
if group is not None:
groups.append(group)
return groups, ratios
def _handle_compress(self, work: _WorkRecord, kernel_pattern, kernel_values) -> bool:
"""Remove zero-flux and merge coupled reactions from a single kernel.
From one nullspace computation:
1. Identifies zero-flux reactions (empty kernel rows)
2. Finds and merges coupled groups (proportional kernel rows)
3. Detects contradicting groups via bounds intersection
4. Removes everything in one batch
Returns True if contradicting groups were removed (flux space
changed), triggering re-iteration to find new couplings.
"""
# Collect zero-flux reactions (empty kernel rows)
zero_flux = self._find_zero_flux(work, kernel_pattern)
# Also catch reactions blocked by bounds (lb=ub=0) that still have
# non-zero kernel rows. These are structurally present but carry
# no flux. Removing them here avoids needing a separate FVA pass.
for reac in range(work.size.reacs):
if reac not in zero_flux:
lb, ub = work.bounds[reac]
if lb == 0 and ub == 0:
zero_flux.add(reac)
work.stats.inc_zero_flux_reactions()
# Find and merge coupled groups
groups, ratios = self._find_coupled_groups(kernel_pattern, kernel_values, work.size.reacs)
reactions_to_remove = set(zero_flux)
contradicting_removed = False
# Enter batch edit mode for cmp and post matrices to avoid repeated LIL conversions
work.cmp.begin_batch_edit()
work.post.begin_batch_edit()
for group in groups:
self._combine_coupled(work, group, ratios)
# Check bounds intersection to detect contradicting groups.
# v_master = ratios[slave] * v_slave => v_slave = v_master / ratios[slave]
# Slave constraint: lb_s <= v_slave <= ub_s
# Translates to bounds on v_master depending on sign of ratios[slave].
master = group[0]
lb_m, ub_m = work.bounds[master]
intersected_lb = lb_m
intersected_ub = ub_m
for slave in group[1:]:
ratio = ratios[slave] # v_master / v_slave
lb_s, ub_s = work.bounds[slave]
if ratio > 0:
# lb_s * ratio <= v_master <= ub_s * ratio
s_lb = -float('inf') if lb_s == -float('inf') else lb_s * float(ratio)
s_ub = float('inf') if ub_s == float('inf') else ub_s * float(ratio)
else: # ratio < 0
# ub_s * ratio <= v_master <= lb_s * ratio
s_lb = -float('inf') if ub_s == float('inf') else ub_s * float(ratio)
s_ub = float('inf') if lb_s == -float('inf') else lb_s * float(ratio)
intersected_lb = max(intersected_lb, s_lb)
intersected_ub = min(intersected_ub, s_ub)
# Update master bounds to intersection
work.bounds[master] = (intersected_lb, intersected_ub)
if intersected_lb > intersected_ub or (intersected_lb == 0 and intersected_ub == 0):
# Contradicting: only zero flux feasible, or infeasible.
# Remove master and all slaves.
LOG.debug(f"Contradicting coupled group: {[work.reac_names[r] for r in group]}")
for idx in group:
reactions_to_remove.add(idx)
contradicting_removed = True
else:
# Consistent: only remove slaves (merged into master)
for idx in group[1:]:
reactions_to_remove.add(idx)
# End batch edit mode
work.cmp.end_batch_edit()
work.post.end_batch_edit()
LOG.debug(f"Compression: {len(zero_flux)} zero-flux, {len(groups)} coupled groups, "
f"removing {len(reactions_to_remove)} reactions, contradicting={contradicting_removed}")
work.remove_reactions_by_indices(reactions_to_remove)
return contradicting_removed
def _combine_coupled(self, work: _WorkRecord, group: List[int], ratios: List[Optional[Fraction]]) -> None:
"""Combine coupled reactions into master reaction.
Uses batch column operations with native fmpq arithmetic for performance.
"""
master = group[0]
LOG.debug(f"Combining coupled: {[work.reac_names[r] for r in group]}")
for slave in group[1:]:
ratio = ratios[slave]
# multiplier = 1/ratio = ratio.denominator / ratio.numerator
mult_num, mult_den = ratio.denominator, ratio.numerator
# Update stoichiometric matrix: cmp[:, master] += cmp[:, slave] * mult
work.cmp.add_scaled_column(master, slave, mult_num, mult_den)
# Update post matrix: post[:, master] += post[:, slave] * mult
work.post.add_scaled_column(master, slave, mult_num, mult_den)
work.stats.inc_coupled_reactions_count(len(group))
# =============================================================================
# COBRA Interface
# =============================================================================
[docs]
class CompressionResult:
"""Result of COBRA model compression."""
def __init__(self, compressed_model, compression_converter, pre_matrix, post_matrix, reaction_map, metabolite_map, statistics,
methods_used, original_reaction_names, original_metabolite_names, flipped_reactions):
self.compressed_model = compressed_model
self.compression_converter = compression_converter
self.pre_matrix = pre_matrix
self.post_matrix = post_matrix
self.reaction_map = reaction_map
self.metabolite_map = metabolite_map
self.statistics = statistics
self.methods_used = methods_used
self.original_reaction_names = original_reaction_names
self.original_metabolite_names = original_metabolite_names
self.flipped_reactions = flipped_reactions
@property
def compression_ratio(self) -> float:
return len(self.compressed_model.reactions) / len(self.original_reaction_names)
@property
def reactions_removed(self) -> int:
return len(self.original_reaction_names) - len(self.compressed_model.reactions)
def summary(self) -> str:
return f"Compressed {len(self.original_reaction_names)} -> {len(self.compressed_model.reactions)} reactions ({self.compression_ratio:.1%})"
[docs]
class CompressionConverter:
"""Bidirectional transformer for expressions between original and compressed spaces."""
def __init__(self, reaction_map: Dict[str, Dict[str, Union[float, Fraction]]],
metabolite_map: Dict[str, Dict[str, Union[float, Fraction]]], flipped_reactions: List[str]):
self.reaction_map = reaction_map
self.metabolite_map = metabolite_map
self.flipped_reactions = set(flipped_reactions)
[docs]
def expand_expression(self, expression: Dict[str, float], remove_missing: bool = False) -> Dict[str, float]:
"""Transform expression from compressed back to original space."""
expanded = {}
for comp_rxn, comp_coeff in expression.items():
if comp_rxn in self.reaction_map:
for orig_rxn, scale in self.reaction_map[comp_rxn].items():
coeff = comp_coeff * scale
if orig_rxn in self.flipped_reactions:
coeff = -coeff
expanded[orig_rxn] = expanded.get(orig_rxn, 0) + coeff
elif not remove_missing:
LOG.warning(f"Compressed reaction {comp_rxn} not found")
return expanded
[docs]
def remove_conservation_relations(model) -> None:
"""Remove conservation relations (dependent metabolites) from a model.
This reduces the number of metabolites while maintaining the original flux space.
Uses exact rational arithmetic to find linearly independent rows.
Args:
model: COBRA model to modify in-place
"""
# Build the transposed stoichiometric matrix (reactions × metabolites) directly
# from cobra reaction coefficients — avoids a sparse→dense→sparse round-trip that
# would iterate over all m×r elements including zeros.
num_rxns = len(model.reactions)
num_mets = len(model.metabolites)
met_index = {m.id: i for i, m in enumerate(model.metabolites)}
row_idx, col_idx, num_data, den_data = [], [], [], []
for j, rxn in enumerate(model.reactions):
for met, coeff in rxn._metabolites.items():
if coeff == 0:
continue
i = met_index[met.id]
if isinstance(coeff, Fraction):
frac = coeff
elif hasattr(coeff, 'numerator'):
frac = Fraction(coeff.numerator, coeff.denominator)
else:
frac = float_to_rational(float(coeff))
row_idx.append(j) # reaction → row (transposed layout)
col_idx.append(i) # metabolite → column
num_data.append(frac.numerator)
den_data.append(frac.denominator)
from scipy.sparse import csr_matrix as _csr
num_sparse = _csr((num_data, (row_idx, col_idx)), shape=(num_rxns, num_mets), dtype=np.int64)
den_sparse = _csr((den_data, (row_idx, col_idx)), shape=(num_rxns, num_mets), dtype=np.int64)
rm = RationalMatrix._from_sparse(num_sparse, den_sparse)
basic_mets = basic_columns(rm)
dependent_mets = [model.metabolites[i] for i in set(range(num_mets)) - set(basic_mets)]
if dependent_mets:
model.remove_metabolites(dependent_mets)
[docs]
def compress_cobra_model(model,
methods: Optional[List[Union[str, CompressionMethod]]] = None,
in_place: bool = True,
suppressed_reactions: Optional[Set[str]] = None) -> CompressionResult:
"""
Compress a COBRA model using nullspace-based coupling detection.
Note: Model should be preprocessed first (rational coefficients,
conservation relations removed). Use networktools.compress_model()
for automatic preprocessing.
Args:
model: COBRA model to compress (should be preprocessed)
methods: Compression methods. Default: CompressionMethod.standard()
in_place: Modify original model (True) or work on copy (False)
suppressed_reactions: Reaction names to exclude from compression
Returns:
CompressionResult with compressed model and transformation data
"""
if not in_place:
model = copy.deepcopy(model)
original_reaction_names = [r.id for r in model.reactions]
original_metabolite_names = [m.id for m in model.metabolites]
# Parse methods
if methods is None:
compression_methods = CompressionMethod.standard()
else:
compression_methods = []
for method in methods:
if isinstance(method, CompressionMethod):
compression_methods.append(method)
elif isinstance(method, str):
compression_methods.append(CompressionMethod[method.upper()])
# Build stoichiometric matrix with exact arithmetic
stoich_matrix = RationalMatrix.from_cobra_model(model)
metabolite_names = [m.id for m in model.metabolites]
reaction_names = [r.id for r in model.reactions]
# Run compression
compressor = StoichMatrixCompressor(*compression_methods)
bounds = [(float(r.lower_bound), float(r.upper_bound)) for r in model.reactions]
compression_record = compressor.compress(stoich_matrix, metabolite_names, reaction_names, suppressed_reactions, bounds)
# Apply to model (uses direct manipulation, bypasses solver)
reaction_map = _apply_compression_to_model(model, compression_record, reaction_names)
pre_matrix = compression_record.pre.to_numpy()
post_matrix = compression_record.post.to_numpy()
converter = CompressionConverter(reaction_map, {}, [])
return CompressionResult(compressed_model=model,
compression_converter=converter,
pre_matrix=pre_matrix,
post_matrix=post_matrix,
reaction_map=reaction_map,
metabolite_map={},
statistics=compression_record.stats,
methods_used=compression_methods,
original_reaction_names=original_reaction_names,
original_metabolite_names=original_metabolite_names,
flipped_reactions=[])
def _apply_compression_to_model(model, compression_record, original_reaction_names):
"""Apply compression results to COBRA model using exact Fraction arithmetic.
Sets stoichiometric coefficients and bounds as Fractions directly from
the compressed matrices, avoiding floating-point arithmetic.
Uses direct attribute manipulation to bypass solver updates during modification.
Solver must be rebuilt after calling this function.
"""
post = compression_record.post
cmp = compression_record.cmp
meta_names = compression_record.meta_names
num_original = post.get_row_count()
num_compressed = post.get_column_count()
num_metas = cmp.get_row_count()
# Track which original reactions to keep (main reactions)
keep_rxns = [False] * num_original
reaction_map = {}
# Build metabolite lookup for compressed metabolites
met_lookup = {name: model.metabolites.get_by_id(name) for name in meta_names}
for j in range(num_compressed):
# Find contributing original reactions from POST matrix (sparse iteration)
contributing = list(post.iter_column_fractions(j))
if not contributing:
continue
# Select "main" reaction: most metabolites (nonzeros), ties broken alphabetically
main_idx = min(
(idx for idx, _ in contributing),
key=lambda i: (-len(model.reactions[i]._metabolites), model.reactions[i].id),
)
main_rxn = model.reactions[main_idx]
keep_rxns[main_idx] = True
if len(contributing) == 1:
# Standalone reaction — not merged with anything.
reaction_map[main_rxn.id] = {original_reaction_names[main_idx]: Fraction(1)}
continue
# --- Merged group (2+ contributing reactions) ---
# Store subset info
main_rxn.subset_rxns = [idx for idx, _ in contributing]
main_rxn.subset_stoich = [coeff for _, coeff in contributing]
# Build combined reaction name from contributing reactions
for idx, _ in contributing:
if idx == main_idx:
continue
rxn = model.reactions[idx]
if len(main_rxn.id) + len(rxn.id) < 220 and not main_rxn.id.endswith('...'):
main_rxn.id += '*' + rxn.id
elif not main_rxn.id.endswith('...'):
main_rxn.id += '...'
# Build new metabolites dict from cmp matrix (sparse iteration)
new_metabolites = {}
for m_idx, frac in cmp.iter_column_fractions(j):
met = met_lookup[meta_names[m_idx]]
new_metabolites[met] = frac
# OPTIMIZATION: Direct _metabolites assignment (bypasses solver updates)
# Clear old metabolite back-references
for met in list(main_rxn._metabolites.keys()):
if main_rxn in met._reaction:
met._reaction.discard(main_rxn)
# Set new metabolites directly
main_rxn._metabolites = new_metabolites
# Update new metabolite back-references
for met in new_metabolites:
met._reaction.add(main_rxn)
# Compute bounds as Fractions from original reactions scaled by POST factors
lb_candidates = []
ub_candidates = []
for idx, coeff in contributing:
rxn = model.reactions[idx]
lb, ub = rxn.lower_bound, rxn.upper_bound
# v_original = coeff * v_compressed (from POST interpretation)
# constraint: lb <= v_original <= ub
# so: lb <= coeff * v_compressed <= ub
# if coeff > 0: lb/coeff <= v_compressed <= ub/coeff
# if coeff < 0: ub/coeff <= v_compressed <= lb/coeff (inequality flips)
if coeff > 0:
if lb != -float('inf'):
lb_candidates.append(Fraction(lb) / coeff if lb != 0 else Fraction(0))
if ub != float('inf'):
ub_candidates.append(Fraction(ub) / coeff if ub != 0 else Fraction(0))
else: # coeff < 0
if ub != float('inf'):
lb_candidates.append(Fraction(ub) / coeff if ub != 0 else Fraction(0))
if lb != -float('inf'):
ub_candidates.append(Fraction(lb) / coeff if lb != 0 else Fraction(0))
# OPTIMIZATION: Direct bounds assignment (bypasses solver updates)
main_rxn._lower_bound = max(lb_candidates) if lb_candidates else -float('inf')
main_rxn._upper_bound = min(ub_candidates) if ub_candidates else float('inf')
# Update stored objective through compression factors
obj_dict = getattr(model, '_suppressed_obj', None)
if obj_dict is not None:
merged_obj = sum(
obj_dict.pop(original_reaction_names[idx], 0.0) * float(coeff)
for idx, coeff in contributing
)
if merged_obj != 0:
obj_dict[main_rxn.id] = merged_obj
# Build reaction map
reaction_map[main_rxn.id] = {original_reaction_names[idx]: coeff for idx, coeff in contributing}
# OPTIMIZATION: Batch reaction and metabolite removal
# Use reaction objects directly (not IDs, since IDs are modified during compression)
from cobra import DictList
# Build set of reaction objects to keep
keep_rxn_objs = {model.reactions[i] for i in range(num_original) if keep_rxns[i]}
# Find metabolites that will be in kept reactions
mets_in_kept_rxns = set()
for rxn in keep_rxn_objs:
mets_in_kept_rxns.update(rxn._metabolites.keys())
# Build new reactions list (only kept reactions)
new_reactions = DictList()
for rxn in model.reactions:
if rxn in keep_rxn_objs:
new_reactions.append(rxn)
else:
# Clear metabolite back-references for removed reaction
for met in list(rxn._metabolites.keys()):
if rxn in met._reaction:
met._reaction.discard(rxn)
rxn._model = None
# Build new metabolites list (only those in kept reactions)
new_metabolites = DictList()
for met in model.metabolites:
if met in mets_in_kept_rxns:
new_metabolites.append(met)
else:
met._model = None
# Replace model lists (using __dict__ to bypass any property setters)
model.__dict__['reactions'] = new_reactions
model.__dict__['metabolites'] = new_metabolites
# Update model references
for rxn in model.reactions:
rxn._model = model
for met in model.metabolites:
met._model = model
return reaction_map
# =============================================================================
# Preprocessing Functions
# =============================================================================
[docs]
def remove_blocked_reactions(model) -> List:
"""Remove blocked reactions (bounds == (0, 0)) from a network."""
blocked_reactions = [reac for reac in model.reactions if reac.bounds == (0, 0)]
if blocked_reactions:
model.remove_reactions(blocked_reactions, remove_orphans=True)
return blocked_reactions
[docs]
def remove_ext_mets(model) -> None:
"""Remove external metabolites from 'External_Species' compartment."""
external_mets = [m for m in model.metabolites if m.compartment == 'External_Species']
model.remove_metabolites(external_mets)
stoich_mat = create_stoichiometric_matrix(model)
obsolete_reacs = [r for r, has_nonzero in zip(model.reactions, np.any(stoich_mat, 0)) if not has_nonzero]
model.remove_reactions(obsolete_reacs)
[docs]
def remove_dummy_bounds(model) -> None:
"""Replace COBRA standard bounds with +/-inf."""
cobra_conf = Configuration()
bound_thres = max(abs(cobra_conf.lower_bound), abs(cobra_conf.upper_bound))
if any(any(abs(b) >= bound_thres for b in r.bounds) for r in model.reactions):
LOG.warning(f'Removing reaction bounds >= {round(bound_thres)}.')
for rxn in model.reactions:
if rxn.lower_bound <= -bound_thres:
rxn.lower_bound = -np.inf
if rxn.upper_bound >= bound_thres:
rxn.upper_bound = np.inf
[docs]
def stoichmat_coeff2rational(model) -> None:
"""Convert stoichiometric coefficients to rational numbers."""
for rxn in model.reactions:
for met, coeff in rxn._metabolites.items():
if isinstance(coeff, (float, int)):
rxn._metabolites[met] = float_to_rational(coeff)
elif not hasattr(coeff, 'p'): # Not sympy.Rational
if hasattr(coeff, 'numerator'): # fractions.Fraction
rxn._metabolites[met] = Rational(coeff.numerator, coeff.denominator)
else:
raise TypeError(f"Unsupported coefficient type: {type(coeff)}")
[docs]
def stoichmat_coeff2float(model) -> None:
"""Convert stoichiometric coefficients to floats."""
for rxn in model.reactions:
for met, coeff in rxn._metabolites.items():
rxn._metabolites[met] = float(coeff)
# =============================================================================
# GPR Propagation Helpers
# =============================================================================
def _gpr_ast_to_sympy(node):
"""Convert a cobra GPR AST node to a sympy boolean expression.
Returns None for empty GPR (node is None), meaning the reaction has
no gene requirement and is always active.
"""
if node is None:
return None
if isinstance(node, ast.BoolOp):
children = [_gpr_ast_to_sympy(v) for v in node.values]
if isinstance(node.op, ast.And):
return SympyAnd(*children)
else:
return SympyOr(*children)
elif isinstance(node, ast.Name):
return SympySymbol(node.id)
return None
def _sympy_to_gpr_string(expr):
"""Convert a sympy boolean expression to a GPR rule string.
Produces correctly parenthesised output with sorted gene names for
deterministic results. Returns '' for None input.
"""
if expr is None:
return ''
if isinstance(expr, SympySymbol):
return str(expr)
if expr.func == SympyAnd:
parts = []
for arg in sorted(expr.args, key=str):
s = _sympy_to_gpr_string(arg)
if hasattr(arg, 'func') and arg.func == SympyOr:
s = f'({s})'
parts.append(s)
return ' and '.join(parts)
if expr.func == SympyOr:
parts = []
for arg in sorted(expr.args, key=str):
s = _sympy_to_gpr_string(arg)
if hasattr(arg, 'func') and arg.func == SympyAnd:
s = f'({s})'
parts.append(s)
return ' or '.join(parts)
return str(expr)
def _combine_gpr_and(gpr_bodies):
"""Combine GPR AST bodies with AND logic (for coupled/serial reaction merge).
Args:
gpr_bodies: list of AST nodes (reaction.gpr.body), may include None
An empty/None GPR means the reaction has no gene requirement (always active),
which acts as True in boolean logic. AND with True is a no-op, so empty GPRs
are skipped. Returns '' if all inputs are empty (no gene restriction).
Uses sympy And constructor which automatically flattens nested ANDs and
deduplicates terms. Full simplification is deferred to reduce_gpr downstream.
"""
sympy_exprs = [_gpr_ast_to_sympy(b) for b in gpr_bodies]
non_empty = [s for s in sympy_exprs if s is not None]
if not non_empty:
return ''
if len(non_empty) == 1:
return _sympy_to_gpr_string(non_empty[0])
combined = SympyAnd(*non_empty)
return _sympy_to_gpr_string(combined)
def _combine_gpr_or(gpr_bodies):
"""Combine GPR AST bodies with OR logic (for parallel reaction merge).
Args:
gpr_bodies: list of AST nodes (reaction.gpr.body), may include None
If any input is None (reaction always active regardless of genes), the
combined reaction is also always active, so the result is '' (no restriction).
Uses sympy Or constructor which automatically flattens nested ORs and
deduplicates terms. Full simplification is deferred to reduce_gpr downstream.
"""
sympy_exprs = [_gpr_ast_to_sympy(b) for b in gpr_bodies]
if any(s is None for s in sympy_exprs):
return ''
if not sympy_exprs:
return ''
if len(sympy_exprs) == 1:
return _sympy_to_gpr_string(sympy_exprs[0])
combined = SympyOr(*sympy_exprs)
return _sympy_to_gpr_string(combined)
# =============================================================================
# High-Level Compression API
# =============================================================================
[docs]
def compress_model(model, no_par_compress_reacs=set(), compression_backend='sparse_rref', propagate_gpr=False):
"""Compress a metabolic model using multiple techniques.
Performs blocked reaction removal, conservation relation removal, and
alternating dependent/parallel reaction lumping until no further
compression is possible.
Args:
model: COBRA model to compress in-place
no_par_compress_reacs: Reactions exempt from parallel compression
compression_backend: Compression backend to use:
- 'sparse_rref' (default): Pure Python sparse integer RREF.
No external dependencies beyond NumPy/SciPy.
- 'efmtool_rref' (legacy): Java-based EFMTool via JPype.
Requires a JVM and the jpype1 package.
propagate_gpr: If True, propagate and simplify GPR rules through
compression (AND for coupled, OR for parallel merges).
Empty GPR rules are correctly handled: skipped in AND (always
active), and absorb in OR (result is always active).
Uses sympy for boolean simplification. Default False.
Returns:
list of dict: Compression maps for reversing each compression step
"""
from straindesign.networktools import suppress_lp_context, _is_lp_suppressed
with suppress_lp_context(model):
cmp_mapReac = []
use_java = (compression_backend == 'efmtool_rref')
LOG.info(' Removing blocked reactions.')
remove_blocked_reactions(model)
LOG.info(' Converting coefficients to rationals.')
stoichmat_coeff2rational(model)
coupled_changed = None # None = not yet computed
run = 1
while True:
numr = len(model.reactions)
# 1. Parallel (cheap — hash-based, no RREF)
LOG.info(f' Compression {run}: Lumping parallel reactions.')
reac_map_exp = compress_model_parallel(model, no_par_compress_reacs,
propagate_gpr=propagate_gpr)
parallel_changed = numr > len(reac_map_exp)
if parallel_changed:
LOG.info(f' Reduced to {len(reac_map_exp)} reactions.')
cmp_mapReac.append({"reac_map_exp": reac_map_exp, "parallel": True})
# 2. Conservation relation removal (reduces S rows for RREF)
if use_java:
_remove_conservation_relations_java(model)
else:
remove_conservation_relations(model)
# 3. Exit if either parallel or coupled found nothing (after
# at least one full cycle). If one step found nothing,
# re-running it won't help — the model hasn't changed in
# a way that creates new opportunities for that step.
if coupled_changed is not None and (not parallel_changed or not coupled_changed):
LOG.info(f' Compression complete ({run - 1} cycles).')
break
# 4. Coupled (expensive — nullspace/RREF)
numr_pre = len(model.reactions)
LOG.info(f' Compression {run}: Lumping coupled reactions.')
reac_map_exp = compress_model_coupled(model, compression_backend,
propagate_gpr=propagate_gpr)
for new_reac, old_reac_val in reac_map_exp.items():
old_reacs = [r for r in no_par_compress_reacs if r in old_reac_val]
if old_reacs:
for r in old_reacs:
no_par_compress_reacs.remove(r)
no_par_compress_reacs.add(new_reac)
coupled_changed = numr_pre > len(reac_map_exp)
if coupled_changed:
LOG.info(f' Reduced to {len(reac_map_exp)} reactions.')
cmp_mapReac.append({"reac_map_exp": reac_map_exp, "parallel": False})
run += 1
# suppress_lp_context handles solver rebuild and objective restoration on exit
return cmp_mapReac
def _remove_conservation_relations_java(model) -> None:
"""Remove conservation relations using Java efmtool."""
from . import efmtool_cmp_interface as efm
stoich_mat = create_stoichiometric_matrix(model, array_type='lil')
basic_mets = efm.basic_columns_rat_java(stoich_mat.transpose().toarray(), tolerance=0)
dependent = [model.metabolites[i] for i in set(range(len(model.metabolites))) - set(basic_mets)]
if dependent:
model.remove_metabolites(dependent)
[docs]
def compress_model_coupled(model, compression_backend='sparse_rref', propagate_gpr=False,
suppressed_reactions=None):
"""Compress by lumping stoichiometrically coupled (dependent) reactions.
Identifies groups of reactions whose flux vectors are proportional in every
steady state (i.e. they share a common nullspace direction) and merges each
group into a single lumped reaction. Both the pure-Python and legacy Java
backends perform this operation; the compression_backend controls the nullspace algorithm.
Args:
model: COBRA model to compress in-place
compression_backend: 'sparse_rref' (default, Python) or 'efmtool_rref' (Java legacy)
propagate_gpr: If True, AND-combine GPR rules of merged reactions
(with sympy simplification). Empty GPRs are skipped. Default False.
suppressed_reactions: Set of reaction IDs to exclude from compression
(Java backend only). Used to protect reactions referenced in strain
design constraints from being deleted by the Java compressor's
CoupledContradicting logic. Ignored for the Python backend (which
handles contradicting groups correctly via bounds intersection).
Returns:
dict: Mapping {compressed_id: {orig_id: factor, ...}}
"""
# Save GPR AST bodies before either backend clears them
if propagate_gpr:
saved_gpr_bodies = {r.id: r.gpr.body for r in model.reactions}
if compression_backend == 'efmtool_rref':
from .efmtool_cmp_interface import compress_model_java
reaction_map = compress_model_java(model, suppressed_reactions=suppressed_reactions)
# Java backend handles contradicting groups internally (CoupledContradicting).
# Clean up any remaining zero-flux reactions that the Java compressor created.
zero_flux = {r for r in model.reactions if r.lower_bound == 0 and r.upper_bound == 0}
for r in zero_flux:
reaction_map.pop(r.id, None)
if zero_flux:
model.remove_reactions(list(zero_flux), remove_orphans=True)
else:
# Clear gene rules to match Java behavior
for r in model.reactions:
r.gene_reaction_rule = ''
result = compress_cobra_model(model, methods=CompressionMethod.standard(), in_place=True)
reaction_map = result.reaction_map
# Python compressor handles contradicting groups internally via bounds
# intersection in _handle_coupled_combine (removes zero-flux groups and
# re-iterates to find new couplings).
# Propagate GPR rules: AND-combine contributing reactions' GPR ASTs
if propagate_gpr:
for cmp_id, orig_map in reaction_map.items():
try:
rxn = model.reactions.get_by_id(cmp_id)
except KeyError:
continue
gpr_bodies = [saved_gpr_bodies.get(orig_id) for orig_id in orig_map]
rxn.gene_reaction_rule = _combine_gpr_and(gpr_bodies)
return reaction_map
# Backward-compatibility alias (old name referenced efmtool, but the function
# is backend-agnostic — the new name compress_model_coupled is preferred).
compress_model_efmtool = compress_model_coupled
[docs]
def compress_model_parallel(model, protected_rxns=set(), propagate_gpr=False):
"""Compress by lumping parallel reactions.
Args:
model: COBRA model to compress in-place
protected_rxns: Reactions exempt from parallel compression
propagate_gpr: If True, OR-combine GPR rules of lumped reactions
(with sympy simplification). Default False.
Returns:
dict: Mapping {compressed_id: {orig_id: factor, ...}}
"""
old_num_reac = len(model.reactions)
old_reac_ids = [r.id for r in model.reactions]
if propagate_gpr:
old_gpr_bodies = {r.id: r.gpr.body for r in model.reactions}
stoichmat_T = create_stoichiometric_matrix(model, 'lil').transpose()
factor = [d[0] if d else 1.0 for d in stoichmat_T.data]
A = sparse.diags(factor) @ stoichmat_T
lb = [float(r.lower_bound) for r in model.reactions]
ub = [float(r.upper_bound) for r in model.reactions]
fwd = sparse.lil_matrix([1. if (isinf(u) and f > 0 or isinf(l) and f < 0) else 0. for f, l, u in zip(factor, lb, ub)]).transpose()
rev = sparse.lil_matrix([1. if (isinf(l) and f > 0 or isinf(u) and f < 0) else 0. for f, l, u in zip(factor, lb, ub)]).transpose()
inh = sparse.lil_matrix([
i + 1 if not ((isinf(ub[i]) or ub[i] == 0) and (isinf(lb[i]) or lb[i] == 0)) else 0 for i in range(len(model.reactions))
]).transpose()
A = sparse.hstack((A, fwd, rev, inh), 'csr')
# Find parallel reactions via hash comparison
subset_list = []
prev_found = set()
protected = [r.id in protected_rxns for r in model.reactions]
hashes = [hash((tuple(A[i].indices), tuple(A[i].data))) for i in range(A.shape[0])]
for i in range(A.shape[0]):
if i in prev_found:
continue
if protected[i]:
subset_list.append([i])
continue
subset_i = [i]
for j in range(i + 1, A.shape[0]):
if (not protected[j] and j not in prev_found and hashes[i] == hashes[j]
and np.array_equal(A[i].indices, A[j].indices)
and np.array_equal(A[i].data, A[j].data)):
subset_i.append(j)
prev_found.add(j)
subset_list.append(subset_i)
# Lump parallel reactions
del_rxns = [False] * len(model.reactions)
obj_dict = getattr(model, '_suppressed_obj', None)
for rxn_idx in subset_list:
for i in range(1, len(rxn_idx)):
main_rxn = model.reactions[rxn_idx[0]]
other_rxn = model.reactions[rxn_idx[i]]
if len(main_rxn.id) + len(other_rxn.id) < 220 and not main_rxn.id.endswith('...'):
main_rxn.id += '*' + other_rxn.id
elif not main_rxn.id.endswith('...'):
main_rxn.id += '...'
del_rxns[rxn_idx[i]] = True
# Update stored objective (parallel factor = 1.0, sum contributions)
if len(rxn_idx) > 1 and obj_dict is not None:
merged_obj = sum(obj_dict.pop(old_reac_ids[j], 0.0) for j in rxn_idx)
if merged_obj != 0:
obj_dict[model.reactions[rxn_idx[0]].id] = merged_obj
# Pre-compute combined GPR for each group (OR logic) before deletions.
# Save (surviving_rxn_object, combined_gpr_string) pairs.
if propagate_gpr:
group_gpr = []
for rxn_idx_group in subset_list:
main_rxn = model.reactions[rxn_idx_group[0]]
gpr_bodies = [old_gpr_bodies.get(old_reac_ids[j]) for j in rxn_idx_group]
group_gpr.append((main_rxn, _combine_gpr_or(gpr_bodies)))
remove_list = [model.reactions[i] for i in np.where(del_rxns)[0]]
if remove_list:
model.remove_reactions(remove_list, remove_orphans=True)
# Set combined GPR rules on surviving reactions
if propagate_gpr:
for rxn, combined_gpr in group_gpr:
rxn.gene_reaction_rule = combined_gpr
# Build compression map
rational_map = {}
subT = np.zeros((old_num_reac, len(model.reactions)))
for i in range(subT.shape[1]):
for j in subset_list[i]:
subT[j, i] = 1
rational_map[model.reactions[i].id] = {old_reac_ids[j]: Rational(1) for j in subset_list[i]}
return rational_map
# =============================================================================
# Exports
# =============================================================================
__all__ = [
# Rational matrix and utilities
'RationalMatrix',
'float_to_rational',
'detect_max_precision',
'nullspace',
'basic_columns',
'basic_columns_from_numpy',
# Core compression
'compress_cobra_model',
'CompressionResult',
'CompressionRecord',
'CompressionStatistics',
'CompressionMethod',
'CompressionConverter',
'StoichMatrixCompressor',
# High-level API
'compress_model',
'compress_model_coupled',
'compress_model_efmtool', # backward-compat alias
'compress_model_parallel',
# GPR propagation helpers
'_gpr_ast_to_sympy',
'_sympy_to_gpr_string',
'_combine_gpr_and',
'_combine_gpr_or',
# Preprocessing
'remove_blocked_reactions',
'remove_ext_mets',
'remove_conservation_relations',
'remove_dummy_bounds',
'stoichmat_coeff2rational',
'stoichmat_coeff2float',
]