Source code for straindesign.efmtool_cmp_interface

#!/usr/bin/env python3
#
# Copyright 2022 Max Planck Insitute Magdeburg
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""EFMtool compression interface for straindesign.

This module provides compression utilities for metabolic networks.
By default, the pure Python 'sparse_rref' compression backend is used.
The Java EFMTool backend is available via compression_backend='efmtool_rref' (requires jpype1).

For the documentation of the compression API provided by StrainDesign,
refer to straindesign.compression.compress_model.
"""

import logging
import numpy as np
import os
import sys

# =============================================================================
# Pure Python Implementation (Default)
# =============================================================================


[docs] def basic_columns_rat(mx, tolerance=0): """Find basic columns using exact rational arithmetic (FLINT or sympy).""" from .compression import basic_columns_from_numpy return basic_columns_from_numpy(mx)
# ============================================================================= # Lazy Java Initialization # ============================================================================= _JAVA_INITIALIZED = False _JPYPE_AVAILABLE = None # Java classes (populated by _init_java) DefaultBigIntegerRationalMatrix = None Gauss = None CompressionMethod = None StoichMatrixCompressor = None BigFraction = None BigInteger = None subset_compression = None jTrue = None jSystem = None def _check_jpype_available(): """Check if jpype is available without importing it.""" global _JPYPE_AVAILABLE if _JPYPE_AVAILABLE is None: import importlib.util _JPYPE_AVAILABLE = importlib.util.find_spec("jpype") is not None return _JPYPE_AVAILABLE def _check_sympy_available(): """Check if sympy is available without importing it.""" import importlib.util return importlib.util.find_spec("sympy") is not None def _search_for_jvm(): """Search for JVM in common locations.""" common_java_paths = [ "C:\\Program Files\\Java", # Windows "/usr/lib/jvm", # Linux "/Library/Java/JavaVirtualMachines", # macOS os.path.dirname(sys.executable) ] for base in common_java_paths: if os.path.exists(base): for root, _dirs, files in os.walk(base): if any(lib in files for lib in ["jvm.dll", "libjvm.so", "libjvm.dylib"]): return root return None def _init_java(): """ Initialize JVM and Java classes. This function is called lazily only when compression_backend='efmtool_rref'. Raises ImportError if jpype is not installed. """ global _JAVA_INITIALIZED global DefaultBigIntegerRationalMatrix, Gauss, CompressionMethod global StoichMatrixCompressor, BigFraction, BigInteger global subset_compression, jTrue, jSystem if _JAVA_INITIALIZED: return if not _check_jpype_available(): raise ImportError("jpype1 is not installed. Legacy Java compression requires jpype1.\n" "Install with: pip install jpype1\n" "Or use the default Python compression (compression_backend='sparse_rref').") if not _check_sympy_available(): raise ImportError("sympy is not installed. Legacy Java compression requires sympy.\n" "Install with: pip install sympy\n" "Or use the default Python compression (compression_backend='sparse_rref').") import jpype import io from contextlib import redirect_stdout, redirect_stderr # Add efmtool.jar to classpath efmtool_jar = os.path.join(os.path.dirname(__file__), 'efmtool.jar') if not os.path.exists(efmtool_jar): raise FileNotFoundError(f"efmtool.jar not found at {efmtool_jar}. " "Legacy Java compression requires the efmtool.jar file.") jpype.addClassPath(efmtool_jar) if not jpype.isJVMStarted(): # Look up JVM at different locations if not os.environ.get("JAVA_HOME"): candidate = _search_for_jvm() if candidate: os.environ["JAVA_HOME"] = candidate try: with redirect_stdout(io.StringIO()), redirect_stderr(io.StringIO()): jpype.startJVM() except Exception as e: extra_info = "" if not os.environ.get("JAVA_HOME"): extra_info = " JAVA_HOME is not defined." raise RuntimeError( "Failed to start JVM. Please ensure that Java (OpenJDK) is installed." + extra_info + " If using conda, install openjdk from conda-forge and set JAVA_HOME to the OpenJDK installation path.") from e import jpype.imports # Import Java classes import ch.javasoft.smx.impl.DefaultBigIntegerRationalMatrix as _DefaultBigIntegerRationalMatrix import ch.javasoft.smx.ops.Gauss as _Gauss import ch.javasoft.metabolic.compress.CompressionMethod as _CompressionMethod import ch.javasoft.metabolic.compress.StoichMatrixCompressor as _StoichMatrixCompressor import ch.javasoft.math.BigFraction as _BigFraction import java.math.BigInteger as _BigInteger # Assign to module-level globals DefaultBigIntegerRationalMatrix = _DefaultBigIntegerRationalMatrix Gauss = _Gauss CompressionMethod = _CompressionMethod StoichMatrixCompressor = _StoichMatrixCompressor BigFraction = _BigFraction BigInteger = _BigInteger subset_compression = CompressionMethod[:]( [CompressionMethod.CoupledZero, CompressionMethod.CoupledCombine, CompressionMethod.CoupledContradicting]) jTrue = jpype.JBoolean(True) jSystem = jpype.JClass("java.lang.System") _JAVA_INITIALIZED = True # ============================================================================= # Java Conversion Utilities # =============================================================================
[docs] def numpy_mat2jpypeArrayOfArrays(npmat): """Convert numpy matrix to jpype array of arrays (requires Java init).""" _init_java() import jpype rows = npmat.shape[0] cols = npmat.shape[1] jmat = jpype.JDouble[rows, cols] for r in range(rows): for c in range(cols): jmat[r][c] = npmat[r, c] return jmat
[docs] def jpypeArrayOfArrays2numpy_mat(jmat): """Convert jpype array of arrays to numpy matrix.""" rows = len(jmat) cols = len(jmat[0]) npmat = np.zeros((rows, cols)) for r in range(rows): for c in range(cols): npmat[r, c] = jmat[r][c] return npmat
[docs] def sympyRat2jBigIntegerPair(val): """Convert Fraction or sympy Rational to Java BigInteger pair (requires Java init).""" _init_java() # Support both fractions.Fraction (.numerator/.denominator) and sympy.Rational (.p/.q) numer = val.numerator if hasattr(val, 'numerator') else val.p if numer.bit_length() <= 63: numer = BigInteger.valueOf(numer) else: numer = BigInteger(str(numer)) denom = val.denominator if hasattr(val, 'denominator') else val.q if denom.bit_length() <= 63: denom = BigInteger.valueOf(denom) else: denom = BigInteger(str(denom)) return (numer, denom)
[docs] def jBigFraction2sympyRat(val): """Convert Java BigFraction to sympy Rational (requires Java init).""" return jBigIntegerPair2sympyRat(val.getNumerator(), val.getDenominator())
[docs] def jBigIntegerPair2sympyRat(numer, denom): """Convert Java BigInteger pair to sympy Rational (requires sympy).""" import sympy if numer.bitLength() <= 63: numer = numer.longValue() else: numer = str(numer.toString()) if denom.bitLength() <= 63: denom = denom.longValue() else: denom = str(denom.toString()) return sympy.Rational(numer, denom)
# ============================================================================= # Legacy Java Compression Functions # =============================================================================
[docs] def basic_columns_rat_java(mx, tolerance=0): """ Find basic columns using Java Gaussian elimination. Legacy implementation using jpype and Java efmtool. Requires jpype1 and sympy to be installed. Args: mx: Matrix (numpy array or Java matrix) tolerance: Tolerance (unused in exact arithmetic) Returns: Array of indices of basic columns """ _init_java() import jpype if isinstance(mx, np.ndarray): mx = DefaultBigIntegerRationalMatrix(numpy_mat2jpypeArrayOfArrays(mx), jTrue, jTrue) row_map = jpype.JInt[mx.getRowCount()] col_map = jpype.JInt[:](range(mx.getColumnCount())) rank = Gauss.getRationalInstance().rowEchelon(mx, False, row_map, col_map) return col_map[0:rank]
[docs] def compress_model_java(model): """Legacy Java compression using jpype (requires jpype and sympy). Args: model: COBRA model (will be modified in place) Returns: dict: Reaction map from compressed to original reactions with scaling factors """ import jpype from .networktools import stoichmat_coeff2rational # Initialize Java if not already done _init_java() # Convert to rational coefficients for Java stoichmat_coeff2rational(model) for r in model.reactions: r.gene_reaction_rule = '' num_met = len(model.metabolites) num_reac = len(model.reactions) old_reac_ids = [r.id for r in model.reactions] stoich_mat = DefaultBigIntegerRationalMatrix(num_met, num_reac) reversible = jpype.JBoolean[:]([r.reversibility for r in model.reactions]) flipped = [] for i in range(num_reac): if model.reactions[i].upper_bound <= 0: model.reactions[i] *= -1 flipped.append(i) logging.debug("Flipped " + model.reactions[i].id) for k, v in model.reactions[i]._metabolites.items(): n, d = sympyRat2jBigIntegerPair(v) stoich_mat.setValueAt(model.metabolites.index(k.id), i, BigFraction(n, d)) # compress smc = StoichMatrixCompressor(subset_compression) reacNames = jpype.JString[:](model.reactions.list_attr('id')) comprec = smc.compress(stoich_mat, reversible, jpype.JString[num_met], reacNames, None) subset_matrix = jpypeArrayOfArrays2numpy_mat(comprec.post.getDoubleRows()) del_rxns = np.logical_not(np.any(subset_matrix, axis=1)) for j in range(subset_matrix.shape[1]): rxn_idx = subset_matrix[:, j].nonzero()[0] r0 = rxn_idx[0] model.reactions[r0].subset_rxns = [] model.reactions[r0].subset_stoich = [] # Scale objective coefficient by POST factors combined_obj = 0.0 for r in rxn_idx: factor = jBigFraction2sympyRat(comprec.post.getBigFractionValueAt(r, j)) # Accumulate objective contribution before scaling combined_obj += model.reactions[r].objective_coefficient * float(factor) model.reactions[r] *= factor if model.reactions[r].lower_bound not in (0, -float('inf')): model.reactions[r].lower_bound /= abs(subset_matrix[r, j]) if model.reactions[r].upper_bound not in (0, float('inf')): model.reactions[r].upper_bound /= abs(subset_matrix[r, j]) model.reactions[r0].subset_rxns.append(r) if r in flipped: model.reactions[r0].subset_stoich.append(-factor) else: model.reactions[r0].subset_stoich.append(factor) model.reactions[r0].objective_coefficient = combined_obj for r in rxn_idx[1:]: if len(model.reactions[r0].id) + len(model.reactions[r].id) < 220 and model.reactions[r0].id[-3:] != '...': model.reactions[r0].id += '*' + model.reactions[r].id elif not model.reactions[r0].id[-3:] == '...': model.reactions[r0].id += '...' model.reactions[r0] += model.reactions[r] if model.reactions[r].lower_bound > model.reactions[r0].lower_bound: model.reactions[r0].lower_bound = model.reactions[r].lower_bound if model.reactions[r].upper_bound < model.reactions[r0].upper_bound: model.reactions[r0].upper_bound = model.reactions[r].upper_bound del_rxns[r] = True del_rxns = np.where(del_rxns)[0] for i in range(len(del_rxns) - 1, -1, -1): model.reactions[del_rxns[i]].remove_from_model(remove_orphans=True) subT = np.zeros((num_reac, len(model.reactions))) rational_map = {} for j in range(subT.shape[1]): subT[model.reactions[j].subset_rxns, j] = [float(v) for v in model.reactions[j].subset_stoich] rational_map.update( {model.reactions[j].id: { old_reac_ids[i]: v for i, v in zip(model.reactions[j].subset_rxns, model.reactions[j].subset_stoich) }}) return rational_map
# ============================================================================= # Exports # ============================================================================= __all__ = [ # Pure Python 'basic_columns_rat', # Java initialization '_init_java', '_check_jpype_available', # Java compression 'basic_columns_rat_java', 'compress_model_java', # Java conversion utilities 'numpy_mat2jpypeArrayOfArrays', 'jpypeArrayOfArrays2numpy_mat', 'sympyRat2jBigIntegerPair', 'jBigFraction2sympyRat', 'jBigIntegerPair2sympyRat', ]