"""
Metabolic network compression using nullspace-based coupling detection.
This module provides network compression for COBRA models. The main function
is compress_cobra_model() which compresses a model and returns transformation
matrices for converting between original and compressed flux spaces.
API:
>>> from straindesign.compression import compress_cobra_model
>>> result = compress_cobra_model(model)
>>> compressed_model = result.compressed_model
Note: The model should be preprocessed first (rational coefficients,
conservation relations removed). Use networktools.compress_model() for
automatic preprocessing.
"""
import copy
import logging
import numpy as np
from enum import Enum
from functools import reduce
from math import gcd, lcm, isinf
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union, Any
from fractions import Fraction
from scipy import sparse
from scipy.sparse import csr_matrix, csc_matrix
from sympy import Rational
from cobra import Configuration
from cobra.util.array import create_stoichiometric_matrix
LOG = logging.getLogger(__name__)
# =============================================================================
# Utility Functions
# =============================================================================
[docs]
def float_to_rational(val, max_precision: int = 6, max_denom: int = 100) -> Fraction:
"""Convert float to Fraction with bounded denominators."""
if isinstance(val, Fraction):
return val
if val == 0:
return Fraction(0)
if val == int(val):
return Fraction(int(val))
small_frac = Fraction(val).limit_denominator(max_denom)
if round(float(small_frac), max_precision) == round(val, max_precision):
return small_frac
denom = 10**max_precision
numer = round(val * denom)
return Fraction(numer, denom)
[docs]
def detect_max_precision(model) -> int:
"""Detect maximum decimal precision needed for model coefficients."""
max_decimals = 0
for rxn in model.reactions:
for met, coeff in rxn.metabolites.items():
if isinstance(coeff, float) and coeff != 0:
s = f"{abs(coeff):.15g}"
if '.' in s:
decimals = len(s.split('.')[1].rstrip('0'))
max_decimals = max(max_decimals, decimals)
return min(12, max(3, max_decimals))
def _lcm_list(numbers: List[int]) -> int:
"""Compute LCM of a list of integers."""
return reduce(lcm, numbers, 1) if numbers else 1
# =============================================================================
# Rational Matrix with Sparse Storage
# =============================================================================
[docs]
class RationalMatrix:
"""Sparse rational matrix using dual int sparse storage (numerators + denominators)."""
def __init__(self, rows: int, cols: int):
self._rows = rows
self._cols = cols
self._num_sparse: Optional[csr_matrix] = None # Numerators
self._den_sparse: Optional[csr_matrix] = None # Denominators
self._csc_cache: Optional[csc_matrix] = None
self._batch_mode = False
def _invalidate_cache(self):
if not self._batch_mode:
self._csc_cache = None
# -------------------------------------------------------------------------
# Construction
# -------------------------------------------------------------------------
@classmethod
def _from_sparse(cls,
num_sparse: csr_matrix,
den_sparse: csr_matrix,
scaled: Optional[csr_matrix] = None,
common_denom: Optional[int] = None) -> 'RationalMatrix':
"""Create RationalMatrix from sparse components."""
rows, cols = num_sparse.shape
result = cls(rows, cols)
result._num_sparse = num_sparse.copy()
result._den_sparse = den_sparse.copy()
return result
@classmethod
[docs]
def identity(cls, size: int) -> 'RationalMatrix':
"""Create identity matrix."""
row_idx = list(range(size))
col_idx = list(range(size))
num_data = [1] * size
den_data = [1] * size
num_sparse = csr_matrix((num_data, (row_idx, col_idx)), shape=(size, size), dtype=np.int64)
den_sparse = csr_matrix((den_data, (row_idx, col_idx)), shape=(size, size), dtype=np.int64)
return cls._from_sparse(num_sparse, den_sparse)
@classmethod
[docs]
def from_numpy(cls, arr: np.ndarray, max_precision: int = 6, max_denom: int = 100) -> 'RationalMatrix':
"""Create RationalMatrix from numpy array."""
rows, cols = arr.shape
row_idx, col_idx, num_data, den_data = [], [], [], []
for r in range(rows):
for c in range(cols):
val = arr[r, c]
if val != 0:
frac = float_to_rational(val, max_precision, max_denom)
row_idx.append(r)
col_idx.append(c)
num_data.append(frac.numerator)
den_data.append(frac.denominator)
num_sparse = csr_matrix((num_data, (row_idx, col_idx)), shape=(rows, cols), dtype=np.int64)
den_sparse = csr_matrix((den_data, (row_idx, col_idx)), shape=(rows, cols), dtype=np.int64)
return cls._from_sparse(num_sparse, den_sparse)
@classmethod
[docs]
def from_cobra_model(cls, model, max_precision: int = 6, max_denom: int = 100) -> 'RationalMatrix':
"""Create RationalMatrix from COBRA model stoichiometry."""
num_mets = len(model.metabolites)
num_rxns = len(model.reactions)
met_index = {m.id: i for i, m in enumerate(model.metabolites)}
row_idx, col_idx, num_data, den_data = [], [], [], []
for j, rxn in enumerate(model.reactions):
for met, coeff in rxn.metabolites.items():
if coeff != 0:
i = met_index[met.id]
if isinstance(coeff, Fraction):
frac = coeff
elif hasattr(coeff, 'p') and hasattr(coeff, 'q'):
frac = Fraction(int(coeff.p), int(coeff.q))
elif hasattr(coeff, 'numerator'):
frac = Fraction(coeff.numerator, coeff.denominator)
else:
frac = float_to_rational(coeff, max_precision, max_denom)
row_idx.append(i)
col_idx.append(j)
num_data.append(frac.numerator)
den_data.append(frac.denominator)
num_sparse = csr_matrix((num_data, (row_idx, col_idx)), shape=(num_mets, num_rxns), dtype=np.int64)
den_sparse = csr_matrix((den_data, (row_idx, col_idx)), shape=(num_mets, num_rxns), dtype=np.int64)
return cls._from_sparse(num_sparse, den_sparse)
@classmethod
def _build_from_sparse_data(cls, row_indices: List[int], col_indices: List[int], numerators: List[int], denominators: List[int],
num_rows: int, num_cols: int) -> 'RationalMatrix':
"""Build RationalMatrix from sparse coordinate data."""
num_sparse = csr_matrix((numerators, (row_indices, col_indices)), shape=(num_rows, num_cols), dtype=np.int64)
den_sparse = csr_matrix((denominators, (row_indices, col_indices)), shape=(num_rows, num_cols), dtype=np.int64)
return cls._from_sparse(num_sparse, den_sparse)
# -------------------------------------------------------------------------
# Size queries
# -------------------------------------------------------------------------
def get_row_count(self) -> int:
return self._rows
def get_column_count(self) -> int:
return self._cols
# -------------------------------------------------------------------------
# Iteration
# -------------------------------------------------------------------------
[docs]
def iter_column_fractions(self, col: int) -> Iterator[Tuple[int, Fraction]]:
"""Iterate over non-zero entries in column as (row, Fraction) pairs."""
if self._csc_cache is None:
self._csc_cache = self._num_sparse.tocsc()
self._den_csc_cache = self._den_sparse.tocsc()
start = self._csc_cache.indptr[col]
end = self._csc_cache.indptr[col + 1]
for idx in range(start, end):
row = self._csc_cache.indices[idx]
num = int(self._csc_cache.data[idx])
den = int(self._den_csc_cache.data[idx])
if num != 0:
yield row, Fraction(num, den)
[docs]
def get_signum(self, row: int, col: int) -> int:
"""Return sign of element: -1, 0, or 1."""
num = self._num_sparse[row, col]
if num > 0:
return 1
elif num < 0:
return -1
return 0
# -------------------------------------------------------------------------
# Batch edit mode
# -------------------------------------------------------------------------
[docs]
def begin_batch_edit(self):
"""Enter batch edit mode - delays cache invalidation."""
self._batch_mode = True
self._num_sparse = self._num_sparse.tolil()
self._den_sparse = self._den_sparse.tolil()
[docs]
def end_batch_edit(self):
"""Exit batch edit mode - converts back to CSR and invalidates cache."""
self._batch_mode = False
self._num_sparse = self._num_sparse.tocsr()
self._den_sparse = self._den_sparse.tocsr()
self._csc_cache = None
# -------------------------------------------------------------------------
# Matrix operations
# -------------------------------------------------------------------------
[docs]
def clone(self) -> 'RationalMatrix':
"""Create a deep copy."""
return RationalMatrix._from_sparse(self._num_sparse.copy(), self._den_sparse.copy())
[docs]
def submatrix(self, rows: int, cols: int) -> 'RationalMatrix':
"""Extract top-left submatrix of given dimensions."""
num_sub = self._num_sparse[:rows, :cols].copy()
den_sub = self._den_sparse[:rows, :cols].copy()
return RationalMatrix._from_sparse(num_sub, den_sub)
[docs]
def remove_rows(self, keep_indices: List[int]) -> None:
"""Keep only the specified rows."""
keep = np.array(keep_indices, dtype=np.intp)
self._num_sparse = self._num_sparse[keep, :]
self._den_sparse = self._den_sparse[keep, :]
self._rows = len(keep_indices)
self._invalidate_cache()
[docs]
def remove_columns(self, keep_indices: List[int]) -> None:
"""Keep only the specified columns."""
keep = np.array(keep_indices, dtype=np.intp)
self._num_sparse = self._num_sparse[:, keep]
self._den_sparse = self._den_sparse[:, keep]
self._cols = len(keep_indices)
self._invalidate_cache()
[docs]
def add_scaled_column(self, dst_col: int, src_col: int, scalar_num: int, scalar_den: int) -> None:
"""Add scalar * column[src] to column[dst]. dst[i] += (num/den) * src[i]"""
if scalar_num == 0:
return
num_lil = self._num_sparse
den_lil = self._den_sparse
# Get source column entries
if hasattr(num_lil, 'rows'): # LIL format
src_rows = num_lil.rows[src_col] if num_lil.format == 'csc' else None
# Convert to CSC for efficient column access
num_csc = num_lil.tocsc() if num_lil.format != 'csc' else num_lil
den_csc = den_lil.tocsc() if den_lil.format != 'csc' else den_lil
start = num_csc.indptr[src_col]
end = num_csc.indptr[src_col + 1]
for idx in range(start, end):
row = num_csc.indices[idx]
src_num = int(num_csc.data[idx])
src_den = int(den_csc.data[idx])
if src_num == 0:
continue
# Get current dst value
dst_num = int(num_lil[row, dst_col])
dst_den = int(den_lil[row, dst_col]) if dst_num != 0 else 1
# Compute: dst + (scalar_num/scalar_den) * (src_num/src_den)
# = dst_num/dst_den + (scalar_num * src_num)/(scalar_den * src_den)
add_num = scalar_num * src_num
add_den = scalar_den * src_den
if dst_num == 0:
new_num, new_den = add_num, add_den
else:
# Common denominator addition
new_num = dst_num * add_den + add_num * dst_den
new_den = dst_den * add_den
# Reduce
if new_num != 0:
g = gcd(abs(new_num), new_den)
new_num //= g
new_den //= g
num_lil[row, dst_col] = new_num
den_lil[row, dst_col] = new_den if new_num != 0 else 0
self._invalidate_cache()
# -------------------------------------------------------------------------
# Conversion
# -------------------------------------------------------------------------
[docs]
def to_numpy(self) -> np.ndarray:
"""Convert to numpy float array."""
result = np.zeros((self._rows, self._cols), dtype=np.float64)
coo_num = self._num_sparse.tocoo()
coo_den = self._den_sparse.tocoo()
for r, c, num, den in zip(coo_num.row, coo_num.col, coo_num.data, coo_den.data):
if num != 0:
result[r, c] = float(num) / float(den)
return result
[docs]
def to_sparse_csr(self) -> Tuple[csr_matrix, int]:
"""Return sparse representation and common denominator.
Returns numerator matrix scaled by LCM of denominators, plus the LCM.
"""
# Compute LCM of all denominators
dens = self._den_sparse.data
if len(dens) == 0:
return csr_matrix((self._rows, self._cols), dtype=np.int64), 1
common_denom = _lcm_list([int(d) for d in dens if d != 0])
# Scale numerators
coo_num = self._num_sparse.tocoo()
coo_den = self._den_sparse.tocoo()
scaled_data = []
for num, den in zip(coo_num.data, coo_den.data):
if num != 0:
scaled_data.append(int(num) * (common_denom // int(den)))
else:
scaled_data.append(0)
scaled = csr_matrix((scaled_data, (coo_num.row, coo_num.col)), shape=(self._rows, self._cols), dtype=np.int64)
return scaled, common_denom
[docs]
def to_sparse_pattern(self) -> Tuple[csr_matrix, Dict[int, Dict[int, Fraction]]]:
"""Return sparsity pattern as CSR and row-wise Fraction data.
For pattern analysis (coupled reaction detection) without integer overflow.
Returns:
pattern: CSR matrix with 1s at non-zero positions (for indptr/indices)
row_data: {row: {col: Fraction}} for actual values
"""
# Build pattern matrix (just 1s for structure)
coo_num = self._num_sparse.tocoo()
pattern_data = [1] * len(coo_num.data)
pattern = csr_matrix((pattern_data, (coo_num.row, coo_num.col)), shape=(self._rows, self._cols), dtype=np.int8)
# Build row-wise Fraction data
coo_den = self._den_sparse.tocoo()
row_data: Dict[int, Dict[int, Fraction]] = {}
for r, c, num, den in zip(coo_num.row, coo_num.col, coo_num.data, coo_den.data):
num_int = int(num)
den_int = int(den) if den != 0 else 1
if num_int != 0:
if r not in row_data:
row_data[r] = {}
row_data[r][c] = Fraction(num_int, den_int)
return pattern, row_data
def __repr__(self) -> str:
return f"RationalMatrix({self._rows}x{self._cols})"
# =============================================================================
# Sparse Integer RREF for Nullspace Computation
# =============================================================================
def _rref_integer_sparse(rm: RationalMatrix) -> Tuple[Dict[int, Dict[int, int]], int, List[int]]:
"""Compute integer RREF using dict-of-dicts for sparse row operations.
Columns are pre-sorted by nnz ascending so that sparse (likely pivot) columns
are processed first, reducing pivot-search time and fill-in during elimination.
Rows are similarly pre-sorted by nnz ascending. Results are translated back to
the original column ordering before return.
Uses GCD pre-scaling before row operations and post-reduction to control
coefficient growth. No denominator tracking needed — rows are arbitrarily
scalable for nullspace computation.
Args:
rm: Input RationalMatrix
Returns:
(rref_data, rank, pivot_columns)
rref_data: {rref_row: {orig_col: value}} in original column space
pivot_columns: pivot column indices in original column space
"""
rows = rm.get_row_count()
cols = rm.get_column_count()
# --- Column sorting: sparse columns first ---
# col_order[sorted_pos] = original_col
nnz_per_col = np.diff(rm._num_sparse.tocsc().indptr)
col_order = np.argsort(nnz_per_col, kind='stable').tolist()
col_inverse = [0] * cols
for sorted_pos, orig_col in enumerate(col_order):
col_inverse[orig_col] = sorted_pos
# Convert to integer matrix (scale each row by LCM of denominators)
# Store column indices in sorted space
num_csr = rm._num_sparse.tocsr()
den_csr = rm._den_sparse.tocsr()
data: Dict[int, Dict[int, int]] = {}
for r in range(rows):
start, end = num_csr.indptr[r], num_csr.indptr[r + 1]
if start == end:
continue
# Compute row LCM of denominators
row_dens = [int(den_csr.data[i]) for i in range(start, end) if den_csr.data[i] != 0]
row_lcm = reduce(lcm, row_dens, 1) if row_dens else 1
# Scale numerators; store using sorted column index
row_data = {}
for i in range(start, end):
num = int(num_csr.data[i])
den = int(den_csr.data[i]) if den_csr.data[i] != 0 else 1
orig_col = int(num_csr.indices[i])
scaled = num * (row_lcm // den)
if scaled != 0:
row_data[col_inverse[orig_col]] = scaled
if row_data:
data[r] = row_data
# --- Row sorting: sparse rows first (better initial pivot candidates) ---
if data:
sorted_row_keys = sorted(data.keys(), key=lambda r: len(data[r]))
data = {new_r: data[old_r] for new_r, old_r in enumerate(sorted_row_keys)}
# Gaussian elimination with partial pivoting (all indices in sorted column space)
pivot_cols_sorted = []
pivot_row = 0
for pivot_col in range(cols):
if pivot_row >= rows:
break
# Find pivot with smallest absolute value (minimizes coefficient growth)
best_row, best_val, best_abs = -1, 0, float('inf')
for r, row_data in data.items():
if r < pivot_row:
continue
if pivot_col in row_data:
v = row_data[pivot_col]
if abs(v) < best_abs:
best_row, best_val, best_abs = r, v, abs(v)
if best_row < 0:
continue
# Swap rows if needed
if best_row != pivot_row:
if pivot_row in data:
if best_row in data:
data[pivot_row], data[best_row] = data[best_row], data[pivot_row]
else:
data[best_row] = data.pop(pivot_row)
elif best_row in data:
data[pivot_row] = data.pop(best_row)
pivot_row_data = data.get(pivot_row)
if pivot_row_data is None:
continue
pivot_val = pivot_row_data.get(pivot_col, 0)
if pivot_val == 0:
continue
pivot_cols_sorted.append(pivot_col)
# Collect rows to eliminate (snapshot keys before modification)
elim_targets = [(r, row_data[pivot_col]) for r, row_data in data.items() if r != pivot_row and pivot_col in row_data]
# Eliminate pivot column from all other rows
for elim_row, elim_val in elim_targets:
elim_row_data = data[elim_row]
# Pre-scale by GCD to minimize coefficient growth.
# math.gcd handles negatives and returns a positive value (Python 3.5+).
g = gcd(pivot_val, elim_val)
pv_scaled = pivot_val // g
ev_scaled = elim_val // g
# new[c] = elim[c] * pv_scaled - ev_scaled * pivot[c]
new_row = {}
for c, p_val in pivot_row_data.items():
new_val = elim_row_data.get(c, 0) * pv_scaled - ev_scaled * p_val
if new_val != 0:
new_row[c] = new_val
for c, e_val in elim_row_data.items():
if c not in pivot_row_data:
new_val = e_val * pv_scaled
if new_val != 0:
new_row[c] = new_val
# Update row and reduce by GCD.
# gcd(*values) uses one C-level call instead of len(row) Python calls.
if new_row:
data[elim_row] = new_row
row_gcd = gcd(*new_row.values())
if row_gcd > 1:
for c in new_row:
new_row[c] //= row_gcd
else:
del data[elim_row]
pivot_row += 1
# Final GCD reduction of all rows
for row_data in data.values():
row_gcd = gcd(*row_data.values())
if row_gcd > 1:
for c in row_data:
row_data[c] //= row_gcd
# Translate results back to original column space
original_data = {r: {col_order[sc]: v for sc, v in rd.items()} for r, rd in data.items()}
pivot_cols_original = [col_order[p] for p in pivot_cols_sorted]
return original_data, len(pivot_cols_original), pivot_cols_original
def _nullspace_sparse(matrix: RationalMatrix) -> RationalMatrix:
"""Compute nullspace using integer RREF with row scaling.
No denominators needed - each row in RREF is scaled so pivot divides all elements.
The nullspace is extracted by reading off the relationships between pivot and free columns.
"""
cols = matrix.get_column_count()
# Compute integer RREF
rref_data, rank, pivot_cols = _rref_integer_sparse(matrix)
if rank == cols:
return RationalMatrix(cols, 0)
# Free columns
pivot_set = set(pivot_cols)
free_cols = [c for c in range(cols) if c not in pivot_set]
nullity = len(free_cols)
if nullity == 0:
return RationalMatrix(cols, 0)
# Build kernel matrix
# For each free column f, the nullspace vector has:
# - Entry 1 at position f (the free variable)
# - Entry -rref[i,f]/rref[i,pivot_i] at each pivot position
row_indices, col_indices = [], []
numerators, denominators = [], []
for k, free_col in enumerate(free_cols):
# Identity entry for free variable
row_indices.append(free_col)
col_indices.append(k)
numerators.append(1)
denominators.append(1)
# Entries from RREF for pivot variables
for i, pivot_col in enumerate(pivot_cols):
row_data = rref_data.get(i, {})
val_at_free = row_data.get(free_col, 0)
if val_at_free != 0:
pivot_val = row_data.get(pivot_col, 1) # Should always exist
# Nullspace entry: -val_at_free / pivot_val
g = gcd(abs(val_at_free), abs(pivot_val))
num = -val_at_free // g
den = pivot_val // g
# Ensure positive denominator
if den < 0:
num, den = -num, -den
row_indices.append(pivot_col)
col_indices.append(k)
numerators.append(num)
denominators.append(den)
return RationalMatrix._build_from_sparse_data(row_indices, col_indices, numerators, denominators, cols, nullity)
# =============================================================================
# Linear Algebra Functions
# =============================================================================
[docs]
def nullspace(matrix: RationalMatrix) -> RationalMatrix:
"""Compute right nullspace (kernel). Returns K where matrix @ K = 0.
Uses integer RREF with column/row pre-sorting and GCD reduction.
All arithmetic is Python arbitrary-precision integers — no overflow possible.
Args:
matrix: Input RationalMatrix
Returns:
Kernel matrix K where matrix @ K = 0
"""
return _nullspace_sparse(matrix)
[docs]
def basic_columns(matrix: RationalMatrix) -> List[int]:
"""Find indices of basic (pivot) columns using integer RREF."""
_, _, pivot_cols = _rref_integer_sparse(matrix)
return pivot_cols
[docs]
def basic_columns_from_numpy(mx: np.ndarray) -> List[int]:
"""Find basic columns from a numpy array."""
return basic_columns(RationalMatrix.from_numpy(mx))
# =============================================================================
# Configuration
# =============================================================================
[docs]
class CompressionMethod(Enum):
"""Compression methods for metabolic network compression."""
NULLSPACE = "Nullspace" # Nullspace-based compression
RECURSIVE = "Recursive" # Iterate until no more compression possible
@classmethod
def all(cls) -> List['CompressionMethod']:
return list(cls)
@classmethod
def none(cls) -> List['CompressionMethod']:
return []
@classmethod
[docs]
def standard(cls) -> List['CompressionMethod']:
"""Standard compression methods (recommended)."""
return [cls.NULLSPACE, cls.RECURSIVE]
# =============================================================================
# Statistics
# =============================================================================
[docs]
class CompressionStatistics:
"""Tracks compression statistics for logging."""
def __init__(self):
self.iteration_count = 0
self.zero_flux_count = 0
self.contradicting_count = 0
self.coupled_count = 0
self.unused_metabolite_count = 0
def inc_compression_iteration(self) -> int:
self.iteration_count += 1
return self.iteration_count
def get_compression_iteration(self) -> int:
return self.iteration_count
def inc_zero_flux_reactions(self) -> None:
self.zero_flux_count += 1
def inc_contradicting_reactions(self) -> None:
self.contradicting_count += 1
def inc_coupled_reactions_count(self, count: int) -> None:
self.coupled_count += count
def inc_unused_metabolite(self) -> None:
self.unused_metabolite_count += 1
def write_to_log(self) -> None:
LOG.info(f"Compression complete: {self.iteration_count} iterations, "
f"{self.zero_flux_count} zero-flux, {self.contradicting_count} contradicting, "
f"{self.coupled_count} coupled, {self.unused_metabolite_count} unused metabolites")
def __repr__(self):
return (f"CompressionStatistics(iterations={self.iteration_count}, "
f"zero_flux={self.zero_flux_count}, contradicting={self.contradicting_count}, "
f"coupled={self.coupled_count})")
# =============================================================================
# Compression Record
# =============================================================================
[docs]
class CompressionRecord:
"""
Compression result with transformation matrices.
Contains: pre @ stoich @ post == cmp
For EFM expansion: efm_original = post @ efm_compressed
"""
def __init__(self,
pre: RationalMatrix,
cmp: RationalMatrix,
post: RationalMatrix,
reversible: List[bool],
meta_names: List[str],
stats: Optional[CompressionStatistics] = None):
self.pre = pre # metabolite transformation
self.cmp = cmp # compressed stoich
self.post = post # reaction transformation
self.reversible = list(reversible)
self.meta_names = list(meta_names) # compressed metabolite names (row order in cmp)
self.stats = stats
# =============================================================================
# Working State (Internal)
# =============================================================================
class _Size:
"""Mutable counter for active matrix dimensions during compression."""
def __init__(self, metas: int, reacs: int):
self.metas = metas
self.reacs = reacs
class _WorkRecord:
"""Mutable state during compression algorithm."""
def __init__(self, stoich: RationalMatrix, reversible: List[bool], meta_names: List[str], reac_names: List[str]):
rows, cols = stoich.get_row_count(), stoich.get_column_count()
self.pre = RationalMatrix.identity(rows)
self.cmp = stoich.clone()
self.post = RationalMatrix.identity(cols)
self.reversible = list(reversible)
self.meta_names = list(meta_names)
self.reac_names = list(reac_names)
self.size = _Size(rows, cols)
# Store original dimensions for get_truncated()
self._orig_metas = rows
self._orig_reacs = cols
self.stats = CompressionStatistics()
self.stats.inc_compression_iteration()
def remove_reactions(self, suppressed: Optional[Set[str]]) -> bool:
"""Remove reactions by name - uses batch removal."""
if not suppressed:
return False
indices = set()
for name in suppressed:
try:
idx = self.reac_names.index(name)
if idx < self.size.reacs:
indices.add(idx)
except ValueError:
continue
if not indices:
return False
return self.remove_reactions_by_indices(indices)
def remove_reactions_by_indices(self, indices: Set[int]) -> bool:
"""Remove reactions by index - uses batch removal for efficiency."""
if not indices:
return False
# Filter to only valid indices
valid_indices = {idx for idx in indices if idx < self.size.reacs}
if not valid_indices:
return False
# Compute indices to keep
keep_indices = [i for i in range(self.size.reacs) if i not in valid_indices]
if not keep_indices:
self.size.reacs = 0
return True
# Use batch column removal
self._batch_remove_columns(keep_indices)
return True
def _batch_remove_columns(self, keep_indices: list) -> None:
"""Batch remove columns by keeping only specified indices."""
LOG.debug(f"_batch_remove_columns: keeping {len(keep_indices)} of {self.size.reacs}")
LOG.debug(f" Before: cmp={self.cmp.get_column_count()}, post={self.post.get_column_count()}")
# Remove columns from post and cmp matrices
self.post.remove_columns(keep_indices)
self.cmp.remove_columns(keep_indices)
LOG.debug(f" After: cmp={self.cmp.get_column_count()}, post={self.post.get_column_count()}")
# Reindex names and reversibility
new_names = [self.reac_names[i] for i in keep_indices]
new_rev = [self.reversible[i] for i in keep_indices]
# Update arrays (preserving total length for consistency)
orig_len = len(self.reac_names)
for i, (name, rev) in enumerate(zip(new_names, new_rev)):
self.reac_names[i] = name
self.reversible[i] = rev
self.size.reacs = len(keep_indices)
LOG.debug(f" size.reacs={self.size.reacs}")
def _batch_remove_rows(self, keep_indices: list) -> None:
"""Batch remove rows (metabolites) by keeping only specified indices."""
LOG.debug(f"_batch_remove_rows: keeping {len(keep_indices)} of {self.size.metas}")
# Remove rows from pre and cmp matrices
self.pre.remove_rows(keep_indices)
self.cmp.remove_rows(keep_indices)
# Reindex metabolite names
new_names = [self.meta_names[i] for i in keep_indices]
for i, name in enumerate(new_names):
self.meta_names[i] = name
self.size.metas = len(keep_indices)
LOG.debug(f" size.metas={self.size.metas}")
def remove_metabolites_by_indices(self, indices: Set[int]) -> bool:
"""Remove metabolites by index - uses batch removal for efficiency."""
if not indices:
return False
# Filter to only valid indices
valid_indices = {idx for idx in indices if idx < self.size.metas}
if not valid_indices:
return False
# Compute indices to keep
keep_indices = [i for i in range(self.size.metas) if i not in valid_indices]
if not keep_indices:
self.size.metas = 0
return True
# Use batch row removal
self._batch_remove_rows(keep_indices)
return True
def remove_unused_metabolites(self) -> bool:
"""Remove metabolites with all-zero rows - uses batch removal.
Uses CSR indptr for O(m) zero-row detection (avoids element-by-element
sparse indexing which is O(m*r) with large per-call Python overhead).
"""
mc, rc = self.size.metas, self.size.reacs
# Slice active submatrix and convert to CSR for indptr access.
# tocsr() is a no-op if already CSR (returns self), so this is safe
# regardless of current format.
num_csr = self.cmp._num_sparse[:mc, :rc].tocsr()
zero_rows = np.where(np.diff(num_csr.indptr) == 0)[0]
unused_indices = set(zero_rows.tolist())
for _ in unused_indices:
self.stats.inc_unused_metabolite()
if unused_indices:
self.remove_metabolites_by_indices(unused_indices)
return True
return False
def get_truncated(self) -> CompressionRecord:
"""Create final CompressionRecord with truncated matrices."""
mc, rc = self.size.metas, self.size.reacs
# Use stored original dimensions (batch removal changes matrix sizes)
m = self._orig_metas
r = self._orig_reacs
# Create truncated matrices using efficient submatrix extraction
# pre: mc compressed metabolites × m original metabolites
# cmp: mc compressed metabolites × rc compressed reactions
# post: r original reactions × rc compressed reactions
pre_trunc = self.pre.submatrix(mc, m)
cmp_trunc = self.cmp.submatrix(mc, rc)
post_trunc = self.post.submatrix(r, rc)
rev_trunc = self.reversible[:rc]
meta_names_trunc = self.meta_names[:mc]
return CompressionRecord(pre_trunc, cmp_trunc, post_trunc, rev_trunc, meta_names_trunc, self.stats)
# =============================================================================
# Core Algorithm
# =============================================================================
[docs]
class StoichMatrixCompressor:
"""Nullspace-based metabolic network compression."""
def __init__(self, *methods: CompressionMethod):
self._methods = list(methods) if methods else CompressionMethod.standard()
[docs]
def compress(self,
stoich: RationalMatrix,
reversible: List[bool],
meta_names: List[str],
reac_names: List[str],
suppressed: Optional[Set[str]] = None) -> CompressionRecord:
"""Compress network, return transformation matrices.
Uses Java efmtool's two-phase approach:
1. Phase 1: Remove zero-flux and contradicting reactions (iteratively)
2. Phase 2: Combine coupled reactions (iteratively)
This separation prevents cascading effects where combining reactions
could create new coupling patterns that weren't present in the original.
"""
work = _WorkRecord(stoich, reversible, meta_names, reac_names)
do_nullspace = CompressionMethod.NULLSPACE in self._methods
do_recursive = CompressionMethod.RECURSIVE in self._methods
work.remove_reactions(suppressed)
if do_nullspace:
# Phase 1: Remove zero-flux and contradicting reactions only
# (matches Java's inclCompression=false phase)
# Optimization: Continue only if nullspace compression found reactions.
# Removing unused metabolites (all-zero rows) doesn't change the nullspace,
# so if _nullspace_compress returns False, the next iteration would too.
while True:
work.stats.inc_compression_iteration()
work.remove_unused_metabolites()
found = self._nullspace_compress(work, incl_compression=False)
if not (found and do_recursive):
break
# Phase 2: Combine coupled reactions
# (matches Java's inclCompression=true phase)
while True:
work.stats.inc_compression_iteration()
work.remove_unused_metabolites()
found = self._nullspace_compress(work, incl_compression=True)
if not (found and do_recursive):
break
work.remove_unused_metabolites()
work.stats.write_to_log()
return work.get_truncated()
def _nullspace_compress(self, work: _WorkRecord, incl_compression: bool = True) -> bool:
"""One pass of nullspace compression. Returns True if reactions removed.
Args:
incl_compression: If False, only remove zero-flux and contradicting.
If True, only combine coupled reactions.
"""
# Build active submatrix for nullspace computation
active = work.cmp.submatrix(work.size.metas, work.size.reacs)
kernel = nullspace(active)
LOG.debug(
f"Nullspace: {active.get_row_count()}x{active.get_column_count()} -> kernel {kernel.get_row_count()}x{kernel.get_column_count()}"
)
# Get kernel pattern (CSR for structure) and values (Fractions for exact ratios)
kernel_pattern, kernel_values = kernel.to_sparse_pattern()
changed = False
if not incl_compression:
# Phase 1: Remove zero-flux and contradicting
# IMPORTANT: Find both sets BEFORE any removal to avoid index mismatch!
zero_flux = self._find_zero_flux(work, kernel_pattern)
contradicting = self._find_contradicting(work, kernel_pattern, kernel_values)
# Remove all at once
all_to_remove = zero_flux | contradicting
LOG.debug(f"Phase 1: {len(zero_flux)} zero-flux + {len(contradicting)} contradicting = {len(all_to_remove)} to remove")
if all_to_remove:
work.remove_reactions_by_indices(all_to_remove)
changed = True
else:
# Phase 2: Combine coupled reactions
changed |= self._handle_coupled_combine(work, kernel_pattern, kernel_values)
if changed:
work.remove_unused_metabolites()
return changed
def _find_zero_flux(self, work: _WorkRecord, kernel_sparse) -> set:
"""Find reactions with all-zero kernel rows (indices in current work)."""
zero_flux = set()
for reac in range(work.size.reacs):
if kernel_sparse.indptr[reac] == kernel_sparse.indptr[reac + 1]:
zero_flux.add(reac)
work.stats.inc_zero_flux_reactions()
return zero_flux
def _find_coupled_groups(self, kernel_pattern, kernel_values, num_reacs):
"""Find groups of coupled reactions from kernel sparsity pattern.
Args:
kernel_pattern: CSR matrix with 1s at non-zero positions (for indptr/indices)
kernel_values: {row: {col: Fraction}} for actual values
Returns (groups, ratios) where:
- groups: list of lists, each containing indices of coupled reactions
- ratios: list where ratios[i] is the coupling ratio for reaction i
"""
# Group reactions by zero-pattern in kernel
patterns = {}
for reac in range(num_reacs):
start = kernel_pattern.indptr[reac]
end = kernel_pattern.indptr[reac + 1]
pattern = tuple(kernel_pattern.indices[start:end])
patterns.setdefault(pattern, []).append(reac)
potential_groups = [idxs for idxs in patterns.values() if len(idxs) > 1]
groups = []
ratios = [None] * num_reacs
for potential in potential_groups:
ref_reac = potential[0]
ref_start = kernel_pattern.indptr[ref_reac]
ref_end = kernel_pattern.indptr[ref_reac + 1]
nonzero_cols = list(kernel_pattern.indices[ref_start:ref_end])
if not nonzero_cols:
continue
for i, reac_a in enumerate(potential):
if ratios[reac_a] is not None:
continue
group = None
# Get values for reaction a
a_row = kernel_values.get(reac_a, {})
for j in range(i + 1, len(potential)):
reac_b = potential[j]
if ratios[reac_b] is not None:
continue
# Get values for reaction b
b_row = kernel_values.get(reac_b, {})
# Compute ratio from first non-zero column
first_col = nonzero_cols[0]
a_val = a_row.get(first_col, Fraction(0))
b_val = b_row.get(first_col, Fraction(0))
if b_val == 0:
continue
# ratio = a_val / b_val
ratio = a_val / b_val
# Check if ratio is consistent across all columns
is_consistent = True
for col in nonzero_cols[1:]:
a_v = a_row.get(col, Fraction(0))
b_v = b_row.get(col, Fraction(0))
if b_v == 0 or a_v / b_v != ratio:
is_consistent = False
break
if is_consistent:
ratios[reac_b] = ratio
if group is None:
group = [reac_a]
group.append(reac_b)
if group is not None:
groups.append(group)
return groups, ratios
def _find_contradicting(self, work: _WorkRecord, kernel_pattern, kernel_values) -> set:
"""Find reactions in contradicting coupled groups (indices in current work)."""
groups, ratios = self._find_coupled_groups(kernel_pattern, kernel_values, work.size.reacs)
# Find contradicting reactions
contradicting = set()
for group in groups:
is_consistent = self._check_coupling_consistency(group, ratios, work.reversible)
if not is_consistent:
for idx in group:
contradicting.add(idx)
work.stats.inc_contradicting_reactions()
return contradicting
def _handle_coupled_combine(self, work: _WorkRecord, kernel_pattern, kernel_values) -> bool:
"""Phase 2: Combine consistent coupled reactions, remove contradicting.
Returns True if any contradicting reactions were removed (flux space changed),
which means new couplings might be revealed. Returns False if only merging
happened, since merging doesn't change the flux space.
"""
groups, ratios = self._find_coupled_groups(kernel_pattern, kernel_values, work.size.reacs)
# Combine consistent groups, remove contradicting groups
reactions_to_remove = set()
contradicting_removed = False
# Enter batch edit mode for cmp and post matrices to avoid repeated LIL conversions
work.cmp.begin_batch_edit()
work.post.begin_batch_edit()
for group in groups:
is_consistent = self._check_coupling_consistency(group, ratios, work.reversible)
if is_consistent:
self._combine_coupled(work, group, ratios)
for idx in group[1:]:
reactions_to_remove.add(idx)
else:
# Contradicting coupled group: reactions cannot carry flux together
# due to irreversibility constraints. Remove them to prune blocked paths,
# which may reveal new couplings in subsequent iterations.
LOG.debug(f"Removing contradicting coupled group: {[work.reac_names[r] for r in group]}")
for idx in group:
reactions_to_remove.add(idx)
work.stats.inc_contradicting_reactions()
contradicting_removed = True
# End batch edit mode
work.cmp.end_batch_edit()
work.post.end_batch_edit()
LOG.debug(f"Phase 2: {len(groups)} groups, removing {len(reactions_to_remove)} reactions, contradicting={contradicting_removed}")
work.remove_reactions_by_indices(reactions_to_remove)
# Only return True if contradicting reactions were removed (flux space changed).
# Merging alone doesn't change the flux space, so no new couplings can emerge.
return contradicting_removed
def _check_coupling_consistency(self, group: List[int], ratios: List[Optional[Fraction]], reversible: List[bool]) -> bool:
"""Check if coupled group is consistent with reversibility."""
for forward in [True, False]:
all_consistent = forward or reversible[group[0]]
for idx in group[1:]:
ratio = ratios[idx]
ratio_positive = ratio.numerator * ratio.denominator > 0
if not ((forward == ratio_positive) or reversible[idx]):
all_consistent = False
break
if all_consistent:
return True
return False
def _combine_coupled(self, work: _WorkRecord, group: List[int], ratios: List[Optional[Fraction]]) -> None:
"""Combine coupled reactions into master reaction.
Uses batch column operations with native fmpq arithmetic for performance.
"""
master = group[0]
LOG.debug(f"Combining coupled: {[work.reac_names[r] for r in group]}")
for slave in group[1:]:
ratio = ratios[slave]
# multiplier = 1/ratio = ratio.denominator / ratio.numerator
mult_num, mult_den = ratio.denominator, ratio.numerator
# Update stoichiometric matrix: cmp[:, master] += cmp[:, slave] * mult
work.cmp.add_scaled_column(master, slave, mult_num, mult_den)
# Update post matrix: post[:, master] += post[:, slave] * mult
work.post.add_scaled_column(master, slave, mult_num, mult_den)
work.stats.inc_coupled_reactions_count(len(group))
# =============================================================================
# COBRA Interface
# =============================================================================
[docs]
class CompressionResult:
"""Result of COBRA model compression."""
def __init__(self, compressed_model, compression_converter, pre_matrix, post_matrix, reaction_map, metabolite_map, statistics,
methods_used, original_reaction_names, original_metabolite_names, flipped_reactions):
self.compressed_model = compressed_model
self.compression_converter = compression_converter
self.pre_matrix = pre_matrix
self.post_matrix = post_matrix
self.reaction_map = reaction_map
self.metabolite_map = metabolite_map
self.statistics = statistics
self.methods_used = methods_used
self.original_reaction_names = original_reaction_names
self.original_metabolite_names = original_metabolite_names
self.flipped_reactions = flipped_reactions
@property
def compression_ratio(self) -> float:
return len(self.compressed_model.reactions) / len(self.original_reaction_names)
@property
def reactions_removed(self) -> int:
return len(self.original_reaction_names) - len(self.compressed_model.reactions)
def summary(self) -> str:
return f"Compressed {len(self.original_reaction_names)} -> {len(self.compressed_model.reactions)} reactions ({self.compression_ratio:.1%})"
[docs]
class CompressionConverter:
"""Bidirectional transformer for expressions between original and compressed spaces."""
def __init__(self, reaction_map: Dict[str, Dict[str, Union[float, Fraction]]],
metabolite_map: Dict[str, Dict[str, Union[float, Fraction]]], flipped_reactions: List[str]):
self.reaction_map = reaction_map
self.metabolite_map = metabolite_map
self.flipped_reactions = set(flipped_reactions)
[docs]
def expand_expression(self, expression: Dict[str, float], remove_missing: bool = False) -> Dict[str, float]:
"""Transform expression from compressed back to original space."""
expanded = {}
for comp_rxn, comp_coeff in expression.items():
if comp_rxn in self.reaction_map:
for orig_rxn, scale in self.reaction_map[comp_rxn].items():
coeff = comp_coeff * float(scale)
if orig_rxn in self.flipped_reactions:
coeff = -coeff
expanded[orig_rxn] = expanded.get(orig_rxn, 0) + coeff
elif not remove_missing:
LOG.warning(f"Compressed reaction {comp_rxn} not found")
return expanded
[docs]
def remove_conservation_relations(model) -> None:
"""Remove conservation relations (dependent metabolites) from a model.
This reduces the number of metabolites while maintaining the original flux space.
Uses exact rational arithmetic to find linearly independent rows.
Args:
model: COBRA model to modify in-place
"""
# Build the transposed stoichiometric matrix (reactions × metabolites) directly
# from cobra reaction coefficients — avoids a sparse→dense→sparse round-trip that
# would iterate over all m×r elements including zeros.
num_rxns = len(model.reactions)
num_mets = len(model.metabolites)
met_index = {m.id: i for i, m in enumerate(model.metabolites)}
row_idx, col_idx, num_data, den_data = [], [], [], []
for j, rxn in enumerate(model.reactions):
for met, coeff in rxn._metabolites.items():
if coeff == 0:
continue
i = met_index[met.id]
if isinstance(coeff, Fraction):
frac = coeff
elif hasattr(coeff, 'numerator'):
frac = Fraction(coeff.numerator, coeff.denominator)
else:
frac = float_to_rational(float(coeff))
row_idx.append(j) # reaction → row (transposed layout)
col_idx.append(i) # metabolite → column
num_data.append(frac.numerator)
den_data.append(frac.denominator)
from scipy.sparse import csr_matrix as _csr
num_sparse = _csr((num_data, (row_idx, col_idx)), shape=(num_rxns, num_mets), dtype=np.int64)
den_sparse = _csr((den_data, (row_idx, col_idx)), shape=(num_rxns, num_mets), dtype=np.int64)
rm = RationalMatrix._from_sparse(num_sparse, den_sparse)
basic_mets = basic_columns(rm)
dependent_mets = [model.metabolites[i].id for i in set(range(num_mets)) - set(basic_mets)]
for m_id in dependent_mets:
model.metabolites.get_by_id(m_id).remove_from_model()
[docs]
def compress_cobra_model(model,
methods: Optional[List[Union[str, CompressionMethod]]] = None,
in_place: bool = True,
suppressed_reactions: Optional[Set[str]] = None) -> CompressionResult:
"""
Compress a COBRA model using nullspace-based coupling detection.
Note: Model should be preprocessed first (rational coefficients,
conservation relations removed). Use networktools.compress_model()
for automatic preprocessing.
Args:
model: COBRA model to compress (should be preprocessed)
methods: Compression methods. Default: CompressionMethod.standard()
in_place: Modify original model (True) or work on copy (False)
suppressed_reactions: Reaction names to exclude from compression
Returns:
CompressionResult with compressed model and transformation data
"""
if not in_place:
model = copy.deepcopy(model)
original_reaction_names = [r.id for r in model.reactions]
original_metabolite_names = [m.id for m in model.metabolites]
# Parse methods
if methods is None:
compression_methods = CompressionMethod.standard()
else:
compression_methods = []
for method in methods:
if isinstance(method, CompressionMethod):
compression_methods.append(method)
elif isinstance(method, str):
compression_methods.append(CompressionMethod[method.upper()])
# Flip reactions that can only run backwards
flipped_reactions = []
for rxn in model.reactions:
if rxn.upper_bound <= 0:
rxn *= -1
flipped_reactions.append(rxn.id)
# Build stoichiometric matrix with exact arithmetic
stoich_matrix = RationalMatrix.from_cobra_model(model)
reversible = [rxn.reversibility for rxn in model.reactions]
metabolite_names = [m.id for m in model.metabolites]
reaction_names = [r.id for r in model.reactions]
# Run compression
compressor = StoichMatrixCompressor(*compression_methods)
compression_record = compressor.compress(stoich_matrix, reversible, metabolite_names, reaction_names, suppressed_reactions)
# Apply to model (uses direct manipulation, bypasses solver)
reaction_map, objective_updates = _apply_compression_to_model(model, compression_record, reaction_names)
# Rebuild solver after all direct modifications
_rebuild_solver(model, objective_updates)
pre_matrix = compression_record.pre.to_numpy()
post_matrix = compression_record.post.to_numpy()
converter = CompressionConverter(reaction_map, {}, flipped_reactions)
return CompressionResult(compressed_model=model,
compression_converter=converter,
pre_matrix=pre_matrix,
post_matrix=post_matrix,
reaction_map=reaction_map,
metabolite_map={},
statistics=compression_record.stats,
methods_used=compression_methods,
original_reaction_names=original_reaction_names,
original_metabolite_names=original_metabolite_names,
flipped_reactions=flipped_reactions)
def _rebuild_solver(model, objective_updates: dict) -> None:
"""Rebuild solver after direct model modifications.
This creates a fresh solver and populates it with the current model state.
Must be called after using direct attribute manipulation that bypasses
solver updates (e.g., _metabolites, _lower_bound, _upper_bound).
Args:
model: COBRA model with stale solver state
objective_updates: dict mapping reactions to their objective coefficients
"""
# Get solver interface type
solver_interface = model.solver.interface
# Create fresh solver
new_solver = solver_interface.Model()
model._solver = new_solver
# Populate solver with current model state
model._populate_solver(model.reactions, model.metabolites)
# Set objective coefficients (must be done after solver is populated)
for rxn, obj_coeff in objective_updates.items():
if rxn in model.reactions:
rxn.objective_coefficient = obj_coeff
def _apply_compression_to_model(model, compression_record, original_reaction_names):
"""Apply compression results to COBRA model using exact Fraction arithmetic.
Sets stoichiometric coefficients and bounds as Fractions directly from
the compressed matrices, avoiding floating-point arithmetic.
Uses direct attribute manipulation to bypass solver updates during modification.
Solver must be rebuilt after calling this function.
"""
post = compression_record.post
cmp = compression_record.cmp
meta_names = compression_record.meta_names
num_original = post.get_row_count()
num_compressed = post.get_column_count()
num_metas = cmp.get_row_count()
# Track which original reactions to keep (main reactions)
keep_rxns = [False] * num_original
reaction_map = {}
# Build metabolite lookup for compressed metabolites
met_lookup = {name: model.metabolites.get_by_id(name) for name in meta_names}
# Track objective coefficients to set at end (bypass solver updates)
objective_updates = {}
for j in range(num_compressed):
# Find contributing original reactions from POST matrix (sparse iteration)
contributing = list(post.iter_column_fractions(j))
if not contributing:
continue
# Select "main" reaction: prefer one with non-zero objective coefficient
main_idx = contributing[0][0]
for idx, _ in contributing:
if model.reactions[idx].objective_coefficient != 0:
main_idx = idx
break
main_rxn = model.reactions[main_idx]
keep_rxns[main_idx] = True
# Scale objective coefficient by POST factors (store for later)
combined_obj = Fraction(0)
for idx, coeff in contributing:
obj_coeff = model.reactions[idx].objective_coefficient
if obj_coeff != 0:
combined_obj += Fraction(obj_coeff).limit_denominator(10**12) * coeff
objective_updates[main_rxn] = float(combined_obj)
# Store subset info
main_rxn.subset_rxns = [idx for idx, _ in contributing]
main_rxn.subset_stoich = [coeff for _, coeff in contributing]
# Build combined reaction name from contributing reactions
for idx, _ in contributing[1:]:
rxn = model.reactions[idx]
if len(main_rxn.id) + len(rxn.id) < 220 and not main_rxn.id.endswith('...'):
main_rxn.id += '*' + rxn.id
elif not main_rxn.id.endswith('...'):
main_rxn.id += '...'
# Build new metabolites dict from cmp matrix (sparse iteration)
new_metabolites = {}
for m_idx, frac in cmp.iter_column_fractions(j):
met = met_lookup[meta_names[m_idx]]
new_metabolites[met] = frac
# OPTIMIZATION: Direct _metabolites assignment (bypasses solver updates)
# Clear old metabolite back-references
for met in list(main_rxn._metabolites.keys()):
if main_rxn in met._reaction:
met._reaction.discard(main_rxn)
# Set new metabolites directly
main_rxn._metabolites = new_metabolites
# Update new metabolite back-references
for met in new_metabolites:
met._reaction.add(main_rxn)
# Compute bounds as Fractions from original reactions scaled by POST factors
lb_candidates = []
ub_candidates = []
for idx, coeff in contributing:
rxn = model.reactions[idx]
lb, ub = rxn.lower_bound, rxn.upper_bound
# v_original = coeff * v_compressed (from POST interpretation)
# constraint: lb <= v_original <= ub
# so: lb <= coeff * v_compressed <= ub
# if coeff > 0: lb/coeff <= v_compressed <= ub/coeff
# if coeff < 0: ub/coeff <= v_compressed <= lb/coeff (inequality flips)
if coeff > 0:
if lb != -float('inf'):
lb_candidates.append(Fraction(lb) / coeff if lb != 0 else Fraction(0))
if ub != float('inf'):
ub_candidates.append(Fraction(ub) / coeff if ub != 0 else Fraction(0))
else: # coeff < 0
if ub != float('inf'):
lb_candidates.append(Fraction(ub) / coeff if ub != 0 else Fraction(0))
if lb != -float('inf'):
ub_candidates.append(Fraction(lb) / coeff if lb != 0 else Fraction(0))
# OPTIMIZATION: Direct bounds assignment (bypasses solver updates)
main_rxn._lower_bound = float(max(lb_candidates)) if lb_candidates else -float('inf')
main_rxn._upper_bound = float(min(ub_candidates)) if ub_candidates else float('inf')
# Build reaction map
reaction_map[main_rxn.id] = {original_reaction_names[idx]: coeff for idx, coeff in contributing}
# OPTIMIZATION: Batch reaction and metabolite removal
# Use reaction objects directly (not IDs, since IDs are modified during compression)
from cobra import DictList
# Build set of reaction objects to keep
keep_rxn_objs = {model.reactions[i] for i in range(num_original) if keep_rxns[i]}
# Find metabolites that will be in kept reactions
mets_in_kept_rxns = set()
for rxn in keep_rxn_objs:
mets_in_kept_rxns.update(rxn._metabolites.keys())
# Build new reactions list (only kept reactions)
new_reactions = DictList()
for rxn in model.reactions:
if rxn in keep_rxn_objs:
new_reactions.append(rxn)
else:
# Clear metabolite back-references for removed reaction
for met in list(rxn._metabolites.keys()):
if rxn in met._reaction:
met._reaction.discard(rxn)
rxn._model = None
# Build new metabolites list (only those in kept reactions)
new_metabolites = DictList()
for met in model.metabolites:
if met in mets_in_kept_rxns:
new_metabolites.append(met)
else:
met._model = None
# Replace model lists (using __dict__ to bypass any property setters)
model.__dict__['reactions'] = new_reactions
model.__dict__['metabolites'] = new_metabolites
# Update model references
for rxn in model.reactions:
rxn._model = model
for met in model.metabolites:
met._model = model
return reaction_map, objective_updates
# =============================================================================
# Preprocessing Functions
# =============================================================================
[docs]
def remove_blocked_reactions(model) -> List:
"""Remove blocked reactions (bounds == (0, 0)) from a network."""
blocked_reactions = [reac for reac in model.reactions if reac.bounds == (0, 0)]
model.remove_reactions(blocked_reactions)
return blocked_reactions
[docs]
def remove_ext_mets(model) -> None:
"""Remove external metabolites from 'External_Species' compartment."""
external_mets = [m for m in model.metabolites if m.compartment == 'External_Species']
model.remove_metabolites(external_mets)
stoich_mat = create_stoichiometric_matrix(model)
obsolete_reacs = [r for r, has_nonzero in zip(model.reactions, np.any(stoich_mat, 0)) if not has_nonzero]
model.remove_reactions(obsolete_reacs)
[docs]
def remove_dummy_bounds(model) -> None:
"""Replace COBRA standard bounds with +/-inf."""
cobra_conf = Configuration()
bound_thres = max(abs(cobra_conf.lower_bound), abs(cobra_conf.upper_bound))
if any(any(abs(b) >= bound_thres for b in r.bounds) for r in model.reactions):
LOG.warning(f'Removing reaction bounds >= {round(bound_thres)}.')
for rxn in model.reactions:
if rxn.lower_bound <= -bound_thres:
rxn.lower_bound = -np.inf
if rxn.upper_bound >= bound_thres:
rxn.upper_bound = np.inf
[docs]
def stoichmat_coeff2rational(model) -> None:
"""Convert stoichiometric coefficients to rational numbers."""
for rxn in model.reactions:
for met, coeff in rxn._metabolites.items():
if isinstance(coeff, (float, int)):
rxn._metabolites[met] = float_to_rational(coeff)
elif not hasattr(coeff, 'p'): # Not sympy.Rational
if hasattr(coeff, 'numerator'): # fractions.Fraction
rxn._metabolites[met] = Rational(coeff.numerator, coeff.denominator)
else:
raise TypeError(f"Unsupported coefficient type: {type(coeff)}")
[docs]
def stoichmat_coeff2float(model) -> None:
"""Convert stoichiometric coefficients to floats."""
for rxn in model.reactions:
for met, coeff in rxn._metabolites.items():
rxn._metabolites[met] = float(coeff)
# =============================================================================
# High-Level Compression API
# =============================================================================
[docs]
def compress_model(model, no_par_compress_reacs=set(), compression_backend='sparse_rref'):
"""Compress a metabolic model using multiple techniques.
Performs blocked reaction removal, conservation relation removal, and
alternating dependent/parallel reaction lumping until no further
compression is possible.
Args:
model: COBRA model to compress in-place
no_par_compress_reacs: Reactions exempt from parallel compression
compression_backend: Compression backend to use:
- 'sparse_rref' (default): Pure Python sparse integer RREF.
No external dependencies beyond NumPy/SciPy.
- 'efmtool_rref' (legacy): Java-based EFMTool via JPype.
Requires a JVM and the jpype1 package.
Returns:
list of dict: Compression maps for reversing each compression step
"""
# Suppress optlang LP coefficient updates during compression.
#
# compress_model_coupled calls _rebuild_solver after each iteration,
# which calls _populate_solver -> constraint.set_linear_coefficients
# for every metabolite. On Gurobi this involves _get_expression (a
# full LP-row read) and dominates runtime for large/GPR-extended models.
#
# Fix: patch set_linear_coefficients to a no-op for the duration of
# compression so that intermediate rebuilds only maintain LP structure
# (variables, constraints, bounds) without re-writing coefficients.
# One final _populate_solver call at the end sets the correct
# stoichiometric coefficients from the compressed model state.
#
# Guards: all accesses are hasattr-checked so that cobra/optlang API
# changes simply disable the optimisation without breaking compression.
_slc_cls = None # the Constraint class we patched
_slc_orig = None # the original set_linear_coefficients method
try:
if hasattr(model, 'problem') and hasattr(model.problem, 'Constraint'):
_slc_cls = model.problem.Constraint
if hasattr(_slc_cls, 'set_linear_coefficients'):
_slc_orig = _slc_cls.set_linear_coefficients
_slc_cls.set_linear_coefficients = lambda self, *a, **kw: None
except Exception:
_slc_cls = _slc_orig = None
cmp_mapReac = []
try:
use_java = (compression_backend == 'efmtool_rref')
LOG.info(' Removing blocked reactions.')
remove_blocked_reactions(model)
LOG.info(' Converting coefficients to rationals.')
stoichmat_coeff2rational(model)
LOG.info(' Removing conservation relations.')
if use_java:
_remove_conservation_relations_java(model)
else:
remove_conservation_relations(model)
parallel = False
run = 1
numr = len(model.reactions)
while True:
if not parallel:
LOG.info(f' Compression {run}: Lumping coupled reactions.')
reac_map_exp = compress_model_coupled(model, compression_backend)
for new_reac, old_reac_val in reac_map_exp.items():
old_reacs_no_compress = [r for r in no_par_compress_reacs if r in old_reac_val]
if old_reacs_no_compress:
for r in old_reacs_no_compress:
no_par_compress_reacs.remove(r)
no_par_compress_reacs.add(new_reac)
else:
LOG.info(f' Compression {run}: Lumping parallel reactions.')
reac_map_exp = compress_model_parallel(model, no_par_compress_reacs)
if use_java:
_remove_conservation_relations_java(model)
else:
remove_conservation_relations(model)
if numr > len(reac_map_exp):
LOG.info(f' Reduced to {len(reac_map_exp)} reactions.')
cmp_mapReac.append({
"reac_map_exp": reac_map_exp,
"parallel": parallel,
})
parallel = not parallel
run += 1
numr = len(reac_map_exp)
else:
LOG.info(f' No further reduction ({numr} reactions).')
LOG.info(f' Compression complete ({run - 1} iterations).')
break
stoichmat_coeff2float(model)
finally:
# Step 1: Always restore set_linear_coefficients, even if compression raises.
if _slc_orig is not None and _slc_cls is not None:
_slc_cls.set_linear_coefficients = _slc_orig
# Step 2: Single final LP rebuild with correct stoichiometric coefficients.
# _populate_solver cannot be called on the existing solver (it would try to
# re-add constraints already in the pending queue -> ContainerAlreadyContains).
# _rebuild_solver creates a fresh optlang model first, then populates it.
# We must collect the objective from the current (still valid) LP beforehand.
if _slc_orig is not None:
try:
obj_updates = {}
for rxn in model.reactions:
try:
c = rxn.objective_coefficient
if c != 0:
obj_updates[rxn] = c
except Exception:
pass
_rebuild_solver(model, obj_updates)
except Exception:
pass # Structural compression succeeded; LP may need manual resync.
return cmp_mapReac
def _remove_conservation_relations_java(model) -> None:
"""Remove conservation relations using Java efmtool."""
from . import efmtool_cmp_interface as efm
stoich_mat = create_stoichiometric_matrix(model, array_type='lil')
basic_mets = efm.basic_columns_rat_java(stoich_mat.transpose().toarray(), tolerance=0)
dependent_mets = [model.metabolites[i].id for i in set(range(len(model.metabolites))) - set(basic_mets)]
for m_id in dependent_mets:
model.metabolites.get_by_id(m_id).remove_from_model()
[docs]
def compress_model_coupled(model, compression_backend='sparse_rref'):
"""Compress by lumping stoichiometrically coupled (dependent) reactions.
Identifies groups of reactions whose flux vectors are proportional in every
steady state (i.e. they share a common nullspace direction) and merges each
group into a single lumped reaction. Both the pure-Python and legacy Java
backends perform this operation; the compression_backend controls the nullspace algorithm.
Args:
model: COBRA model to compress in-place
compression_backend: 'sparse_rref' (default, Python) or 'efmtool_rref' (Java legacy)
Returns:
dict: Mapping {compressed_id: {orig_id: factor, ...}}
"""
if compression_backend == 'efmtool_rref':
from .efmtool_cmp_interface import compress_model_java
return compress_model_java(model)
# Clear gene rules to match Java behavior
for r in model.reactions:
r.gene_reaction_rule = ''
result = compress_cobra_model(model, methods=CompressionMethod.standard(), in_place=True)
# Account for flipped reactions
flipped = set(result.flipped_reactions)
return {
cmp_id: {
orig_id: c * (-1 if orig_id in flipped else 1) for orig_id, c in orig_map.items()
} for cmp_id, orig_map in result.reaction_map.items()
}
# Backward-compatibility alias (old name referenced efmtool, but the function
# is backend-agnostic — the new name compress_model_coupled is preferred).
compress_model_efmtool = compress_model_coupled
[docs]
def compress_model_parallel(model, protected_rxns=set()):
"""Compress by lumping parallel reactions.
Args:
model: COBRA model to compress in-place
protected_rxns: Reactions exempt from parallel compression
Returns:
dict: Mapping {compressed_id: {orig_id: factor, ...}}
"""
old_num_reac = len(model.reactions)
old_objective = [r.objective_coefficient for r in model.reactions]
old_reac_ids = [r.id for r in model.reactions]
stoichmat_T = create_stoichiometric_matrix(model, 'lil').transpose()
factor = [d[0] if d else 1.0 for d in stoichmat_T.data]
A = sparse.diags(factor) @ stoichmat_T
lb = [float(r.lower_bound) for r in model.reactions]
ub = [float(r.upper_bound) for r in model.reactions]
fwd = sparse.lil_matrix([1. if (isinf(u) and f > 0 or isinf(l) and f < 0) else 0. for f, l, u in zip(factor, lb, ub)]).transpose()
rev = sparse.lil_matrix([1. if (isinf(l) and f > 0 or isinf(u) and f < 0) else 0. for f, l, u in zip(factor, lb, ub)]).transpose()
inh = sparse.lil_matrix([
i + 1 if not ((isinf(ub[i]) or ub[i] == 0) and (isinf(lb[i]) or lb[i] == 0)) else 0 for i in range(len(model.reactions))
]).transpose()
A = sparse.hstack((A, fwd, rev, inh), 'csr')
# Find parallel reactions via hash comparison
subset_list = []
prev_found = []
protected = [r.id in protected_rxns for r in model.reactions]
hashes = [hash((tuple(A[i].indices), tuple(A[i].data))) for i in range(A.shape[0])]
for i in range(A.shape[0]):
if i in prev_found:
continue
if protected[i]:
subset_list.append([i])
continue
subset_i = [i]
for j in range(i + 1, A.shape[0]):
if not protected[j] and j not in prev_found and hashes[i] == hashes[j]:
subset_i.append(j)
prev_found.append(j)
subset_list.append(subset_i)
# Lump parallel reactions
del_rxns = [False] * len(model.reactions)
for rxn_idx in subset_list:
for i in range(1, len(rxn_idx)):
main_rxn = model.reactions[rxn_idx[0]]
other_rxn = model.reactions[rxn_idx[i]]
if len(main_rxn.id) + len(other_rxn.id) < 220 and main_rxn.id[-3:] != '...':
main_rxn.id += '*' + other_rxn.id
elif main_rxn.id[-3:] != '...':
main_rxn.id += '...'
del_rxns[rxn_idx[i]] = True
del_indices = np.where(del_rxns)[0]
for i in reversed(del_indices):
model.reactions[i].remove_from_model(remove_orphans=True)
# Build compression map
rational_map = {}
subT = np.zeros((old_num_reac, len(model.reactions)))
for i in range(subT.shape[1]):
for j in subset_list[i]:
subT[j, i] = 1
rational_map[model.reactions[i].id] = {old_reac_ids[j]: Rational(1) for j in subset_list[i]}
# Update objective
new_objective = old_objective @ subT
for r, c in zip(model.reactions, new_objective):
r.objective_coefficient = c
return rational_map
# =============================================================================
# Exports
# =============================================================================
__all__ = [
# Rational matrix and utilities
'RationalMatrix',
'float_to_rational',
'detect_max_precision',
'nullspace',
'basic_columns',
'basic_columns_from_numpy',
# Core compression
'compress_cobra_model',
'CompressionResult',
'CompressionRecord',
'CompressionStatistics',
'CompressionMethod',
'CompressionConverter',
'StoichMatrixCompressor',
# High-level API
'compress_model',
'compress_model_coupled',
'compress_model_efmtool', # backward-compat alias
'compress_model_parallel',
# Preprocessing
'remove_blocked_reactions',
'remove_ext_mets',
'remove_conservation_relations',
'remove_dummy_bounds',
'stoichmat_coeff2rational',
'stoichmat_coeff2float',
]