Source code for straindesign.cplex_interface

#!/usr/bin/env python3
#
# Copyright 2022 Max Planck Insitute Magdeburg
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
#
"""CPLEX solver interface for LP and MILP"""

from random import randint
from scipy import sparse
from numpy import nan, inf, isinf
from cplex import Cplex, infinity, _const
from cplex.exceptions import CplexError
from typing import Tuple, List
import logging
import io
from psutil import virtual_memory
from straindesign.names import *


[docs]class Cplex_MILP_LP(Cplex): """CPLEX interface for MILP and LP This class is a wrapper for the CPLEX-Python API to offer bindings and namings for functions for the construction and manipulation of MILPs and LPs in an vector-matrix-based manner that are consistent with those of the other solver interfaces in the StrainDesign package. The purpose is to unify the instructions for operating with MILPs and LPs throughout StrainDesign. The CPLEX interface provides support for indicator constraints as well as for the populate function. Accepts a (mixed integer) linear problem in the form: minimize(c), subject to: A_ineq * x <= b_ineq, A_eq * x = b_eq, lb <= x <= ub, forall(i) type(x_i) = vtype(i) (continous, binary, integer), indicator constraints: x(j) = [0|1] -> a_indic * x [<=|=|>=] b_indic Please ensure that the number of variables and (in)equalities is consistent Example: cplex = Cplex_MILP_LP(c, A_ineq, b_ineq, A_eq, b_eq, lb, ub, vtype, indic_constr) Args: c (list of float): (Default: None) The objective vector (Objective sense: minimization). A_ineq (sparse.csr_matrix): (Default: None) A coefficient matrix of the static inequalities. b_ineq (list of float): (Default: None) The right hand side of the static inequalities. A_eq (sparse.csr_matrix): (Default: None) A coefficient matrix of the static equalities. b_eq (list of float): (Default: None) The right hand side of the static equalities. lb (list of float): (Default: None) The lower variable bounds. ub (list of float): (Default: None) The upper variable bounds. vtype (str): (Default: None) A character string that specifies the type of each variable: 'c'ontinous, 'b'inary or 'i'nteger indic_constr (IndicatorConstraints): (Default: None) A set of indicator constraints stored in an object of IndicatorConstraints (see reference manual or docstring). Returns: (Cplex_MILP_LP): A CPLEX MILP/LP interface class. """ def __init__(self, c=None, A_ineq=None, b_ineq=None, A_eq=None, b_eq=None, lb=None, ub=None, vtype=None, indic_constr=None): super().__init__() self.objective.set_sense(self.objective.sense.minimize) try: numvars = A_ineq.shape[1] except: numvars = A_eq.shape[1] # replace numpy inf with cplex infinity for i, v in enumerate(b_ineq): if isinf(v): b_ineq[i] = infinity # concatenate right hand sides b = b_ineq + b_eq # prepare coefficient matrix if isinstance(A_eq, list): if not A_eq: A_eq = sparse.csr_matrix((0, numvars)) if isinstance(A_ineq, list): if not A_ineq: A_ineq = sparse.csr_matrix((0, numvars)) A = sparse.vstack((A_ineq, A_eq), format='coo') # concatenate coefficient matrices sense = len(b_ineq) * 'L' + len(b_eq) * 'E' # construct CPLEX problem. Add variables and linear constraints self.variables.add(obj=c, lb=lb, ub=ub, types=vtype) self.linear_constraints.add(rhs=b, senses=sense) if A.nnz: self.linear_constraints.set_coefficients(zip(A.row.tolist(), A.col.tolist(), A.data.tolist())) # add indicator constraints if not indic_constr == None: # cast variables and translate coefficient matrix A to right input format for CPLEX A = [[[int(i) for i in list(a.indices)], [float(i) for i in list(a.data)]] for a in sparse.csr_matrix(indic_constr.A)] b = [float(i) for i in indic_constr.b] sense = [str(i) for i in indic_constr.sense] indvar = [int(i) for i in indic_constr.binv] complem = [1 - int(i) for i in indic_constr.indicval] # call CPLEX function to add indicators self.indicator_constraints.add_batch(lin_expr=A, sense=sense, rhs=b, indvar=indvar, complemented=complem) # set parameters self.set_log_stream(io.StringIO()) # don't show output stream self.set_error_stream(io.StringIO()) self.set_warning_stream(io.StringIO()) self.set_results_stream(io.StringIO()) self.parameters.simplex.tolerances.optimality.set(1e-9) self.parameters.simplex.tolerances.feasibility.set(1e-9) if 'B' in vtype or 'I' in vtype: # set usable working memory to 3/4 of the total available memory self.parameters.workmem.set(round(virtual_memory().total / 1024 / 1024 * 0.75)) # self.parameters.threads.set(cpu_count()) # yield only optimal solutions in pool seed = randint(0, _const.CPX_BIGINT) # logging.info(' MILP Seed: '+str(seed)) self.parameters.randomseed = seed self.parameters.mip.pool.absgap.set(0.0) self.parameters.mip.pool.relgap.set(0.0) self.parameters.mip.pool.intensity.set(4) # no integrality tolerance self.parameters.mip.tolerances.integrality.set(0.0)
[docs] def solve(self) -> Tuple[List, float, float]: """Solve the MILP or LP Example: sol_x, optim, status = cplex.solve() Returns: (Tuple[List, float, float]) solution_vector, optimal_value, optimization_status """ try: super().solve() # call parent solve function (that was overwritten in this class) status = self.solution.get_status() if status in [1, 101, 102, 115, 128, 129, 130]: # solution integer optimal min_cx = self.solution.get_objective_value() status = OPTIMAL elif status == 108: # timeout without solution x = [nan] * self.variables.get_num() min_cx = nan status = TIME_LIMIT return x, min_cx, status elif status in [3, 103]: # infeasible x = [nan] * self.variables.get_num() min_cx = nan status = INFEASIBLE return x, min_cx, status elif status in [11, 107]: # timeout with solution min_cx = self.solution.get_objective_value() status = TIME_LIMIT_W_SOL elif status in [2, 4, 118, 119]: # solution unbounded x = [nan] * self.variables.get_num() min_cx = -inf status = UNBOUNDED return x, min_cx, status else: logging.exception(status) logging.exception(self.solution.get_status_string()) raise Exception("Case not yet handeld") x = self.solution.get_values() return x, min_cx, status except CplexError as exc: if not exc.args[2] == 1217: logging.error(exc) min_cx = nan x = [nan] * self.variables.get_num() return x, min_cx, ERROR
[docs] def slim_solve(self) -> float: """Solve the MILP or LP, but return only the optimal value Example: optim = cplex.slim_solve() Returns: (float) Optimum value of the objective function. """ try: super().solve() # call parent solve function (that was overwritten in this class) status = self.solution.get_status() if status in [1, 101, 102, 107, 115, 128, 129, 130]: # solution integer optimal (tolerance) opt = self.solution.get_objective_value() elif status in [118, 119]: # solution unbounded (or inf or unbdd) opt = -inf elif status in [103, 108]: # infeasible opt = nan else: logging.exception(status) logging.exception(self.solution.get_status_string()) raise Exception("Case not yet handeld") return opt except CplexError as exc: return nan
[docs] def populate(self, n) -> Tuple[List, float, float]: """Generate a solution pool for MILPs Example: sols_x, optim, status = cplex.populate() Returns: (Tuple[List of lists, float, float]) solution_vectors, optimal_value, optimization_status """ try: if isinf(n): self.parameters.mip.pool.capacity.set(self.parameters.mip.pool.capacity.max()) else: self.parameters.mip.pool.capacity.set(n) self.populate_solution_pool() # call parent solve function (that was overwritten in this class) status = self.solution.get_status() if status in [101, 102, 115, 128, 129, 130]: # solution integer optimal min_cx = self.solution.get_objective_value() status = OPTIMAL elif status == 108: # timeout without solution x = [] min_cx = nan status = TIME_LIMIT return x, min_cx, status elif status == 103: # infeasible x = [] min_cx = nan status = INFEASIBLE return x, min_cx, status elif status == 107: # timeout with solution min_cx = self.solution.get_objective_value() status = TIME_LIMIT_W_SOL elif status in [118, 119]: # solution unbounded min_cx = -inf status = UNBOUNDED else: logging.exception(status) logging.exception(self.solution.get_status_string()) raise Exception("Case not yet handeld") x = [self.solution.pool.get_values(i) for i in range(self.solution.pool.get_num())] return x, min_cx, status except CplexError as exc: if not exc.args[2] == 1217: logging.error(exc) min_cx = nan x = [] return x, min_cx, ERROR
[docs] def set_objective(self, c): """Set the objective function with a vector""" self.objective.set_linear([[i, c[i]] for i in range(len(c))])
[docs] def set_objective_idx(self, C): """Set the objective function with index-value pairs e.g.: C=[[1, 1.0], [4,-0.2]]""" self.objective.set_linear(C)
[docs] def set_ub(self, ub): """Set the upper bounds to a given vector""" self.variables.set_upper_bounds(ub)
[docs] def set_time_limit(self, t): """Set the computation time limit (in seconds)""" if isinf(t): self.parameters.timelimit.set(self.parameters.timelimit.max()) else: self.parameters.timelimit.set(t)
[docs] def add_ineq_constraints(self, A_ineq, b_ineq): """Add inequality constraints to the model Additional inequality constraints have the form A_ineq * x <= b_ineq. The number of columns in A_ineq must match with the number of variables x in the problem. Args: A_ineq (sparse.csr_matrix): The coefficient matrix b_ineq (list of float): The right hand side vector """ numconst = self.linear_constraints.get_num() numnewconst = A_ineq.shape[0] newconst_idx = [numconst + i for i in range(numnewconst)] for i, v in enumerate(b_ineq): if isinf(v): b_ineq[i] = infinity self.linear_constraints.add(rhs=b_ineq, senses='L' * numnewconst) # retrieve row and column indices from sparse matrix and convert them to int A_ineq = A_ineq.tocoo() rows_A = [int(a) + numconst for a in A_ineq.row] cols_A = [int(a) for a in A_ineq.col] # convert matrix coefficients to float data_A = [float(a) for a in A_ineq.data] self.linear_constraints.set_coefficients(zip(rows_A, cols_A, data_A))
[docs] def add_eq_constraints(self, A_eq, b_eq): """Add equality constraints to the model Additional equality constraints have the form A_eq * x = b_eq. The number of columns in A_eq must match with the number of variables x in the problem. Args: A_eq (sparse.csr_matrix): The coefficient matrix b_eq (list of float): The right hand side vector """ numconst = self.linear_constraints.get_num() numnewconst = A_eq.shape[0] self.linear_constraints.add(rhs=b_eq, senses='E' * numnewconst) # retrieve row and column indices from sparse matrix and convert them to int A_eq = A_eq.tocoo() rows_A = [int(a) + numconst for a in A_eq.row] cols_A = [int(a) for a in A_eq.col] # convert matrix coefficients to float data_A = [float(a) for a in A_eq.data] self.linear_constraints.set_coefficients(zip(rows_A, cols_A, data_A))
[docs] def set_ineq_constraint(self, idx, a_ineq, b_ineq): """Replace a specific inequality constraint Replace the constraint with the index idx with the constraint a_ineq*x ~ b_ineq Args: idx (int): Index of the constraint a_ineq (list of float): The coefficient vector b_ineq (float): The right hand side value """ if isinf(b_ineq): b_ineq = infinity self.linear_constraints.set_coefficients(zip([idx] * len(a_ineq), range(len(a_ineq)), a_ineq)) self.linear_constraints.set_rhs([[idx, b_ineq]])