import warnings
import numpy as np
import pandas as pd
import scipy as sp # noqa: F401
from pint import DimensionalityError
from .units import u
[docs]class Constant:
"""Representation for a constant value that is not in the SCADA system.
Only need for `Algebraic` mode since lambda expressions support constant
values directly.
Parameters
----------
value : float
The value of this constant at all timesteps
Attributes
----------
id : str
value : float
The value of this constant at all timesteps
"""
def __init__(self, value, parent_id=None):
self.id = "Constant(" + str(value) + ")"
self.value = value
self.parent_id = parent_id
def __repr__(self):
return (
f"<pype_schema.operations.Constant id:{self.id} "
f"value:{self.value} parent_id:{self.parent_id}>\n"
)
def __eq__(self, other):
# don't attempt to compare against unrelated types
if not isinstance(other, self.__class__):
return False
return (
self.id == other.id
and self.value == other.value
and self.parent_id == other.parent_id
)
def __lt__(self, other):
# don't attempt to compare against unrelated types
if not isinstance(other, self.__class__):
return NotImplemented
if self.value != other.value:
return self.value < other.value
elif self.id != other.id:
return self.id < other.id
elif self.parent_id != other.parent_id:
if self.parent_id is None:
return True
elif other.parent_id is None:
return False
else:
return self.parent_id < other.parent_id
else: # if all equal, less than is False
return False
[docs]def get_change(variable, delta_t=1, split=False):
"""Converts cumulative value to rate-of-change value using finite differences
Note: assumes rate of change at time t is equal to the difference between
the value at time t+1 and t
Parameters
----------
variable : pandas.Series, numpy.ndarray
variable to convert
delta_t : int
Time difference between two consecutive values of the variable (default is 1)
split: bool
Whether to split the variable into a negative change and a positive change
Returns
-------
tuple, pandas.Series or numpy.ndarray
Rate of change variable or tuple of netative, positive rate of change variable
"""
variable = variable.values if isinstance(variable, pd.Series) else variable
change = (variable[1:] - variable[:-1]) / delta_t
change = np.concatenate([change, np.array([np.nan])])
change_neg, change_pos = change.copy(), change.copy()
change_neg[change_neg > 0] = 0
change_neg = -change_neg
change_pos[change_pos < 0] = 0
if split:
return change_neg, change_pos
else:
return change
[docs]def binary_helper(operation, unit, prev_unit, totalized_mix=False):
"""Helper for parsing operations and checking units of binary operations
Parameters
----------
operation : ["+", "-", "*", "/"]
Function to apply when combining tags.
Supported functions are "+", "-", "*", and "/".
unit : Unit
Units for the right side of the operation, represented as a Pint unit.
prev_unit : Unit
Units for the left side of the operation, represented as a Pint unit.
totalized_mix : bool
Skip unit checking when there is a mix of totalized and detotalized variables.
Default is False
Raises
------
ValueError
When units are incompatible for addition or subtraction.
When `operation` is unsupported.
UserWarning
When a mix of totalized and detotalized variables makes it impossible
to verify unit compatibility
Returns
-------
Unit
Resulting Pint Unit from combining the `unit` and `prev_unit`
according to `operation`
"""
# convert unit and prev_unit to dimensionless if None
unit = u.dimensionless if unit is None else unit
prev_unit = u.dimensionless if prev_unit is None else prev_unit
if operation == "+" or operation == "-":
if unit != prev_unit:
try:
u.convert(1, unit, prev_unit)
except DimensionalityError:
if totalized_mix:
warnings.warn(
"Unable to verify units since there is a mix of "
"totalized and detotalized variables"
)
else:
raise ValueError(
"Units for addition and subtraction must be compatible"
)
elif operation == "*" or operation == "/":
if operation == "/":
prev_unit = prev_unit / unit
else:
prev_unit = prev_unit * unit
else:
raise ValueError("Unsupported operation " + operation)
return prev_unit
[docs]def unary_helper(data, un_op):
"""Transform the given data according to the VirtualTag's unary operator
Parameters
----------
data : list, array, or Series
a list, numpy array, or pandas Series of data to apply a unary operation to
un_op : ["noop", "delta", "<<", ">>", "~", "-"]
Supported operations are:
"noop" : null operator, useful when
skipping tags in a list of unary operations.
"delta" : calculate the difference between
the current timestep and previous timestep
"<<" : shift all data left one timestep,
so that the last time step will be NaN
">>" : shift all data right one timestep,
so that the first time step will be NaN
"~" : Boolean not
"-" : unary negation
Note that "delta", "<<", and ">>" return a timeseries padded
with NaN so that it is the same length as input data
Returns
-------
list, array, or Series
numpy array of dataset trannsformed by unary operation
"""
# allow for multiple unary operations to be performed sequentially
if isinstance(un_op, list):
result = data.copy()
for op in un_op:
result = unary_helper(result, op)
elif un_op == "noop":
result = data.copy()
elif un_op == "delta":
r_shift = unary_helper(data, ">>")
result = data - r_shift
elif un_op == "-":
if isinstance(data, list):
result = [-x for x in data]
elif isinstance(data, (np.ndarray, pd.Series)):
result = -data
else:
raise TypeError("Data must be either a list, array, or Series")
elif un_op == "~":
if isinstance(data, list):
result = result = [not bool(x) for x in data]
elif isinstance(data, (np.ndarray, pd.Series)):
result = data == 0
else:
raise TypeError("Data must be either a list, array, or Series")
else:
if isinstance(data, list):
result = data.copy()
if un_op == "<<":
for i in range(len(data) - 1):
result[i] = data[i + 1]
result[len(data) - 1] = float("nan")
elif un_op == ">>":
for i in range(1, len(data)):
result[i] = data[i - 1]
result[0] = float("nan")
elif isinstance(data, np.ndarray):
result = data.copy().astype("float")
if un_op == "<<":
for i in range(len(data) - 1):
result[i] = data[i + 1]
result[len(data) - 1] = np.nan
elif un_op == ">>":
for i in range(1, len(data)):
result[i] = data[i - 1]
result[0] = np.nan
elif isinstance(data, pd.Series):
if un_op == "<<":
result = data.shift(-1)
elif un_op == ">>":
result = data.shift(1)
else:
raise TypeError("Data must be either a list, array, or Series")
return result