import warnings
from enum import Enum, auto
from pandas import DataFrame, Series
import pandas as pd # noqa: F401
import numpy as np # noqa: F401
import scipy as sp # noqa: F401
from numpy import ndarray, issubdtype
from .utils import count_args
from .units import u
from .operations import * # noqa: F401, F403
from .logbook import Logbook
class TagType(Enum):
"""Enum to represent types of SCADA tags"""
Flow = auto() # flow through a connection
Volume = auto()
Level = auto()
Pressure = auto()
Temperature = auto()
RunTime = auto()
RunStatus = auto()
VSS = auto() # volatile suspended solids
TSS = auto() # total suspended solids
TDS = auto() # total dissolved solids
COD = auto() # chemical oxygen demand
BOD = auto() # biochemical oxygen demand
pH = auto()
Conductivity = auto()
Turbidity = auto()
Rotation = auto()
Efficiency = auto()
StateOfCharge = auto()
InFlow = auto() # flow into a node
OutFlow = auto() # flow out of a node
NetFlow = auto() # net flow through a node
Speed = auto()
Frequency = auto()
Concentration = auto()
SetPoint = auto() # history of control set points
class DownsampleType(Enum):
"""Enum to represent common methods of downsampling data"""
Average = auto()
Decimation = auto()
Reservoir = auto()
CONTENTLESS_TYPES = [
TagType.RunTime,
TagType.RunStatus,
TagType.Rotation,
TagType.Efficiency,
TagType.Speed,
TagType.Frequency,
]
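# Tag types in CONTENTLESS_TYPES describe equipment state (e.g., run time or speed)
# rather than a material stream. For example, `TagType.RunTime in CONTENTLESS_TYPES`
# is True while `TagType.Flow in CONTENTLESS_TYPES` is False; VirtualTag uses this
# membership check below to decide whether to set `contents` to None.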
class Tag:
"""Class to represent a SCADA or other data tag
Parameters
----------
id : str
Tag ID
units : str or Unit
Units represented as a string or Pint unit.
E.g., 'MGD' or 'cubic meters' or <Unit('MGD')>
tag_type : TagType
Type of data saved under the tag. E.g., `InFlow` or `RunTime`
source_unit_id : int or str
integer representing unit number, or `total` if a combined data point
across all units of the source node
dest_unit_id : int or str
integer representing unit number, or `total` if a combined data point
across all units of the destination node.
None if the Tag is associated with a Node object instead of a Connection
parent_id : str
ID for the parent object (either a Node or Connection)
totalized : bool
True if data is totalized. False otherwise
contents : ContentsType
Data stream contents. E.g., `WasteActivatedSludge` or `NaturalGas`
manufacturer : str
Name of the manufacturer for the physical sensor hardware. Default is None
measure_freq : pint.Quantity
Measurement frequency of the data with units. None by default
report_freq : pint.Quantity
Reporting frequency of the data with units. None by default
downsample_method : DownsampleType
None by default, meaning that data is reported at the same frequency as it is measured
calibration : Logbook
A history of sensor calibration.
Attributes
----------
id : str
Tag ID
units : Unit
Units represented as a Pint unit. E.g., <Unit('MGD')>
tag_type : TagType
Type of data saved under the tag. E.g., `InFlow` or `RunTime`
source_unit_id : int or str
integer representing unit number, or `total` if a combined data point
across all units of the source node
dest_unit_id : int or str
integer representing unit number, or `total` if a combined data point
across all units of the destination node
parent_id : str
ID for the parent object (either a Node or Connection)
totalized : bool
True if data is totalized. False by default
contents : ContentsType
Contents moving through the node
manufacturer : str
Name of the manufacturer for the physical sensor hardware. Default is None
measure_freq : pint.Quantity
Measurement frequency of the data with units. None by default
report_freq : pint.Quantity
Reporting frequency of the data with units. None by default
downsample_method : DownsampleType
None by default, meaning that data is reported at the same frequency as it is measured
calibration : Logbook
A history of sensor calibration.
"""
def __init__(
self,
id,
units,
tag_type,
source_unit_id,
dest_unit_id,
parent_id,
totalized=False,
contents=None,
manufacturer=None,
measure_freq=None,
report_freq=None,
downsample_method=None,
calibration=Logbook(),
):
self.id = id
self.units = units
self.contents = contents
self.tag_type = tag_type
self.totalized = totalized
self.source_unit_id = source_unit_id
self.dest_unit_id = dest_unit_id
self.parent_id = parent_id
self.manufacturer = manufacturer
# convert to Pint units if string value
if isinstance(measure_freq, str):
self.measure_freq = u.Quantity(measure_freq)
else:
self.measure_freq = measure_freq
if isinstance(report_freq, str):
self.report_freq = u.Quantity(report_freq)
else:
self.report_freq = report_freq
self.downsample_method = downsample_method
self.calibration = calibration
def __repr__(self):
return (
f"<pype_schema.tag.Tag id:{self.id} units:{self.units} "
f"tag_type:{self.tag_type} source_unit_id:{self.source_unit_id} "
f"dest_unit_id:{self.dest_unit_id} parent_id:{self.parent_id} "
f"totalized:{self.totalized} contents:{self.contents} "
f"manufacturer:{self.manufacturer} measure_freq:{self.measure_freq} "
f"report_freq:{self.report_freq} "
f"downsample_method:{self.downsample_method} "
f"calibration:{self.calibration}>\n"
)
def __eq__(self, other):
# don't attempt to compare against unrelated types
if not isinstance(other, self.__class__):
return False
return (
self.id == other.id
and self.contents == other.contents
and self.tag_type == other.tag_type
and self.totalized == other.totalized
and self.source_unit_id == other.source_unit_id
and self.dest_unit_id == other.dest_unit_id
and self.units == other.units
and self.parent_id == other.parent_id
and self.manufacturer == other.manufacturer
and self.measure_freq == other.measure_freq
and self.report_freq == other.report_freq
and self.downsample_method == other.downsample_method
and self.calibration == other.calibration
)
def __hash__(self):
return hash(
(
self.id,
self.contents,
self.tag_type,
self.totalized,
self.source_unit_id,
self.dest_unit_id,
self.units,
self.parent_id,
self.manufacturer,
self.measure_freq,
self.report_freq,
self.downsample_method,
self.calibration,
)
)
def __lt__(self, other):
# don't attempt to compare against unrelated types
if not isinstance(other, self.__class__):
return NotImplemented
if self.id != other.id:
return self.id < other.id
elif self.contents != other.contents:
return self.contents.value < other.contents.value
elif self.tag_type != other.tag_type:
return self.tag_type.value < other.tag_type.value
elif self.totalized != other.totalized:
return not self.totalized
elif self.source_unit_id != other.source_unit_id:
if self.source_unit_id == "total":
return False
elif other.source_unit_id == "total":
return True
else:
return self.source_unit_id < other.source_unit_id
elif self.dest_unit_id != other.dest_unit_id:
if self.dest_unit_id == "total":
return False
elif other.dest_unit_id == "total":
return True
else:
return self.dest_unit_id < other.dest_unit_id
elif self.units != other.units:
return str(self.units) < str(other.units)
elif self.measure_freq != other.measure_freq:
return self.measure_freq < other.measure_freq
elif self.report_freq != other.report_freq:
return self.report_freq < other.report_freq
elif self.downsample_method != other.downsample_method:
return self.downsample_method < other.downsample_method
elif self.calibration != other.calibration:
return len(self.calibration) < len(other.calibration)
else:
return self.parent_id < other.parent_id
def get_manufacturer(self):
try:
return self._manufacturer
except AttributeError:
return None
def set_manufacturer(self, manufacturer):
self._manufacturer = manufacturer
def del_manufacturer(self):
del self._manufacturer
manufacturer = property(get_manufacturer, set_manufacturer, del_manufacturer)
def get_report_freq(self):
try:
return self._report_freq
except AttributeError:
return None
def set_report_freq(self, report_freq):
self._report_freq = report_freq
def del_report_freq(self):
del self._report_freq
report_freq = property(get_report_freq, set_report_freq, del_report_freq)
def get_measure_freq(self):
try:
return self._measure_freq
except AttributeError:
return None
def set_measure_freq(self, measure_freq):
self._measure_freq = measure_freq
def del_measure_freq(self):
del self._measure_freq
measure_freq = property(get_measure_freq, set_measure_freq, del_measure_freq)
def get_downsample_method(self):
try:
return self._downsample_method
except AttributeError:
return None
def set_downsample_method(self, downsample_method):
self._downsample_method = downsample_method
def del_downsample_method(self):
del self._downsample_method
downsample_method = property(
get_downsample_method, set_downsample_method, del_downsample_method
)
def get_calibration(self):
try:
return self._calibration
except AttributeError:
return Logbook()
def set_calibration(self, calibration):
self._calibration = calibration
def del_calibration(self):
del self._calibration
calibration = property(get_calibration, set_calibration, del_calibration)
def check_type_compatibility(self, other_type):
"""Check if the given tag_type is compatible with another
Parameters
----------
other_type : TagType
Type of tag to compare against
Returns
-------
bool
True if compatible, False otherwise
"""
if not isinstance(other_type, TagType):
raise TypeError("tag_type must be a TagType object")
flow_types = [TagType.Flow, TagType.InFlow, TagType.OutFlow, TagType.NetFlow]
if self.tag_type in flow_types and other_type in flow_types:
return True
if self.tag_type == other_type:
return True
return False
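# Illustrative sketch (not part of the module): constructing a Tag for a hypothetical
# flow meter and checking type compatibility. The id, parent_id, unit string, and
# measurement frequency below are made-up example values.
#
#   influent_flow = Tag(
#       id="PumpStation_FlowMeter_1",
#       units="MGD",
#       tag_type=TagType.Flow,
#       source_unit_id=1,
#       dest_unit_id="total",
#       parent_id="SewerIntake",
#       totalized=False,
#       measure_freq="1 minute",
#   )
#   influent_flow.check_type_compatibility(TagType.NetFlow)  # True: both are flow types
#   influent_flow.check_type_compatibility(TagType.Level)    # False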
class VirtualTag:
"""Representation for data that is not in the SCADA system, but is instead
a combination of existing tags via the `operations` lambda function string
Parameters
----------
id : str
VirtualTag ID
tags : list of Tag
List of Tag objects to combine
operations : str
String giving a lambda function to apply to the tags;
must have the same number of arguments as there are tags
units : str or Unit
Units represented as a string or Pint unit.
E.g., 'MGD' or 'cubic meters' or <Unit('MGD')>. Default is None
tag_type : TagType
Type of data saved under the tag. E.g., `InFlow` or `RunTime`.
Default is None, and it will be automatically determined from constituent
Tags if they all have the same type.
parent_id : str
ID for the parent object (either a Node or Connection)
contents : ContentsType
Contents moving through the node. Default is None, and it will be automatically
determined from constituent Tag contents
Raises
------
ValueError
When the `operations` lambda function has the wrong number of arguments
When `tag_type` is not specified and constituent tags have different types.
When `contents` of the constituent tags are different types.
UserWarning
When a mix of totalized and detotalized tags is combined
When tags have different units
Attributes
----------
id : str
Tag ID
tags : list of Tag
List of Tag objects to combine
operations : str
String giving a lambda function to apply to constituent tags
units : str or Unit
Units represented as a string or Pint unit.
E.g., 'MGD' or 'cubic meters' or <Unit('MGD')>
tag_type : TagType
Type of data saved under the tag. E.g., `InFlow` or `RunTime`
totalized : bool
True if data is totalized. False otherwise
parent_id : str
ID for the parent object (either a Node or Connection)
contents : ContentsType
Contents moving through the node
"""
def __init__(
self,
id,
tags,
operations=None,
units=None,
tag_type=None,
parent_id=None,
contents=None,
):
# TODO: inherit report_freq from child tags
# TODO: incorporate DownsampleMethod for different report_freq
self.id = id
self.parent_id = parent_id
self.tags = tags
self.units = units
units = []
totalized = None
determine_type = tag_type is None
determine_contents = contents is None
totalized_mix = False
for tag in tags:
units.append(tag.units)
if totalized is not None and not totalized_mix:
if totalized != tag.totalized:
warnings.warn(
"Tags should have the same value for 'totalized'. "
"Setting `totalized` to false under the assumption "
"that data has been cleaned and detotalized already."
)
totalized = False
totalized_mix = True
else:
totalized = tag.totalized
if determine_type:
if tag_type is not None:
if not tag.check_type_compatibility(tag_type):
raise ValueError(
"All Tags must have the same value for 'tag_type'"
)
else:
tag_type = tag.tag_type
if determine_contents and tag_type not in CONTENTLESS_TYPES:
if contents is not None:
if contents != tag.contents:
raise ValueError(
"All Tags must have the same value for 'contents'"
)
else:
contents = tag.contents
if tag_type in CONTENTLESS_TYPES:
self.contents = None
else:
self.contents = contents
self.tag_type = tag_type
self.totalized = totalized
if operations is not None and operations:
if count_args(operations) != len(tags):
raise ValueError(
"Operations lambda function must have the same "
"number of arguments as the Tag list"
)
elif len(tags) > 1:
raise ValueError(
"Operations lambda function must be specified "
"if multiple tags are given"
)
self.operations = operations
def __repr__(self):
return (
f"<pype_schema.tag.VirtualTag id:{self.id} units:{self.units} "
f"tag_type:{self.tag_type} totalized:{self.totalized} "
f"contents:{self.contents} tags:{[tag.id for tag in self.tags]} "
f"operations:{self.operations} "
f"parent_id:{self.parent_id}>\n"
)
def __eq__(self, other):
# don't attempt to compare against unrelated types
if not isinstance(other, self.__class__):
return False
return (
self.id == other.id
and self.contents == other.contents
and self.tag_type == other.tag_type
and self.totalized == other.totalized
and self.units == other.units
and self.tags == other.tags
and self.operations == other.operations
)
def __hash__(self):
return hash(
(
self.id,
str(self.tags),
str(self.operations),
self.contents,
self.tag_type,
self.totalized,
self.units,
)
)
def __lt__(self, other):
if isinstance(other, Tag):
return False
elif not isinstance(other, self.__class__):
return NotImplemented
elif len(self.tags) < len(other.tags):
return True
elif len(self.tags) > len(other.tags):
return False
elif self.operations != other.operations:
return self.operations < other.operations
elif self.id != other.id:
return self.id < other.id
elif self.contents != other.contents:
return self.contents.value < other.contents.value
elif self.tag_type != other.tag_type:
return self.tag_type.value < other.tag_type.value
elif self.totalized != other.totalized:
return other.totalized
else:
return str(self.units) < str(other.units)
def process_ops(self, data, tag_to_var_map={}):
"""Transform the given data according to the VirtualTag's lambda string
Parameters
----------
data : list, array, dict, or DataFrame
a list, numpy array, dict, or pandas DataFrame of data that has the
correct dimensions. I.e., the number of columns (or list entries)
matches the number of arguments of the `operations` lambda function
tag_to_var_map : dict
dictionary of the form { tag.id : variable_name } for using data files
that differ from the original SCADA tag naming system
Returns
-------
list, array, or Series
The combined dataset
"""
result = data.copy()
num_ops = count_args(self.operations)
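# build a callable from the stored lambda string; note that `operations` is eval'd,
# so it should only come from a trusted source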
func_ = eval(self.operations)
if isinstance(data, list):
if num_ops == len(data):
result = func_(*data)
else:
raise ValueError(
"Data must have the correct dimensions "
"(same length as unary operations). "
"Currently there are {} unary operations and {} data tags".format(
num_ops, len(data)
)
)
elif isinstance(data, ndarray):
# cast integer data to float so the operations can produce fractional results
if issubdtype(data.dtype, np.integer):
data = data.astype("float")
if num_ops == data.shape[1]:
result = func_(*[data[:, i] for i in range(data.shape[1])])
else:
raise ValueError(
"Data must have the correct dimensions "
"(same length as number of args in operations lambda function). "
"Currently there are {} args and {} data tags".format(
num_ops, data.shape[1]
)
)
elif isinstance(data, (dict, DataFrame)):
varnames = []
for tag_obj in self.tags:
varname = tag_to_var_map[tag_obj.id] if tag_to_var_map else tag_obj.id
varnames.append(varname)
if isinstance(tag_obj, self.__class__):
data[varname] = tag_obj.calculate_values(data)
result = func_(*[data[varname] for varname in varnames])
if isinstance(result, Series):
result.rename(self.id, inplace=True)
else:
raise TypeError("Data must be either a list, array, dict, or DataFrame")
return result
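# Illustrative sketch (hypothetical tags and values): applying a two-argument lambda
# string to a 2-column numpy array. Assuming `tag_a` and `tag_b` are compatible Tag
# objects defined elsewhere:
#
#   vt = VirtualTag("CombinedFlow", [tag_a, tag_b], operations="lambda a, b: a + b")
#   vt.process_ops(np.array([[1, 2], [3, 4]]))  # column-wise sums: 1+2 and 3+4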
def calculate_values(self, data, tag_to_var_map={}):
"""Combine the given data according to the VirtualTag's operations
Parameters
----------
data : list, array, dict, or DataFrame
a list, numpy array, dict, or pandas DataFrame of data that has the
correct dimensions. I.e., the number of columns (or list entries)
matches the number of arguments of the `operations` lambda function
tag_to_var_map : dict
dictionary of the form { tag.id : variable_name } for using data files
that differ from the original SCADA tag naming system
Returns
-------
list, array, or Series
The combined dataset
"""
if self.operations is not None and self.operations:
data = self.process_ops(data, tag_to_var_map=tag_to_var_map)
elif isinstance(data, (dict, DataFrame)):
# no operations, so select the single constituent tag's column and rename it
data = data[self.tags[0].id].rename(self.id)
elif isinstance(data, ndarray):
# take the single column so the output is 1-D, matching what the operations path returns
data = data[:, 0]
return data
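# Illustrative sketch (hypothetical tag IDs and data): `calculate_values` on a pandas
# DataFrame whose columns are named after the constituent tag IDs returns a Series
# renamed to the VirtualTag's id. Assuming `flow_tag_1` and `flow_tag_2` are Tag
# objects with ids "FlowIn_1" and "FlowIn_2":
#
#   df = pd.DataFrame({"FlowIn_1": [1.0, 2.0], "FlowIn_2": [3.0, 5.0]})
#   total_flow = VirtualTag(
#       "TotalInfluentFlow", [flow_tag_1, flow_tag_2], operations="lambda a, b: a + b"
#   )
#   total_flow.calculate_values(df)  # Series [4.0, 7.0] named "TotalInfluentFlow"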