Source code for straindesign.compression

"""
Metabolic network compression using nullspace-based coupling detection.

This module provides network compression for COBRA models. The main function
is compress_cobra_model() which compresses a model and returns transformation
matrices for converting between original and compressed flux spaces.

API:
    >>> from straindesign.compression import compress_cobra_model
    >>> result = compress_cobra_model(model)
    >>> compressed_model = result.compressed_model

Note: The model should be preprocessed first (rational coefficients,
conservation relations removed). Use networktools.compress_model() for
automatic preprocessing.
"""

import ast
import copy
import logging
import numpy as np
from enum import Enum
from functools import reduce
from math import gcd, lcm, isinf
from typing import Dict, Iterator, List, Optional, Set, Tuple, Union, Any

from fractions import Fraction
from scipy import sparse
from scipy.sparse import csr_matrix, csc_matrix
from sympy import Rational, Symbol as SympySymbol, And as SympyAnd, Or as SympyOr
from cobra import Configuration
from cobra.util.array import create_stoichiometric_matrix

LOG = logging.getLogger(__name__)

# LP suppression utilities live in networktools.  Imports are deferred to
# function bodies to avoid the circular dependency (networktools re-exports
# compression symbols).

# =============================================================================
# Utility Functions
# =============================================================================


[docs] def float_to_rational(val, max_precision: int = 6, max_denom: int = 100) -> Fraction: """Convert float to Fraction with bounded denominators.""" if isinstance(val, Fraction): return val import numbers if isinstance(val, numbers.Rational): return Fraction(val.numerator, val.denominator) # Handle sympy.Float and other numeric types that Fraction() doesn't accept directly if not isinstance(val, (int, float)): val = float(val) if val == 0: return Fraction(0) if val == int(val): return Fraction(int(val)) small_frac = Fraction(val).limit_denominator(max_denom) if round(float(small_frac), max_precision) == round(val, max_precision): return small_frac denom = 10**max_precision numer = round(val * denom) return Fraction(numer, denom)
[docs] def detect_max_precision(model) -> int: """Detect maximum decimal precision needed for model coefficients.""" max_decimals = 0 for rxn in model.reactions: for met, coeff in rxn.metabolites.items(): if isinstance(coeff, float) and coeff != 0: s = f"{abs(coeff):.15g}" if '.' in s: decimals = len(s.split('.')[1].rstrip('0')) max_decimals = max(max_decimals, decimals) return min(12, max(3, max_decimals))
def _lcm_list(numbers: List[int]) -> int: """Compute LCM of a list of integers.""" return reduce(lcm, numbers, 1) if numbers else 1 # ============================================================================= # Rational Matrix with Sparse Storage # =============================================================================
[docs] class RationalMatrix: """Sparse rational matrix using dual int sparse storage (numerators + denominators).""" def __init__(self, rows: int, cols: int): self._rows = rows self._cols = cols self._num_sparse: Optional[csr_matrix] = None # Numerators self._den_sparse: Optional[csr_matrix] = None # Denominators self._csc_cache: Optional[csc_matrix] = None self._batch_mode = False def _invalidate_cache(self): if not self._batch_mode: self._csc_cache = None # ------------------------------------------------------------------------- # Construction # ------------------------------------------------------------------------- @classmethod def _from_sparse(cls, num_sparse: csr_matrix, den_sparse: csr_matrix, scaled: Optional[csr_matrix] = None, common_denom: Optional[int] = None) -> 'RationalMatrix': """Create RationalMatrix from sparse components.""" rows, cols = num_sparse.shape result = cls(rows, cols) result._num_sparse = num_sparse.copy() result._den_sparse = den_sparse.copy() return result @classmethod
[docs] def identity(cls, size: int) -> 'RationalMatrix': """Create identity matrix.""" row_idx = list(range(size)) col_idx = list(range(size)) num_data = [1] * size den_data = [1] * size num_sparse = csr_matrix((num_data, (row_idx, col_idx)), shape=(size, size), dtype=np.int64) den_sparse = csr_matrix((den_data, (row_idx, col_idx)), shape=(size, size), dtype=np.int64) return cls._from_sparse(num_sparse, den_sparse)
@classmethod
[docs] def from_numpy(cls, arr: np.ndarray, max_precision: int = 6, max_denom: int = 100) -> 'RationalMatrix': """Create RationalMatrix from numpy array.""" rows, cols = arr.shape row_idx, col_idx, num_data, den_data = [], [], [], [] for r in range(rows): for c in range(cols): val = arr[r, c] if val != 0: frac = float_to_rational(val, max_precision, max_denom) row_idx.append(r) col_idx.append(c) num_data.append(frac.numerator) den_data.append(frac.denominator) num_sparse = csr_matrix((num_data, (row_idx, col_idx)), shape=(rows, cols), dtype=np.int64) den_sparse = csr_matrix((den_data, (row_idx, col_idx)), shape=(rows, cols), dtype=np.int64) return cls._from_sparse(num_sparse, den_sparse)
@classmethod
[docs] def from_cobra_model(cls, model, max_precision: int = 6, max_denom: int = 100) -> 'RationalMatrix': """Create RationalMatrix from COBRA model stoichiometry.""" num_mets = len(model.metabolites) num_rxns = len(model.reactions) met_index = {m.id: i for i, m in enumerate(model.metabolites)} row_idx, col_idx, num_data, den_data = [], [], [], [] for j, rxn in enumerate(model.reactions): for met, coeff in rxn.metabolites.items(): if coeff != 0: i = met_index[met.id] if isinstance(coeff, Fraction): frac = coeff elif hasattr(coeff, 'p') and hasattr(coeff, 'q'): frac = Fraction(int(coeff.p), int(coeff.q)) elif hasattr(coeff, 'numerator'): frac = Fraction(coeff.numerator, coeff.denominator) else: frac = float_to_rational(coeff, max_precision, max_denom) row_idx.append(i) col_idx.append(j) num_data.append(frac.numerator) den_data.append(frac.denominator) num_sparse = csr_matrix((num_data, (row_idx, col_idx)), shape=(num_mets, num_rxns), dtype=np.int64) den_sparse = csr_matrix((den_data, (row_idx, col_idx)), shape=(num_mets, num_rxns), dtype=np.int64) return cls._from_sparse(num_sparse, den_sparse)
@classmethod def _build_from_sparse_data(cls, row_indices: List[int], col_indices: List[int], numerators: List[int], denominators: List[int], num_rows: int, num_cols: int) -> 'RationalMatrix': """Build RationalMatrix from sparse coordinate data.""" num_sparse = csr_matrix((numerators, (row_indices, col_indices)), shape=(num_rows, num_cols), dtype=np.int64) den_sparse = csr_matrix((denominators, (row_indices, col_indices)), shape=(num_rows, num_cols), dtype=np.int64) return cls._from_sparse(num_sparse, den_sparse) # ------------------------------------------------------------------------- # Size queries # ------------------------------------------------------------------------- def get_row_count(self) -> int: return self._rows def get_column_count(self) -> int: return self._cols # ------------------------------------------------------------------------- # Iteration # -------------------------------------------------------------------------
[docs] def iter_column_fractions(self, col: int) -> Iterator[Tuple[int, Fraction]]: """Iterate over non-zero entries in column as (row, Fraction) pairs.""" if self._csc_cache is None: self._csc_cache = self._num_sparse.tocsc() self._den_csc_cache = self._den_sparse.tocsc() start = self._csc_cache.indptr[col] end = self._csc_cache.indptr[col + 1] for idx in range(start, end): row = self._csc_cache.indices[idx] num = int(self._csc_cache.data[idx]) den = int(self._den_csc_cache.data[idx]) if num != 0: yield row, Fraction(num, den)
[docs] def get_signum(self, row: int, col: int) -> int: """Return sign of element: -1, 0, or 1.""" num = self._num_sparse[row, col] if num > 0: return 1 elif num < 0: return -1 return 0
# ------------------------------------------------------------------------- # Batch edit mode # -------------------------------------------------------------------------
[docs] def begin_batch_edit(self): """Enter batch edit mode - delays cache invalidation.""" self._batch_mode = True self._num_sparse = self._num_sparse.tolil() self._den_sparse = self._den_sparse.tolil()
[docs] def end_batch_edit(self): """Exit batch edit mode - converts back to CSR and invalidates cache.""" self._batch_mode = False self._num_sparse = self._num_sparse.tocsr() self._den_sparse = self._den_sparse.tocsr() self._csc_cache = None
# ------------------------------------------------------------------------- # Matrix operations # -------------------------------------------------------------------------
[docs] def clone(self) -> 'RationalMatrix': """Create a deep copy.""" return RationalMatrix._from_sparse(self._num_sparse.copy(), self._den_sparse.copy())
[docs] def submatrix(self, rows: int, cols: int) -> 'RationalMatrix': """Extract top-left submatrix of given dimensions.""" num_sub = self._num_sparse[:rows, :cols].copy() den_sub = self._den_sparse[:rows, :cols].copy() return RationalMatrix._from_sparse(num_sub, den_sub)
[docs] def remove_rows(self, keep_indices: List[int]) -> None: """Keep only the specified rows.""" keep = np.array(keep_indices, dtype=np.intp) self._num_sparse = self._num_sparse[keep, :] self._den_sparse = self._den_sparse[keep, :] self._rows = len(keep_indices) self._invalidate_cache()
[docs] def remove_columns(self, keep_indices: List[int]) -> None: """Keep only the specified columns.""" keep = np.array(keep_indices, dtype=np.intp) self._num_sparse = self._num_sparse[:, keep] self._den_sparse = self._den_sparse[:, keep] self._cols = len(keep_indices) self._invalidate_cache()
[docs] def add_scaled_column(self, dst_col: int, src_col: int, scalar_num: int, scalar_den: int) -> None: """Add scalar * column[src] to column[dst]. dst[i] += (num/den) * src[i]""" if scalar_num == 0: return num_lil = self._num_sparse den_lil = self._den_sparse # Get source column entries if hasattr(num_lil, 'rows'): # LIL format src_rows = num_lil.rows[src_col] if num_lil.format == 'csc' else None # Convert to CSC for efficient column access num_csc = num_lil.tocsc() if num_lil.format != 'csc' else num_lil den_csc = den_lil.tocsc() if den_lil.format != 'csc' else den_lil start = num_csc.indptr[src_col] end = num_csc.indptr[src_col + 1] for idx in range(start, end): row = num_csc.indices[idx] src_num = int(num_csc.data[idx]) src_den = int(den_csc.data[idx]) if src_num == 0: continue # Get current dst value dst_num = int(num_lil[row, dst_col]) dst_den = int(den_lil[row, dst_col]) if dst_num != 0 else 1 # Compute: dst + (scalar_num/scalar_den) * (src_num/src_den) # = dst_num/dst_den + (scalar_num * src_num)/(scalar_den * src_den) add_num = scalar_num * src_num add_den = scalar_den * src_den if dst_num == 0: new_num, new_den = add_num, add_den else: # Common denominator addition new_num = dst_num * add_den + add_num * dst_den new_den = dst_den * add_den # Reduce if new_num != 0: g = gcd(abs(new_num), new_den) new_num //= g new_den //= g num_lil[row, dst_col] = new_num den_lil[row, dst_col] = new_den if new_num != 0 else 0 self._invalidate_cache()
# ------------------------------------------------------------------------- # Conversion # -------------------------------------------------------------------------
[docs] def to_numpy(self) -> np.ndarray: """Convert to numpy float array.""" result = np.zeros((self._rows, self._cols), dtype=np.float64) coo_num = self._num_sparse.tocoo() coo_den = self._den_sparse.tocoo() for r, c, num, den in zip(coo_num.row, coo_num.col, coo_num.data, coo_den.data): if num != 0: result[r, c] = float(num) / float(den) return result
[docs] def to_sparse_csr(self) -> Tuple[csr_matrix, int]: """Return sparse representation and common denominator. Returns numerator matrix scaled by LCM of denominators, plus the LCM. """ # Compute LCM of all denominators dens = self._den_sparse.data if len(dens) == 0: return csr_matrix((self._rows, self._cols), dtype=np.int64), 1 common_denom = _lcm_list([int(d) for d in dens if d != 0]) # Scale numerators coo_num = self._num_sparse.tocoo() coo_den = self._den_sparse.tocoo() scaled_data = [] for num, den in zip(coo_num.data, coo_den.data): if num != 0: scaled_data.append(int(num) * (common_denom // int(den))) else: scaled_data.append(0) scaled = csr_matrix((scaled_data, (coo_num.row, coo_num.col)), shape=(self._rows, self._cols), dtype=np.int64) return scaled, common_denom
[docs] def to_sparse_pattern(self) -> Tuple[csr_matrix, Dict[int, Dict[int, Fraction]]]: """Return sparsity pattern as CSR and row-wise Fraction data. For pattern analysis (coupled reaction detection) without integer overflow. Returns: pattern: CSR matrix with 1s at non-zero positions (for indptr/indices) row_data: {row: {col: Fraction}} for actual values """ # Build pattern matrix (just 1s for structure) coo_num = self._num_sparse.tocoo() pattern_data = [1] * len(coo_num.data) pattern = csr_matrix((pattern_data, (coo_num.row, coo_num.col)), shape=(self._rows, self._cols), dtype=np.int8) # Build row-wise Fraction data coo_den = self._den_sparse.tocoo() row_data: Dict[int, Dict[int, Fraction]] = {} for r, c, num, den in zip(coo_num.row, coo_num.col, coo_num.data, coo_den.data): num_int = int(num) den_int = int(den) if den != 0 else 1 if num_int != 0: if r not in row_data: row_data[r] = {} row_data[r][c] = Fraction(num_int, den_int) return pattern, row_data
def __repr__(self) -> str: return f"RationalMatrix({self._rows}x{self._cols})"
# ============================================================================= # Sparse Integer RREF for Nullspace Computation # ============================================================================= def _rref_integer_sparse(rm: RationalMatrix) -> Tuple[Dict[int, Dict[int, int]], int, List[int]]: """Compute integer RREF using dict-of-dicts for sparse row operations. Columns are pre-sorted by nnz ascending so that sparse (likely pivot) columns are processed first, reducing pivot-search time and fill-in during elimination. Rows are pre-sorted by nnz ascending, and pivot rows are selected by Markowitz criterion (sparsest row first, tie-break by smallest absolute value). Results are translated back to the original column ordering before return. Uses GCD pre-scaling before row operations and post-reduction to control coefficient growth. No denominator tracking needed — rows are arbitrarily scalable for nullspace computation. Args: rm: Input RationalMatrix Returns: (rref_data, rank, pivot_columns) rref_data: {rref_row: {orig_col: value}} in original column space pivot_columns: pivot column indices in original column space """ rows = rm.get_row_count() cols = rm.get_column_count() # --- Column sorting: sparse columns first --- # col_order[sorted_pos] = original_col nnz_per_col = np.diff(rm._num_sparse.tocsc().indptr) col_order = np.argsort(nnz_per_col, kind='stable').tolist() col_inverse = [0] * cols for sorted_pos, orig_col in enumerate(col_order): col_inverse[orig_col] = sorted_pos # Convert to integer matrix (scale each row by LCM of denominators) # Store column indices in sorted space num_csr = rm._num_sparse.tocsr() den_csr = rm._den_sparse.tocsr() data: Dict[int, Dict[int, int]] = {} for r in range(rows): start, end = num_csr.indptr[r], num_csr.indptr[r + 1] if start == end: continue # Compute row LCM of denominators row_dens = [int(den_csr.data[i]) for i in range(start, end) if den_csr.data[i] != 0] row_lcm = reduce(lcm, row_dens, 1) if row_dens else 1 # Scale numerators; store using sorted column index row_data = {} for i in range(start, end): num = int(num_csr.data[i]) den = int(den_csr.data[i]) if den_csr.data[i] != 0 else 1 orig_col = int(num_csr.indices[i]) scaled = num * (row_lcm // den) if scaled != 0: row_data[col_inverse[orig_col]] = scaled if row_data: data[r] = row_data # --- Row sorting: sparse rows first (better initial pivot candidates) --- if data: sorted_row_keys = sorted(data.keys(), key=lambda r: len(data[r])) data = {new_r: data[old_r] for new_r, old_r in enumerate(sorted_row_keys)} # Gaussian elimination with partial pivoting (all indices in sorted column space) pivot_cols_sorted = [] pivot_row = 0 for pivot_col in range(cols): if pivot_row >= rows: break # Find pivot: sparsest row first (Markowitz), then smallest abs value best_row, best_val = -1, 0 best_nnz, best_abs = float('inf'), float('inf') for r, row_data in data.items(): if r < pivot_row: continue if pivot_col in row_data: v = row_data[pivot_col] rnnz = len(row_data) if rnnz < best_nnz or (rnnz == best_nnz and abs(v) < best_abs): best_row, best_val = r, v best_nnz, best_abs = rnnz, abs(v) if best_row < 0: continue # Swap rows if needed if best_row != pivot_row: if pivot_row in data: if best_row in data: data[pivot_row], data[best_row] = data[best_row], data[pivot_row] else: data[best_row] = data.pop(pivot_row) elif best_row in data: data[pivot_row] = data.pop(best_row) pivot_row_data = data.get(pivot_row) if pivot_row_data is None: continue pivot_val = pivot_row_data.get(pivot_col, 0) if pivot_val == 0: continue pivot_cols_sorted.append(pivot_col) # Collect rows to eliminate (snapshot keys before modification) elim_targets = [(r, row_data[pivot_col]) for r, row_data in data.items() if r != pivot_row and pivot_col in row_data] # Eliminate pivot column from all other rows for elim_row, elim_val in elim_targets: elim_row_data = data[elim_row] # Pre-scale by GCD to minimize coefficient growth. # math.gcd handles negatives and returns a positive value (Python 3.5+). g = gcd(pivot_val, elim_val) pv_scaled = pivot_val // g ev_scaled = elim_val // g # new[c] = elim[c] * pv_scaled - ev_scaled * pivot[c] new_row = {} for c, p_val in pivot_row_data.items(): new_val = elim_row_data.get(c, 0) * pv_scaled - ev_scaled * p_val if new_val != 0: new_row[c] = new_val for c, e_val in elim_row_data.items(): if c not in pivot_row_data: new_val = e_val * pv_scaled if new_val != 0: new_row[c] = new_val # Update row and reduce by GCD. # gcd(*values) uses one C-level call instead of len(row) Python calls. if new_row: data[elim_row] = new_row row_gcd = gcd(*new_row.values()) if row_gcd > 1: for c in new_row: new_row[c] //= row_gcd else: del data[elim_row] pivot_row += 1 # Final GCD reduction of all rows for row_data in data.values(): row_gcd = gcd(*row_data.values()) if row_gcd > 1: for c in row_data: row_data[c] //= row_gcd # Translate results back to original column space original_data = {r: {col_order[sc]: v for sc, v in rd.items()} for r, rd in data.items()} pivot_cols_original = [col_order[p] for p in pivot_cols_sorted] return original_data, len(pivot_cols_original), pivot_cols_original def _nullspace_sparse(matrix: RationalMatrix) -> RationalMatrix: """Compute nullspace using integer RREF with row scaling. No denominators needed - each row in RREF is scaled so pivot divides all elements. The nullspace is extracted by reading off the relationships between pivot and free columns. """ cols = matrix.get_column_count() # Compute integer RREF rref_data, rank, pivot_cols = _rref_integer_sparse(matrix) if rank == cols: return RationalMatrix(cols, 0) # Free columns pivot_set = set(pivot_cols) free_cols = [c for c in range(cols) if c not in pivot_set] nullity = len(free_cols) if nullity == 0: return RationalMatrix(cols, 0) # Build kernel matrix # For each free column f, the nullspace vector has: # - Entry 1 at position f (the free variable) # - Entry -rref[i,f]/rref[i,pivot_i] at each pivot position row_indices, col_indices = [], [] numerators, denominators = [], [] for k, free_col in enumerate(free_cols): # Identity entry for free variable row_indices.append(free_col) col_indices.append(k) numerators.append(1) denominators.append(1) # Entries from RREF for pivot variables for i, pivot_col in enumerate(pivot_cols): row_data = rref_data.get(i, {}) val_at_free = row_data.get(free_col, 0) if val_at_free != 0: pivot_val = row_data.get(pivot_col, 1) # Should always exist # Nullspace entry: -val_at_free / pivot_val g = gcd(abs(val_at_free), abs(pivot_val)) num = -val_at_free // g den = pivot_val // g # Ensure positive denominator if den < 0: num, den = -num, -den row_indices.append(pivot_col) col_indices.append(k) numerators.append(num) denominators.append(den) return RationalMatrix._build_from_sparse_data(row_indices, col_indices, numerators, denominators, cols, nullity) # ============================================================================= # Linear Algebra Functions # =============================================================================
[docs] def nullspace(matrix: RationalMatrix) -> RationalMatrix: """Compute right nullspace (kernel). Returns K where matrix @ K = 0. Uses integer RREF with column/row pre-sorting and GCD reduction. All arithmetic is Python arbitrary-precision integers — no overflow possible. Args: matrix: Input RationalMatrix Returns: Kernel matrix K where matrix @ K = 0 """ return _nullspace_sparse(matrix)
[docs] def basic_columns(matrix: RationalMatrix) -> List[int]: """Find indices of basic (pivot) columns using integer RREF.""" _, _, pivot_cols = _rref_integer_sparse(matrix) return pivot_cols
[docs] def basic_columns_from_numpy(mx: np.ndarray) -> List[int]: """Find basic columns from a numpy array.""" return basic_columns(RationalMatrix.from_numpy(mx))
# ============================================================================= # Configuration # =============================================================================
[docs] class CompressionMethod(Enum): """Compression methods for metabolic network compression.""" NULLSPACE = "Nullspace" # Nullspace-based compression RECURSIVE = "Recursive" # Iterate until no more compression possible @classmethod def all(cls) -> List['CompressionMethod']: return list(cls) @classmethod def none(cls) -> List['CompressionMethod']: return [] @classmethod
[docs] def standard(cls) -> List['CompressionMethod']: """Standard compression methods (recommended).""" return [cls.NULLSPACE, cls.RECURSIVE]
# ============================================================================= # Statistics # =============================================================================
[docs] class CompressionStatistics: """Tracks compression statistics for logging.""" def __init__(self): self.iteration_count = 0 self.zero_flux_count = 0 self.coupled_count = 0 self.unused_metabolite_count = 0 def inc_compression_iteration(self) -> int: self.iteration_count += 1 return self.iteration_count def get_compression_iteration(self) -> int: return self.iteration_count def inc_zero_flux_reactions(self) -> None: self.zero_flux_count += 1 def inc_coupled_reactions_count(self, count: int) -> None: self.coupled_count += count def inc_unused_metabolite(self) -> None: self.unused_metabolite_count += 1 def write_to_log(self) -> None: LOG.info(f"Compression complete: {self.iteration_count} iterations, " f"{self.zero_flux_count} zero-flux, " f"{self.coupled_count} coupled, {self.unused_metabolite_count} unused metabolites") def __repr__(self): return (f"CompressionStatistics(iterations={self.iteration_count}, " f"zero_flux={self.zero_flux_count}, " f"coupled={self.coupled_count})")
# ============================================================================= # Compression Record # =============================================================================
[docs] class CompressionRecord: """ Compression result with transformation matrices. Contains: pre @ stoich @ post == cmp For EFM expansion: efm_original = post @ efm_compressed """ def __init__(self, pre: RationalMatrix, cmp: RationalMatrix, post: RationalMatrix, meta_names: List[str], stats: Optional[CompressionStatistics] = None): self.pre = pre # metabolite transformation self.cmp = cmp # compressed stoich self.post = post # reaction transformation self.meta_names = list(meta_names) # compressed metabolite names (row order in cmp) self.stats = stats
# ============================================================================= # Working State (Internal) # ============================================================================= class _Size: """Mutable counter for active matrix dimensions during compression.""" def __init__(self, metas: int, reacs: int): self.metas = metas self.reacs = reacs class _WorkRecord: """Mutable state during compression algorithm.""" def __init__(self, stoich: RationalMatrix, meta_names: List[str], reac_names: List[str], bounds: Optional[List[Tuple[float, float]]] = None): rows, cols = stoich.get_row_count(), stoich.get_column_count() self.pre = RationalMatrix.identity(rows) self.cmp = stoich.clone() self.post = RationalMatrix.identity(cols) self.meta_names = list(meta_names) self.reac_names = list(reac_names) self.bounds = list(bounds) if bounds else [(-float('inf'), float('inf'))] * cols self.size = _Size(rows, cols) # Store original dimensions for get_truncated() self._orig_metas = rows self._orig_reacs = cols self.stats = CompressionStatistics() self.stats.inc_compression_iteration() def remove_reactions(self, suppressed: Optional[Set[str]]) -> bool: """Remove reactions by name - uses batch removal.""" if not suppressed: return False indices = set() for name in suppressed: try: idx = self.reac_names.index(name) if idx < self.size.reacs: indices.add(idx) except ValueError: continue if not indices: return False return self.remove_reactions_by_indices(indices) def remove_reactions_by_indices(self, indices: Set[int]) -> bool: """Remove reactions by index - uses batch removal for efficiency.""" if not indices: return False # Filter to only valid indices valid_indices = {idx for idx in indices if idx < self.size.reacs} if not valid_indices: return False # Compute indices to keep keep_indices = [i for i in range(self.size.reacs) if i not in valid_indices] if not keep_indices: self.size.reacs = 0 return True # Use batch column removal self._batch_remove_columns(keep_indices) return True def _batch_remove_columns(self, keep_indices: list) -> None: """Batch remove columns by keeping only specified indices.""" LOG.debug(f"_batch_remove_columns: keeping {len(keep_indices)} of {self.size.reacs}") LOG.debug(f" Before: cmp={self.cmp.get_column_count()}, post={self.post.get_column_count()}") # Remove columns from post and cmp matrices self.post.remove_columns(keep_indices) self.cmp.remove_columns(keep_indices) LOG.debug(f" After: cmp={self.cmp.get_column_count()}, post={self.post.get_column_count()}") # Reindex names and bounds new_names = [self.reac_names[i] for i in keep_indices] new_bounds = [self.bounds[i] for i in keep_indices] for i, name in enumerate(new_names): self.reac_names[i] = name self.bounds = new_bounds self.size.reacs = len(keep_indices) LOG.debug(f" size.reacs={self.size.reacs}") def _batch_remove_rows(self, keep_indices: list) -> None: """Batch remove rows (metabolites) by keeping only specified indices.""" LOG.debug(f"_batch_remove_rows: keeping {len(keep_indices)} of {self.size.metas}") # Remove rows from pre and cmp matrices self.pre.remove_rows(keep_indices) self.cmp.remove_rows(keep_indices) # Reindex metabolite names new_names = [self.meta_names[i] for i in keep_indices] for i, name in enumerate(new_names): self.meta_names[i] = name self.size.metas = len(keep_indices) LOG.debug(f" size.metas={self.size.metas}") def remove_metabolites_by_indices(self, indices: Set[int]) -> bool: """Remove metabolites by index - uses batch removal for efficiency.""" if not indices: return False # Filter to only valid indices valid_indices = {idx for idx in indices if idx < self.size.metas} if not valid_indices: return False # Compute indices to keep keep_indices = [i for i in range(self.size.metas) if i not in valid_indices] if not keep_indices: self.size.metas = 0 return True # Use batch row removal self._batch_remove_rows(keep_indices) return True def remove_unused_metabolites(self) -> bool: """Remove metabolites with all-zero rows - uses batch removal. Uses CSR indptr for O(m) zero-row detection (avoids element-by-element sparse indexing which is O(m*r) with large per-call Python overhead). """ mc, rc = self.size.metas, self.size.reacs # Slice active submatrix and convert to CSR for indptr access. # tocsr() is a no-op if already CSR (returns self), so this is safe # regardless of current format. num_csr = self.cmp._num_sparse[:mc, :rc].tocsr() zero_rows = np.where(np.diff(num_csr.indptr) == 0)[0] unused_indices = set(zero_rows.tolist()) for _ in unused_indices: self.stats.inc_unused_metabolite() if unused_indices: self.remove_metabolites_by_indices(unused_indices) return True return False def get_truncated(self) -> CompressionRecord: """Create final CompressionRecord with truncated matrices.""" mc, rc = self.size.metas, self.size.reacs # Use stored original dimensions (batch removal changes matrix sizes) m = self._orig_metas r = self._orig_reacs # Create truncated matrices using efficient submatrix extraction # pre: mc compressed metabolites × m original metabolites # cmp: mc compressed metabolites × rc compressed reactions # post: r original reactions × rc compressed reactions pre_trunc = self.pre.submatrix(mc, m) cmp_trunc = self.cmp.submatrix(mc, rc) post_trunc = self.post.submatrix(r, rc) meta_names_trunc = self.meta_names[:mc] return CompressionRecord(pre_trunc, cmp_trunc, post_trunc, meta_names_trunc, self.stats) # ============================================================================= # Core Algorithm # =============================================================================
[docs] class StoichMatrixCompressor: """Nullspace-based metabolic network compression.""" def __init__(self, *methods: CompressionMethod): self._methods = list(methods) if methods else CompressionMethod.standard()
[docs] def compress(self, stoich: RationalMatrix, meta_names: List[str], reac_names: List[str], suppressed: Optional[Set[str]] = None, bounds: Optional[List[Tuple[float, float]]] = None) -> CompressionRecord: """Compress network, return transformation matrices. Single-pass approach: each iteration computes the nullspace once, then removes zero-flux reactions AND merges coupled groups from the same kernel. Re-iterates only when contradicting groups were removed (which may expose new couplings). """ work = _WorkRecord(stoich, meta_names, reac_names, bounds) do_nullspace = CompressionMethod.NULLSPACE in self._methods do_recursive = CompressionMethod.RECURSIVE in self._methods work.remove_reactions(suppressed) if do_nullspace: while True: work.stats.inc_compression_iteration() work.remove_unused_metabolites() found = self._nullspace_compress(work) if not (found and do_recursive): break work.remove_unused_metabolites() work.stats.write_to_log() return work.get_truncated()
def _nullspace_compress(self, work: _WorkRecord) -> bool: """One pass of nullspace compression. Returns True if contradicting groups were removed (triggering re-iteration). Computes the nullspace once, then from the same kernel: - Identifies zero-flux reactions (absent from kernel) - Identifies and merges coupled reaction groups (proportional kernel rows) - Removes zero-flux + coupled slaves + contradicting groups in one batch """ # Build active submatrix for nullspace computation active = work.cmp.submatrix(work.size.metas, work.size.reacs) kernel = nullspace(active) LOG.debug( f"Nullspace: {active.get_row_count()}x{active.get_column_count()} -> kernel {kernel.get_row_count()}x{kernel.get_column_count()}" ) # Get kernel pattern (CSR for structure) and values (Fractions for exact ratios) kernel_pattern, kernel_values = kernel.to_sparse_pattern() # Find zero-flux reactions and merge coupled groups in one pass return self._handle_compress(work, kernel_pattern, kernel_values) def _find_zero_flux(self, work: _WorkRecord, kernel_sparse) -> set: """Find reactions with all-zero kernel rows (indices in current work).""" zero_flux = set() for reac in range(work.size.reacs): if kernel_sparse.indptr[reac] == kernel_sparse.indptr[reac + 1]: zero_flux.add(reac) work.stats.inc_zero_flux_reactions() return zero_flux def _find_coupled_groups(self, kernel_pattern, kernel_values, num_reacs): """Find groups of coupled reactions from kernel sparsity pattern. Args: kernel_pattern: CSR matrix with 1s at non-zero positions (for indptr/indices) kernel_values: {row: {col: Fraction}} for actual values Returns (groups, ratios) where: - groups: list of lists, each containing indices of coupled reactions - ratios: list where ratios[i] is the coupling ratio for reaction i """ # Group reactions by zero-pattern in kernel patterns = {} for reac in range(num_reacs): start = kernel_pattern.indptr[reac] end = kernel_pattern.indptr[reac + 1] pattern = tuple(kernel_pattern.indices[start:end]) patterns.setdefault(pattern, []).append(reac) potential_groups = [idxs for idxs in patterns.values() if len(idxs) > 1] groups = [] ratios = [None] * num_reacs for potential in potential_groups: ref_reac = potential[0] ref_start = kernel_pattern.indptr[ref_reac] ref_end = kernel_pattern.indptr[ref_reac + 1] nonzero_cols = list(kernel_pattern.indices[ref_start:ref_end]) if not nonzero_cols: continue for i, reac_a in enumerate(potential): if ratios[reac_a] is not None: continue group = None # Get values for reaction a a_row = kernel_values.get(reac_a, {}) for j in range(i + 1, len(potential)): reac_b = potential[j] if ratios[reac_b] is not None: continue # Get values for reaction b b_row = kernel_values.get(reac_b, {}) # Compute ratio from first non-zero column first_col = nonzero_cols[0] a_val = a_row.get(first_col, Fraction(0)) b_val = b_row.get(first_col, Fraction(0)) if b_val == 0: continue # ratio = a_val / b_val ratio = a_val / b_val # Check if ratio is consistent across all columns is_consistent = True for col in nonzero_cols[1:]: a_v = a_row.get(col, Fraction(0)) b_v = b_row.get(col, Fraction(0)) if b_v == 0 or a_v / b_v != ratio: is_consistent = False break if is_consistent: ratios[reac_b] = ratio if group is None: group = [reac_a] group.append(reac_b) if group is not None: groups.append(group) return groups, ratios def _handle_compress(self, work: _WorkRecord, kernel_pattern, kernel_values) -> bool: """Remove zero-flux and merge coupled reactions from a single kernel. From one nullspace computation: 1. Identifies zero-flux reactions (empty kernel rows) 2. Finds and merges coupled groups (proportional kernel rows) 3. Detects contradicting groups via bounds intersection 4. Removes everything in one batch Returns True if contradicting groups were removed (flux space changed), triggering re-iteration to find new couplings. """ # Collect zero-flux reactions (empty kernel rows) zero_flux = self._find_zero_flux(work, kernel_pattern) # Also catch reactions blocked by bounds (lb=ub=0) that still have # non-zero kernel rows. These are structurally present but carry # no flux. Removing them here avoids needing a separate FVA pass. for reac in range(work.size.reacs): if reac not in zero_flux: lb, ub = work.bounds[reac] if lb == 0 and ub == 0: zero_flux.add(reac) work.stats.inc_zero_flux_reactions() # Find and merge coupled groups groups, ratios = self._find_coupled_groups(kernel_pattern, kernel_values, work.size.reacs) reactions_to_remove = set(zero_flux) contradicting_removed = False # Enter batch edit mode for cmp and post matrices to avoid repeated LIL conversions work.cmp.begin_batch_edit() work.post.begin_batch_edit() for group in groups: self._combine_coupled(work, group, ratios) # Check bounds intersection to detect contradicting groups. # v_master = ratios[slave] * v_slave => v_slave = v_master / ratios[slave] # Slave constraint: lb_s <= v_slave <= ub_s # Translates to bounds on v_master depending on sign of ratios[slave]. master = group[0] lb_m, ub_m = work.bounds[master] intersected_lb = lb_m intersected_ub = ub_m for slave in group[1:]: ratio = ratios[slave] # v_master / v_slave lb_s, ub_s = work.bounds[slave] if ratio > 0: # lb_s * ratio <= v_master <= ub_s * ratio s_lb = -float('inf') if lb_s == -float('inf') else lb_s * float(ratio) s_ub = float('inf') if ub_s == float('inf') else ub_s * float(ratio) else: # ratio < 0 # ub_s * ratio <= v_master <= lb_s * ratio s_lb = -float('inf') if ub_s == float('inf') else ub_s * float(ratio) s_ub = float('inf') if lb_s == -float('inf') else lb_s * float(ratio) intersected_lb = max(intersected_lb, s_lb) intersected_ub = min(intersected_ub, s_ub) # Update master bounds to intersection work.bounds[master] = (intersected_lb, intersected_ub) if intersected_lb > intersected_ub or (intersected_lb == 0 and intersected_ub == 0): # Contradicting: only zero flux feasible, or infeasible. # Remove master and all slaves. LOG.debug(f"Contradicting coupled group: {[work.reac_names[r] for r in group]}") for idx in group: reactions_to_remove.add(idx) contradicting_removed = True else: # Consistent: only remove slaves (merged into master) for idx in group[1:]: reactions_to_remove.add(idx) # End batch edit mode work.cmp.end_batch_edit() work.post.end_batch_edit() LOG.debug(f"Compression: {len(zero_flux)} zero-flux, {len(groups)} coupled groups, " f"removing {len(reactions_to_remove)} reactions, contradicting={contradicting_removed}") work.remove_reactions_by_indices(reactions_to_remove) return contradicting_removed def _combine_coupled(self, work: _WorkRecord, group: List[int], ratios: List[Optional[Fraction]]) -> None: """Combine coupled reactions into master reaction. Uses batch column operations with native fmpq arithmetic for performance. """ master = group[0] LOG.debug(f"Combining coupled: {[work.reac_names[r] for r in group]}") for slave in group[1:]: ratio = ratios[slave] # multiplier = 1/ratio = ratio.denominator / ratio.numerator mult_num, mult_den = ratio.denominator, ratio.numerator # Update stoichiometric matrix: cmp[:, master] += cmp[:, slave] * mult work.cmp.add_scaled_column(master, slave, mult_num, mult_den) # Update post matrix: post[:, master] += post[:, slave] * mult work.post.add_scaled_column(master, slave, mult_num, mult_den) work.stats.inc_coupled_reactions_count(len(group))
# ============================================================================= # COBRA Interface # =============================================================================
[docs] class CompressionResult: """Result of COBRA model compression.""" def __init__(self, compressed_model, compression_converter, pre_matrix, post_matrix, reaction_map, metabolite_map, statistics, methods_used, original_reaction_names, original_metabolite_names, flipped_reactions): self.compressed_model = compressed_model self.compression_converter = compression_converter self.pre_matrix = pre_matrix self.post_matrix = post_matrix self.reaction_map = reaction_map self.metabolite_map = metabolite_map self.statistics = statistics self.methods_used = methods_used self.original_reaction_names = original_reaction_names self.original_metabolite_names = original_metabolite_names self.flipped_reactions = flipped_reactions @property def compression_ratio(self) -> float: return len(self.compressed_model.reactions) / len(self.original_reaction_names) @property def reactions_removed(self) -> int: return len(self.original_reaction_names) - len(self.compressed_model.reactions) def summary(self) -> str: return f"Compressed {len(self.original_reaction_names)} -> {len(self.compressed_model.reactions)} reactions ({self.compression_ratio:.1%})"
[docs] class CompressionConverter: """Bidirectional transformer for expressions between original and compressed spaces.""" def __init__(self, reaction_map: Dict[str, Dict[str, Union[float, Fraction]]], metabolite_map: Dict[str, Dict[str, Union[float, Fraction]]], flipped_reactions: List[str]): self.reaction_map = reaction_map self.metabolite_map = metabolite_map self.flipped_reactions = set(flipped_reactions)
[docs] def expand_expression(self, expression: Dict[str, float], remove_missing: bool = False) -> Dict[str, float]: """Transform expression from compressed back to original space.""" expanded = {} for comp_rxn, comp_coeff in expression.items(): if comp_rxn in self.reaction_map: for orig_rxn, scale in self.reaction_map[comp_rxn].items(): coeff = comp_coeff * scale if orig_rxn in self.flipped_reactions: coeff = -coeff expanded[orig_rxn] = expanded.get(orig_rxn, 0) + coeff elif not remove_missing: LOG.warning(f"Compressed reaction {comp_rxn} not found") return expanded
[docs] def remove_conservation_relations(model) -> None: """Remove conservation relations (dependent metabolites) from a model. This reduces the number of metabolites while maintaining the original flux space. Uses exact rational arithmetic to find linearly independent rows. Args: model: COBRA model to modify in-place """ # Build the transposed stoichiometric matrix (reactions × metabolites) directly # from cobra reaction coefficients — avoids a sparse→dense→sparse round-trip that # would iterate over all m×r elements including zeros. num_rxns = len(model.reactions) num_mets = len(model.metabolites) met_index = {m.id: i for i, m in enumerate(model.metabolites)} row_idx, col_idx, num_data, den_data = [], [], [], [] for j, rxn in enumerate(model.reactions): for met, coeff in rxn._metabolites.items(): if coeff == 0: continue i = met_index[met.id] if isinstance(coeff, Fraction): frac = coeff elif hasattr(coeff, 'numerator'): frac = Fraction(coeff.numerator, coeff.denominator) else: frac = float_to_rational(float(coeff)) row_idx.append(j) # reaction → row (transposed layout) col_idx.append(i) # metabolite → column num_data.append(frac.numerator) den_data.append(frac.denominator) from scipy.sparse import csr_matrix as _csr num_sparse = _csr((num_data, (row_idx, col_idx)), shape=(num_rxns, num_mets), dtype=np.int64) den_sparse = _csr((den_data, (row_idx, col_idx)), shape=(num_rxns, num_mets), dtype=np.int64) rm = RationalMatrix._from_sparse(num_sparse, den_sparse) basic_mets = basic_columns(rm) dependent_mets = [model.metabolites[i] for i in set(range(num_mets)) - set(basic_mets)] if dependent_mets: model.remove_metabolites(dependent_mets)
[docs] def compress_cobra_model(model, methods: Optional[List[Union[str, CompressionMethod]]] = None, in_place: bool = True, suppressed_reactions: Optional[Set[str]] = None) -> CompressionResult: """ Compress a COBRA model using nullspace-based coupling detection. Note: Model should be preprocessed first (rational coefficients, conservation relations removed). Use networktools.compress_model() for automatic preprocessing. Args: model: COBRA model to compress (should be preprocessed) methods: Compression methods. Default: CompressionMethod.standard() in_place: Modify original model (True) or work on copy (False) suppressed_reactions: Reaction names to exclude from compression Returns: CompressionResult with compressed model and transformation data """ if not in_place: model = copy.deepcopy(model) original_reaction_names = [r.id for r in model.reactions] original_metabolite_names = [m.id for m in model.metabolites] # Parse methods if methods is None: compression_methods = CompressionMethod.standard() else: compression_methods = [] for method in methods: if isinstance(method, CompressionMethod): compression_methods.append(method) elif isinstance(method, str): compression_methods.append(CompressionMethod[method.upper()]) # Build stoichiometric matrix with exact arithmetic stoich_matrix = RationalMatrix.from_cobra_model(model) metabolite_names = [m.id for m in model.metabolites] reaction_names = [r.id for r in model.reactions] # Run compression compressor = StoichMatrixCompressor(*compression_methods) bounds = [(float(r.lower_bound), float(r.upper_bound)) for r in model.reactions] compression_record = compressor.compress(stoich_matrix, metabolite_names, reaction_names, suppressed_reactions, bounds) # Apply to model (uses direct manipulation, bypasses solver) reaction_map = _apply_compression_to_model(model, compression_record, reaction_names) pre_matrix = compression_record.pre.to_numpy() post_matrix = compression_record.post.to_numpy() converter = CompressionConverter(reaction_map, {}, []) return CompressionResult(compressed_model=model, compression_converter=converter, pre_matrix=pre_matrix, post_matrix=post_matrix, reaction_map=reaction_map, metabolite_map={}, statistics=compression_record.stats, methods_used=compression_methods, original_reaction_names=original_reaction_names, original_metabolite_names=original_metabolite_names, flipped_reactions=[])
def _apply_compression_to_model(model, compression_record, original_reaction_names): """Apply compression results to COBRA model using exact Fraction arithmetic. Sets stoichiometric coefficients and bounds as Fractions directly from the compressed matrices, avoiding floating-point arithmetic. Uses direct attribute manipulation to bypass solver updates during modification. Solver must be rebuilt after calling this function. """ post = compression_record.post cmp = compression_record.cmp meta_names = compression_record.meta_names num_original = post.get_row_count() num_compressed = post.get_column_count() num_metas = cmp.get_row_count() # Track which original reactions to keep (main reactions) keep_rxns = [False] * num_original reaction_map = {} # Build metabolite lookup for compressed metabolites met_lookup = {name: model.metabolites.get_by_id(name) for name in meta_names} for j in range(num_compressed): # Find contributing original reactions from POST matrix (sparse iteration) contributing = list(post.iter_column_fractions(j)) if not contributing: continue # Select "main" reaction: most metabolites (nonzeros), ties broken alphabetically main_idx = min( (idx for idx, _ in contributing), key=lambda i: (-len(model.reactions[i]._metabolites), model.reactions[i].id), ) main_rxn = model.reactions[main_idx] keep_rxns[main_idx] = True if len(contributing) == 1: # Standalone reaction — not merged with anything. reaction_map[main_rxn.id] = {original_reaction_names[main_idx]: Fraction(1)} continue # --- Merged group (2+ contributing reactions) --- # Store subset info main_rxn.subset_rxns = [idx for idx, _ in contributing] main_rxn.subset_stoich = [coeff for _, coeff in contributing] # Build combined reaction name from contributing reactions for idx, _ in contributing: if idx == main_idx: continue rxn = model.reactions[idx] if len(main_rxn.id) + len(rxn.id) < 220 and not main_rxn.id.endswith('...'): main_rxn.id += '*' + rxn.id elif not main_rxn.id.endswith('...'): main_rxn.id += '...' # Build new metabolites dict from cmp matrix (sparse iteration) new_metabolites = {} for m_idx, frac in cmp.iter_column_fractions(j): met = met_lookup[meta_names[m_idx]] new_metabolites[met] = frac # OPTIMIZATION: Direct _metabolites assignment (bypasses solver updates) # Clear old metabolite back-references for met in list(main_rxn._metabolites.keys()): if main_rxn in met._reaction: met._reaction.discard(main_rxn) # Set new metabolites directly main_rxn._metabolites = new_metabolites # Update new metabolite back-references for met in new_metabolites: met._reaction.add(main_rxn) # Compute bounds as Fractions from original reactions scaled by POST factors lb_candidates = [] ub_candidates = [] for idx, coeff in contributing: rxn = model.reactions[idx] lb, ub = rxn.lower_bound, rxn.upper_bound # v_original = coeff * v_compressed (from POST interpretation) # constraint: lb <= v_original <= ub # so: lb <= coeff * v_compressed <= ub # if coeff > 0: lb/coeff <= v_compressed <= ub/coeff # if coeff < 0: ub/coeff <= v_compressed <= lb/coeff (inequality flips) if coeff > 0: if lb != -float('inf'): lb_candidates.append(Fraction(lb) / coeff if lb != 0 else Fraction(0)) if ub != float('inf'): ub_candidates.append(Fraction(ub) / coeff if ub != 0 else Fraction(0)) else: # coeff < 0 if ub != float('inf'): lb_candidates.append(Fraction(ub) / coeff if ub != 0 else Fraction(0)) if lb != -float('inf'): ub_candidates.append(Fraction(lb) / coeff if lb != 0 else Fraction(0)) # OPTIMIZATION: Direct bounds assignment (bypasses solver updates) main_rxn._lower_bound = max(lb_candidates) if lb_candidates else -float('inf') main_rxn._upper_bound = min(ub_candidates) if ub_candidates else float('inf') # Update stored objective through compression factors obj_dict = getattr(model, '_suppressed_obj', None) if obj_dict is not None: merged_obj = sum( obj_dict.pop(original_reaction_names[idx], 0.0) * float(coeff) for idx, coeff in contributing ) if merged_obj != 0: obj_dict[main_rxn.id] = merged_obj # Build reaction map reaction_map[main_rxn.id] = {original_reaction_names[idx]: coeff for idx, coeff in contributing} # OPTIMIZATION: Batch reaction and metabolite removal # Use reaction objects directly (not IDs, since IDs are modified during compression) from cobra import DictList # Build set of reaction objects to keep keep_rxn_objs = {model.reactions[i] for i in range(num_original) if keep_rxns[i]} # Find metabolites that will be in kept reactions mets_in_kept_rxns = set() for rxn in keep_rxn_objs: mets_in_kept_rxns.update(rxn._metabolites.keys()) # Build new reactions list (only kept reactions) new_reactions = DictList() for rxn in model.reactions: if rxn in keep_rxn_objs: new_reactions.append(rxn) else: # Clear metabolite back-references for removed reaction for met in list(rxn._metabolites.keys()): if rxn in met._reaction: met._reaction.discard(rxn) rxn._model = None # Build new metabolites list (only those in kept reactions) new_metabolites = DictList() for met in model.metabolites: if met in mets_in_kept_rxns: new_metabolites.append(met) else: met._model = None # Replace model lists (using __dict__ to bypass any property setters) model.__dict__['reactions'] = new_reactions model.__dict__['metabolites'] = new_metabolites # Update model references for rxn in model.reactions: rxn._model = model for met in model.metabolites: met._model = model return reaction_map # ============================================================================= # Preprocessing Functions # =============================================================================
[docs] def remove_blocked_reactions(model) -> List: """Remove blocked reactions (bounds == (0, 0)) from a network.""" blocked_reactions = [reac for reac in model.reactions if reac.bounds == (0, 0)] if blocked_reactions: model.remove_reactions(blocked_reactions, remove_orphans=True) return blocked_reactions
[docs] def remove_ext_mets(model) -> None: """Remove external metabolites from 'External_Species' compartment.""" external_mets = [m for m in model.metabolites if m.compartment == 'External_Species'] model.remove_metabolites(external_mets) stoich_mat = create_stoichiometric_matrix(model) obsolete_reacs = [r for r, has_nonzero in zip(model.reactions, np.any(stoich_mat, 0)) if not has_nonzero] model.remove_reactions(obsolete_reacs)
[docs] def remove_dummy_bounds(model) -> None: """Replace COBRA standard bounds with +/-inf.""" cobra_conf = Configuration() bound_thres = max(abs(cobra_conf.lower_bound), abs(cobra_conf.upper_bound)) if any(any(abs(b) >= bound_thres for b in r.bounds) for r in model.reactions): LOG.warning(f'Removing reaction bounds >= {round(bound_thres)}.') for rxn in model.reactions: if rxn.lower_bound <= -bound_thres: rxn.lower_bound = -np.inf if rxn.upper_bound >= bound_thres: rxn.upper_bound = np.inf
[docs] def stoichmat_coeff2rational(model) -> None: """Convert stoichiometric coefficients to rational numbers.""" for rxn in model.reactions: for met, coeff in rxn._metabolites.items(): if isinstance(coeff, (float, int)): rxn._metabolites[met] = float_to_rational(coeff) elif not hasattr(coeff, 'p'): # Not sympy.Rational if hasattr(coeff, 'numerator'): # fractions.Fraction rxn._metabolites[met] = Rational(coeff.numerator, coeff.denominator) else: raise TypeError(f"Unsupported coefficient type: {type(coeff)}")
[docs] def stoichmat_coeff2float(model) -> None: """Convert stoichiometric coefficients to floats.""" for rxn in model.reactions: for met, coeff in rxn._metabolites.items(): rxn._metabolites[met] = float(coeff)
# ============================================================================= # GPR Propagation Helpers # ============================================================================= def _gpr_ast_to_sympy(node): """Convert a cobra GPR AST node to a sympy boolean expression. Returns None for empty GPR (node is None), meaning the reaction has no gene requirement and is always active. """ if node is None: return None if isinstance(node, ast.BoolOp): children = [_gpr_ast_to_sympy(v) for v in node.values] if isinstance(node.op, ast.And): return SympyAnd(*children) else: return SympyOr(*children) elif isinstance(node, ast.Name): return SympySymbol(node.id) return None def _sympy_to_gpr_string(expr): """Convert a sympy boolean expression to a GPR rule string. Produces correctly parenthesised output with sorted gene names for deterministic results. Returns '' for None input. """ if expr is None: return '' if isinstance(expr, SympySymbol): return str(expr) if expr.func == SympyAnd: parts = [] for arg in sorted(expr.args, key=str): s = _sympy_to_gpr_string(arg) if hasattr(arg, 'func') and arg.func == SympyOr: s = f'({s})' parts.append(s) return ' and '.join(parts) if expr.func == SympyOr: parts = [] for arg in sorted(expr.args, key=str): s = _sympy_to_gpr_string(arg) if hasattr(arg, 'func') and arg.func == SympyAnd: s = f'({s})' parts.append(s) return ' or '.join(parts) return str(expr) def _combine_gpr_and(gpr_bodies): """Combine GPR AST bodies with AND logic (for coupled/serial reaction merge). Args: gpr_bodies: list of AST nodes (reaction.gpr.body), may include None An empty/None GPR means the reaction has no gene requirement (always active), which acts as True in boolean logic. AND with True is a no-op, so empty GPRs are skipped. Returns '' if all inputs are empty (no gene restriction). Uses sympy And constructor which automatically flattens nested ANDs and deduplicates terms. Full simplification is deferred to reduce_gpr downstream. """ sympy_exprs = [_gpr_ast_to_sympy(b) for b in gpr_bodies] non_empty = [s for s in sympy_exprs if s is not None] if not non_empty: return '' if len(non_empty) == 1: return _sympy_to_gpr_string(non_empty[0]) combined = SympyAnd(*non_empty) return _sympy_to_gpr_string(combined) def _combine_gpr_or(gpr_bodies): """Combine GPR AST bodies with OR logic (for parallel reaction merge). Args: gpr_bodies: list of AST nodes (reaction.gpr.body), may include None If any input is None (reaction always active regardless of genes), the combined reaction is also always active, so the result is '' (no restriction). Uses sympy Or constructor which automatically flattens nested ORs and deduplicates terms. Full simplification is deferred to reduce_gpr downstream. """ sympy_exprs = [_gpr_ast_to_sympy(b) for b in gpr_bodies] if any(s is None for s in sympy_exprs): return '' if not sympy_exprs: return '' if len(sympy_exprs) == 1: return _sympy_to_gpr_string(sympy_exprs[0]) combined = SympyOr(*sympy_exprs) return _sympy_to_gpr_string(combined) # ============================================================================= # High-Level Compression API # =============================================================================
[docs] def compress_model(model, no_par_compress_reacs=set(), compression_backend='sparse_rref', propagate_gpr=False): """Compress a metabolic model using multiple techniques. Performs blocked reaction removal, conservation relation removal, and alternating dependent/parallel reaction lumping until no further compression is possible. Args: model: COBRA model to compress in-place no_par_compress_reacs: Reactions exempt from parallel compression compression_backend: Compression backend to use: - 'sparse_rref' (default): Pure Python sparse integer RREF. No external dependencies beyond NumPy/SciPy. - 'efmtool_rref' (legacy): Java-based EFMTool via JPype. Requires a JVM and the jpype1 package. propagate_gpr: If True, propagate and simplify GPR rules through compression (AND for coupled, OR for parallel merges). Empty GPR rules are correctly handled: skipped in AND (always active), and absorb in OR (result is always active). Uses sympy for boolean simplification. Default False. Returns: list of dict: Compression maps for reversing each compression step """ from straindesign.networktools import suppress_lp_context, _is_lp_suppressed with suppress_lp_context(model): cmp_mapReac = [] use_java = (compression_backend == 'efmtool_rref') LOG.info(' Removing blocked reactions.') remove_blocked_reactions(model) LOG.info(' Converting coefficients to rationals.') stoichmat_coeff2rational(model) coupled_changed = None # None = not yet computed run = 1 while True: numr = len(model.reactions) # 1. Parallel (cheap — hash-based, no RREF) LOG.info(f' Compression {run}: Lumping parallel reactions.') reac_map_exp = compress_model_parallel(model, no_par_compress_reacs, propagate_gpr=propagate_gpr) parallel_changed = numr > len(reac_map_exp) if parallel_changed: LOG.info(f' Reduced to {len(reac_map_exp)} reactions.') cmp_mapReac.append({"reac_map_exp": reac_map_exp, "parallel": True}) # 2. Conservation relation removal (reduces S rows for RREF) if use_java: _remove_conservation_relations_java(model) else: remove_conservation_relations(model) # 3. Exit if either parallel or coupled found nothing (after # at least one full cycle). If one step found nothing, # re-running it won't help — the model hasn't changed in # a way that creates new opportunities for that step. if coupled_changed is not None and (not parallel_changed or not coupled_changed): LOG.info(f' Compression complete ({run - 1} cycles).') break # 4. Coupled (expensive — nullspace/RREF) numr_pre = len(model.reactions) LOG.info(f' Compression {run}: Lumping coupled reactions.') reac_map_exp = compress_model_coupled(model, compression_backend, propagate_gpr=propagate_gpr) for new_reac, old_reac_val in reac_map_exp.items(): old_reacs = [r for r in no_par_compress_reacs if r in old_reac_val] if old_reacs: for r in old_reacs: no_par_compress_reacs.remove(r) no_par_compress_reacs.add(new_reac) coupled_changed = numr_pre > len(reac_map_exp) if coupled_changed: LOG.info(f' Reduced to {len(reac_map_exp)} reactions.') cmp_mapReac.append({"reac_map_exp": reac_map_exp, "parallel": False}) run += 1 # suppress_lp_context handles solver rebuild and objective restoration on exit return cmp_mapReac
def _remove_conservation_relations_java(model) -> None: """Remove conservation relations using Java efmtool.""" from . import efmtool_cmp_interface as efm stoich_mat = create_stoichiometric_matrix(model, array_type='lil') basic_mets = efm.basic_columns_rat_java(stoich_mat.transpose().toarray(), tolerance=0) dependent = [model.metabolites[i] for i in set(range(len(model.metabolites))) - set(basic_mets)] if dependent: model.remove_metabolites(dependent)
[docs] def compress_model_coupled(model, compression_backend='sparse_rref', propagate_gpr=False, suppressed_reactions=None): """Compress by lumping stoichiometrically coupled (dependent) reactions. Identifies groups of reactions whose flux vectors are proportional in every steady state (i.e. they share a common nullspace direction) and merges each group into a single lumped reaction. Both the pure-Python and legacy Java backends perform this operation; the compression_backend controls the nullspace algorithm. Args: model: COBRA model to compress in-place compression_backend: 'sparse_rref' (default, Python) or 'efmtool_rref' (Java legacy) propagate_gpr: If True, AND-combine GPR rules of merged reactions (with sympy simplification). Empty GPRs are skipped. Default False. suppressed_reactions: Set of reaction IDs to exclude from compression (Java backend only). Used to protect reactions referenced in strain design constraints from being deleted by the Java compressor's CoupledContradicting logic. Ignored for the Python backend (which handles contradicting groups correctly via bounds intersection). Returns: dict: Mapping {compressed_id: {orig_id: factor, ...}} """ # Save GPR AST bodies before either backend clears them if propagate_gpr: saved_gpr_bodies = {r.id: r.gpr.body for r in model.reactions} if compression_backend == 'efmtool_rref': from .efmtool_cmp_interface import compress_model_java reaction_map = compress_model_java(model, suppressed_reactions=suppressed_reactions) # Java backend handles contradicting groups internally (CoupledContradicting). # Clean up any remaining zero-flux reactions that the Java compressor created. zero_flux = {r for r in model.reactions if r.lower_bound == 0 and r.upper_bound == 0} for r in zero_flux: reaction_map.pop(r.id, None) if zero_flux: model.remove_reactions(list(zero_flux), remove_orphans=True) else: # Clear gene rules to match Java behavior for r in model.reactions: r.gene_reaction_rule = '' result = compress_cobra_model(model, methods=CompressionMethod.standard(), in_place=True) reaction_map = result.reaction_map # Python compressor handles contradicting groups internally via bounds # intersection in _handle_coupled_combine (removes zero-flux groups and # re-iterates to find new couplings). # Propagate GPR rules: AND-combine contributing reactions' GPR ASTs if propagate_gpr: for cmp_id, orig_map in reaction_map.items(): try: rxn = model.reactions.get_by_id(cmp_id) except KeyError: continue gpr_bodies = [saved_gpr_bodies.get(orig_id) for orig_id in orig_map] rxn.gene_reaction_rule = _combine_gpr_and(gpr_bodies) return reaction_map
# Backward-compatibility alias (old name referenced efmtool, but the function # is backend-agnostic — the new name compress_model_coupled is preferred). compress_model_efmtool = compress_model_coupled
[docs] def compress_model_parallel(model, protected_rxns=set(), propagate_gpr=False): """Compress by lumping parallel reactions. Args: model: COBRA model to compress in-place protected_rxns: Reactions exempt from parallel compression propagate_gpr: If True, OR-combine GPR rules of lumped reactions (with sympy simplification). Default False. Returns: dict: Mapping {compressed_id: {orig_id: factor, ...}} """ old_num_reac = len(model.reactions) old_reac_ids = [r.id for r in model.reactions] if propagate_gpr: old_gpr_bodies = {r.id: r.gpr.body for r in model.reactions} stoichmat_T = create_stoichiometric_matrix(model, 'lil').transpose() factor = [d[0] if d else 1.0 for d in stoichmat_T.data] A = sparse.diags(factor) @ stoichmat_T lb = [float(r.lower_bound) for r in model.reactions] ub = [float(r.upper_bound) for r in model.reactions] fwd = sparse.lil_matrix([1. if (isinf(u) and f > 0 or isinf(l) and f < 0) else 0. for f, l, u in zip(factor, lb, ub)]).transpose() rev = sparse.lil_matrix([1. if (isinf(l) and f > 0 or isinf(u) and f < 0) else 0. for f, l, u in zip(factor, lb, ub)]).transpose() inh = sparse.lil_matrix([ i + 1 if not ((isinf(ub[i]) or ub[i] == 0) and (isinf(lb[i]) or lb[i] == 0)) else 0 for i in range(len(model.reactions)) ]).transpose() A = sparse.hstack((A, fwd, rev, inh), 'csr') # Find parallel reactions via hash comparison subset_list = [] prev_found = set() protected = [r.id in protected_rxns for r in model.reactions] hashes = [hash((tuple(A[i].indices), tuple(A[i].data))) for i in range(A.shape[0])] for i in range(A.shape[0]): if i in prev_found: continue if protected[i]: subset_list.append([i]) continue subset_i = [i] for j in range(i + 1, A.shape[0]): if (not protected[j] and j not in prev_found and hashes[i] == hashes[j] and np.array_equal(A[i].indices, A[j].indices) and np.array_equal(A[i].data, A[j].data)): subset_i.append(j) prev_found.add(j) subset_list.append(subset_i) # Lump parallel reactions del_rxns = [False] * len(model.reactions) obj_dict = getattr(model, '_suppressed_obj', None) for rxn_idx in subset_list: for i in range(1, len(rxn_idx)): main_rxn = model.reactions[rxn_idx[0]] other_rxn = model.reactions[rxn_idx[i]] if len(main_rxn.id) + len(other_rxn.id) < 220 and not main_rxn.id.endswith('...'): main_rxn.id += '*' + other_rxn.id elif not main_rxn.id.endswith('...'): main_rxn.id += '...' del_rxns[rxn_idx[i]] = True # Update stored objective (parallel factor = 1.0, sum contributions) if len(rxn_idx) > 1 and obj_dict is not None: merged_obj = sum(obj_dict.pop(old_reac_ids[j], 0.0) for j in rxn_idx) if merged_obj != 0: obj_dict[model.reactions[rxn_idx[0]].id] = merged_obj # Pre-compute combined GPR for each group (OR logic) before deletions. # Save (surviving_rxn_object, combined_gpr_string) pairs. if propagate_gpr: group_gpr = [] for rxn_idx_group in subset_list: main_rxn = model.reactions[rxn_idx_group[0]] gpr_bodies = [old_gpr_bodies.get(old_reac_ids[j]) for j in rxn_idx_group] group_gpr.append((main_rxn, _combine_gpr_or(gpr_bodies))) remove_list = [model.reactions[i] for i in np.where(del_rxns)[0]] if remove_list: model.remove_reactions(remove_list, remove_orphans=True) # Set combined GPR rules on surviving reactions if propagate_gpr: for rxn, combined_gpr in group_gpr: rxn.gene_reaction_rule = combined_gpr # Build compression map rational_map = {} subT = np.zeros((old_num_reac, len(model.reactions))) for i in range(subT.shape[1]): for j in subset_list[i]: subT[j, i] = 1 rational_map[model.reactions[i].id] = {old_reac_ids[j]: Rational(1) for j in subset_list[i]} return rational_map
# ============================================================================= # Exports # ============================================================================= __all__ = [ # Rational matrix and utilities 'RationalMatrix', 'float_to_rational', 'detect_max_precision', 'nullspace', 'basic_columns', 'basic_columns_from_numpy', # Core compression 'compress_cobra_model', 'CompressionResult', 'CompressionRecord', 'CompressionStatistics', 'CompressionMethod', 'CompressionConverter', 'StoichMatrixCompressor', # High-level API 'compress_model', 'compress_model_coupled', 'compress_model_efmtool', # backward-compat alias 'compress_model_parallel', # GPR propagation helpers '_gpr_ast_to_sympy', '_sympy_to_gpr_string', '_combine_gpr_and', '_combine_gpr_or', # Preprocessing 'remove_blocked_reactions', 'remove_ext_mets', 'remove_conservation_relations', 'remove_dummy_bounds', 'stoichmat_coeff2rational', 'stoichmat_coeff2float', ]