Source code for electric_emission_cost.utils

import re
import pytz
import datetime
import numpy as np
import cvxpy as cp
import pyomo.environ as pyo
from pyomo.core.expr.numeric_expr import (
    SumExpression,
    LinearExpression,
    MonomialTermExpression,
)
from pyomo.core.base.var import ScalarVar
from pyomo.core.base.expression import IndexedExpression

# Dictionary mapping region types to timezone strings
TIMEZONE_DICT = {
    "iso_rto_code": {
        "CAISO": "America/Los_Angeles",  # Pacific Time (PT)
        "ERCOT": "America/Chicago",  # Central Time (CT)
        "ISONE": "America/New_York",  # Eastern Time (ET)
        "MISO": "America/Chicago",  # Central Time (CT)
        "NYISO": "America/New_York",  # Eastern Time (ET)
        "OTHER": "America/New_York",  # Default to Eastern Time (ET)
        "PJM": "America/New_York",  # Eastern Time (ET)
        "SPP": "America/Chicago",  # Central Time (CT)
    },
    "nerc_region": {
        "MRO": "America/Chicago",  # Central Time (CT)
        "NPCC": "America/New_York",  # Eastern Time (ET)
        "RFC": "America/New_York",  # Eastern Time (ET)
        "SERC": "America/New_York",  # Eastern Time (ET)
        "SPP": "America/Chicago",  # Central Time (CT)
        "TRE": "America/Chicago",  # Central Time (CT)
        "WECC": "America/Denver",  # Mountain Time (MT) / Pacific Time (PT)
    },
    "state": {
        "AK": "America/Anchorage",  # Alaska Time (AKT)
        "AL": "America/Chicago",  # Central Time (CT)
        "AR": "America/Chicago",  # Central Time (CT)
        "AZ": "America/Phoenix",  # Mountain Standard Time (MST, no DST)
        "CA": "America/Los_Angeles",  # Pacific Time (PT)
        "CO": "America/Denver",  # Mountain Time (MT)
        "CT": "America/New_York",  # Eastern Time (ET)
        "DE": "America/New_York",  # Eastern Time (ET)
        "FL": "America/New_York",  # Eastern Time (ET)
        "GA": "America/New_York",  # Eastern Time (ET)
        "IA": "America/Chicago",  # Central Time (CT)
        "ID": "America/Boise",  # Mountain Time (MT)
        "IL": "America/Chicago",  # Central Time (CT)
        "IN": "America/Indiana/Indianapolis",  # Mostly Eastern Time (ET)
        "KS": "America/Chicago",  # Central Time (CT)
        "KY": "America/New_York",  # Eastern Time (ET)
        "LA": "America/Chicago",  # Central Time (CT)
        "MA": "America/New_York",  # Eastern Time (ET)
        "MD": "America/New_York",  # Eastern Time (ET)
        "ME": "America/New_York",  # Eastern Time (ET)
        "MI": "America/Detroit",  # Mostly Eastern Time (ET)
        "MN": "America/Chicago",  # Central Time (CT)
        "MO": "America/Chicago",  # Central Time (CT)
        "MS": "America/Chicago",  # Central Time (CT)
        "MT": "America/Denver",  # Mountain Time (MT)
        "NC": "America/New_York",  # Eastern Time (ET)
        "ND": "America/Denver",  # Central Time (CT)
        "NE": "America/Chicago",  # Mountain Time (MT)
        "NH": "America/New_York",  # Eastern Time (ET)
        "NJ": "America/New_York",  # Eastern Time (ET)
        "NM": "America/Denver",  # Mountain Time (MT)
        "NV": "America/Denver",  # Mountain Time (MT)
        "NY": "America/New_York",  # Eastern Time (ET)
        "OH": "America/New_York",  # Eastern Time (ET)
        "OK": "America/Chicago",  # Central Time (CT)
        "OR": "America/Los_Angeles",  # Pacific Time (PT)
        "PA": "America/New_York",  # Eastern Time (ET)
        "RI": "America/New_York",  # Eastern Time (ET)
        "SC": "America/New_York",  # Eastern Time (ET)
        "SD": "America/Denver",  # Mountain Time (MT)
        "TN": "America/Chicago",  # Central Time (CT)
        "TX": "America/Chicago",  # Central Time (CT)
        "UT": "America/Denver",  # Mountain Time (MT)
        "VA": "America/New_York",  # Eastern Time (ET)
        "VT": "America/New_York",  # Eastern Time (ET)
        "WA": "America/Los_Angeles",  # Pacific Time (PT)
        "WI": "America/Chicago",  # Central Time (CT)
        "WV": "America/New_York",  # Eastern Time (ET)
        "WY": "America/Denver",  # Mountain Time (MT)
    },
    "egrid_subregions": {
        "AKGD": "America/Anchorage",  # Alaska Time (AKT)
        "AZNM": "America/Phoenix",  # Mountain Standard Time (MST, no DST)
        "CAMX": "America/Los_Angeles",  # Pacific Time (PT)
        "ERCT": "America/Chicago",  # Central Time (CT)
        "FRCC": "America/New_York",  # Eastern Time (ET)
        "MROE": "America/Chicago",  # Central Time (CT)
        "MROW": "America/Chicago",  # Central Time (CT)
        "NEWE": "America/New_York",  # Eastern Time (ET)
        "NWPP": "America/Los_Angeles",  # Pacific Time (PT)
        "NYCW": "America/New_York",  # Eastern Time (ET)
        "NYLI": "America/New_York",  # Eastern Time (ET)
        "NYUP": "America/New_York",  # Eastern Time (ET)
        "RFCE": "America/New_York",  # Eastern Time (ET)
        "RFCM": "America/Chicago",  # Central Time (CT)
        "RFCW": "America/Chicago",  # Central Time (CT)
        "RMPA": "America/Denver",  # Mountain Time (MT)
        "SPNO": "America/Chicago",  # Central Time (CT)
        "SPSO": "America/Chicago",  # Central Time (CT)
        "SRMV": "America/New_York",  # Eastern Time (ET)
        "SRMW": "America/Chicago",  # Central Time (CT)
        "SRSO": "America/New_York",  # Eastern Time (ET)
        "SRTV": "America/New_York",  # Eastern Time (ET)
        "SRVC": "America/New_York",  # Eastern Time (ET)
    },
}


[docs]def idxparam_value(idx_param): """Returns the parameter value at the given index. Parameters ---------- idx_param : pyomo.environ.Param or pyomo.environ.Var The Pyomo parameter or variable to be converted Returns ------- numpy.ndarray Indexed variable or parameter as a numpy array """ return np.array([idx_param[i].value for i in range(len(idx_param))])
[docs]def max(expression, model=None, varstr=None): """Elementwise maximum of an expression or array Parameters ---------- expression : [ numpy.Array, cvxpy.Expression, pyomo.core.expr.numeric_expr.NumericExpression, pyomo.core.expr.numeric_expr.NumericNDArray, pyomo.core.expr.numeric_expr.NumericNDArray, pyomo.environ.Param, pyomo.environ.Var ] The expression to find the maximum of model : pyomo.environ.Model The model object associated with the problem. Only used in the case of Pyomo, so `None` by default. varstr : str Name of the variable to be created if using a Pyomo `model` Raises ------ TypeError When `expression` is not of type `numpy.Array`, `cvxpy.Expression`, `pyomo.core.expr.numeric_expr.NumericExpression`, `pyomo.core.expr.numeric_expr.NumericNDArray`, `pyomo.environ.Param`, or `pyomo.environ.Var` Returns ------- ([numpy.Array, cvxpy.Expression, pyomo.environ.Var], pyomo.environ.Model) Expression representing max of `expression` """ if isinstance( expression, (LinearExpression, SumExpression, MonomialTermExpression, ScalarVar) ): model.add_component(varstr, pyo.Var()) var = model.find_component(varstr) def const_rule(model): return var >= expression constraint = pyo.Constraint(rule=const_rule) model.add_component(varstr + "_constraint", constraint) return (var, model) elif isinstance(expression, (IndexedExpression, pyo.Param, pyo.Var)): model.add_component(varstr, pyo.Var()) var = model.find_component(varstr) def const_rule(model, t): return var >= expression[t] constraint = pyo.Constraint(model.t, rule=const_rule) model.add_component(varstr + "_constraint", constraint) return (var, model) elif isinstance( expression, (int, float, np.int32, np.int64, np.float32, np.float64, np.ndarray) ): return (np.max(expression), model) elif isinstance(expression, cp.Expression): return (cp.max(expression), None) else: raise TypeError( "Only CVXPY or Pyomo variables and NumPy arrays are currently supported." )
[docs]def sum(expression, axis=0, model=None, varstr=None): """Elementwise maximum of an expression or array Parameters ---------- expression : [ numpy.Array, cvxpy.Expression, pyomo.core.expr.numeric_expr.NumericExpression, pyomo.core.expr.numeric_expr.NumericNDArray, pyomo.environ.Param, pyomo.environ.Var ] Expression representing a matrix to sum axis: int Optional axis along which to compute sum. Default is 0 model : pyomo.environ.Model The model object associated with the problem. Only used in the case of Pyomo, so `None` by default. varstr : str Name of the variable to be created if using a Pyomo `model` Raises ------ TypeError When `expression` is not of type `numpy.Array`, `cvxpy.Expression`, `pyomo.core.expr.numeric_expr.NumericExpression`, `pyomo.core.expr.numeric_expr.NumericNDArray` `pyomo.environ.Param`, or `pyomo.environ.Var` Returns ------- [numpy.Array, cvxpy.Expression, pyomo.environ.Expression] Expression representing sum of `expression` along `axis` """ if isinstance(expression, (SumExpression, IndexedExpression, pyo.Param, pyo.Var)): model.add_component(varstr, pyo.Var()) var = model.find_component(varstr) def const_rule(model): total = 0 for i in range(len(expression)): total += expression[i] return var == total constraint = pyo.Constraint(rule=const_rule) model.add_component(varstr + "_constraint", constraint) return (var, model) elif isinstance( expression, (int, float, np.int32, np.int64, np.float32, np.float64, np.ndarray) ): return (np.sum(expression, axis=axis), model) elif isinstance(expression, cp.Expression): return (cp.sum(expression, axis=axis), None) else: raise TypeError( "Only CVXPY or Pyomo variables and NumPy arrays are currently supported." )
[docs]def max_pos(expression, model=None, varstr=None): """Returns the maximum positive scalar value of an expression. I.e., max([x, 0]) where x is any element of the expression (if a matrix) Parameters ---------- expression : [ numpy.Array, cvxpy.Expression, pyomo.core.expr.numeric_expr.NumericExpression, pyomo.core.expr.numeric_expr.NumericNDArray, pyomo.environ.Param, pyomo.environ.Var ] Expression representing a matrix, vector, or scalar model : pyomo.environ.Model The model object associated with the problem. Only used in the case of Pyomo, so `None` by default. varstr : str Name of the variable to be created if using a Pyomo `model` Raises ------ TypeError When `expression` is not of type `numpy.Array`, `cvxpy.Expression`, `pyomo.core.expr.numeric_expr.NumericNDArray`, `pyomo.core.expr.numeric_expr.NumericExpression`, `pyomo.environ.Param`, or `pyomo.environ.Var` Returns ------- ( [numpy.float, numpy.int, numpy.Array, cvxpy.Expression, or pyomo.environ.Var], pyomo.environ.Model ) Expression representing maximum positive scalar value of `expression` """ if isinstance( expression, (LinearExpression, SumExpression, MonomialTermExpression, ScalarVar) ): model.add_component(varstr, pyo.Var(initialize=0, bounds=(0, None))) var = model.find_component(varstr) def const_rule(model): return var >= expression constraint = pyo.Constraint(rule=const_rule) model.add_component(varstr + "_constraint", constraint) return (var, model) elif isinstance(expression, (IndexedExpression, pyo.Param, pyo.Var)): model.add_component(varstr, pyo.Var(bounds=(0, None))) var = model.find_component(varstr) def const_rule(model, t): return var >= expression[t] constraint = pyo.Constraint(model.t, rule=const_rule) model.add_component(varstr + "_constraint", constraint) return (var, model) elif isinstance( expression, (int, float, np.int32, np.int64, np.float32, np.float64, np.ndarray) ): return (np.max(expression), model) if np.max(expression) > 0 else (0, model) elif isinstance(expression, cp.Expression): return cp.max(cp.vstack([expression, 0])), None else: raise TypeError( "Only CVXPY or Pyomo variables and NumPy arrays are currently supported." )
[docs]def multiply(expression1, expression2, model=None, varstr=None): """Implements elementwise multiplication operation on two optimization expressions Parameters ---------- expression1 : [ numpy.Array, cvxpy.Expression, pyomo.core.expr.numeric_expr.NumericExpression, pyomo.core.expr.numeric_expr.NumericNDArray, pyomo.environ.Param, pyomo.environ.Var ] LHS of multiply operation expression2 : [ numpy.Array, cvxpy.Expression, pyomo.core.expr.numeric_expr.NumericExpression, pyomo.core.expr.numeric_expr.NumericNDArray, pyomo.environ.Param, pyomo.environ.Var ] RHS of multiply operation model : pyomo.environ.Model The model object associated with the problem. Only used in the case of Pyomo, so `None` by default. varstr : str Name of the variable to be created if using a Pyomo `model` Raises ------ TypeError When `expression` is not of type `numpy.Array`, `cvxpy.Expression`, `pyomo.core.expr.numeric_expr.NumericNDArray`, `pyomo.core.expr.numeric_expr.NumericExpression`, `pyomo.environ.Param`, or `pyomo.environ.Var` Returns ------- [ numpy.Array, cvxpy.Expression, pyomo.core.expr.numeric_expr.NumericNDArray, pyomo.environ.Expression ] result from elementwise multiplication of `expression1` and `expression2` """ if isinstance(expression1, cp.Expression) or isinstance(expression2, cp.Expression): return (cp.multiply(expression1, expression2), None) elif isinstance( expression1, (SumExpression, IndexedExpression, pyo.Param, pyo.Var) ) or isinstance( expression2, (SumExpression, IndexedExpression, pyo.Param, pyo.Var) ): if len(expression1) > 1: # TODO: replace model.t with better way to get dimensions model.add_component(varstr, pyo.Var(model.t)) var = model.find_component(varstr) def const_rule(model, t): return var[t] == expression1[t] * expression2[t] constraint = pyo.Constraint(model.t, rule=const_rule) model.add_component(varstr + "_constraint", constraint) return (var, model) else: return (expression1 * expression2, model) elif isinstance( expression1, (int, float, np.int32, np.int64, np.float32, np.float64, np.ndarray), ) and isinstance( expression2, (int, float, np.int32, np.int64, np.float32, np.float64, np.ndarray), ): return (np.multiply(expression1, expression2), model) else: raise TypeError( "Only CVXPY or Pyomo variables and NumPy arrays are currently supported." )
[docs]def parse_freq(freq): """Parses a time frequency code string, returning its type and its freq_binsize Parameters ---------- freq: str a string of the form [type][freq_binsize], where type corresponds to a numpy.timedelta64 encoding and freq binsize is an integer giving the number of increments of `type` of one binned increment of our time variable (for example '6h' means the data are grouped into increments of 6 hours) Returns ------- tuple tuple of the form (`int`,`str`) giving the binsize and units (freq_type) """ freq_type = re.sub("[0-9]", "", freq) freq_binsize = int(re.sub("[^0-9]", "", freq)) return freq_binsize, freq_type
[docs]def get_freq_binsize_minutes(freq): """Gets size of a given time frequency expressed in units of minutes Parameters ---------- freq: str a string of the form [type][freq_binsize], where type corresponds to a numpy.timedelta64 encoding and freq binsize is an integer giving the number of increments of `type` of one binned increment of our time variable (for example '6h' means the data are grouped into increments of 6 hours) Raises ------ ValueError when resolution is not minute, hourly, or daily Returns ------- int integer giving the number of minutes in the given time frequency unit """ freq_binsize, freq_type = parse_freq(freq) if freq_type == "m": multiplier = 1 elif freq_type == "h": multiplier = 60 elif freq_type in ["D", "d"]: multiplier = 60 * 24 else: raise ValueError( "Cannot deal with data that are not in minute, hourly, or daily resolution" ) return multiplier * freq_binsize
[docs]def convert_utc_to_timezone(utc_hour, timezone_str): """ Convert UTC hour (0-23) to the corresponding hour in a specified timezone. Parameters: utc_hour (int): Hour in UTC (0-23). timezone_str (str): Timezone string, e.g., 'America/New_York'. Returns: int: Corresponding hour in the specified timezone. """ # Ensure the UTC hour is within the valid range if not (0 <= utc_hour <= 23): raise ValueError("UTC hour must be between 0 and 23.") # Create a UTC datetime object with the specified hour utc_time = datetime.datetime.utcnow().replace( hour=utc_hour, minute=0, second=0, microsecond=0, tzinfo=pytz.utc ) # Convert to the specified timezone target_timezone = pytz.timezone(timezone_str) local_time = utc_time.astimezone(target_timezone) return local_time.hour
[docs]def sanitize_varstr(varstr): """Sanitizes a variable string by removing non-alphanumeric characters and replacing spaces with underscores. Parameters ---------- varstr : str The variable string to sanitize. Returns ------- str The sanitized variable string. """ return re.sub(r"[^a-zA-Z0-9_]", "_", varstr).replace(" ", "_")