from enum import Enum, auto
from pint import UndefinedUnitError
from .units import u
[docs]def count_args(func_str):
"""Count the arguments for a lambda function string
Parameters
----------
func_str : str
A string representation of a lambda function
Returns
-------
int :
The lambda function's number of arguments
"""
func = eval(func_str)
nargs = 0
if func.__code__.co_argcount is not None:
nargs += func.__code__.co_argcount
if func.__defaults__ is not None:
nargs -= len(func.__defaults__)
if func.__code__.co_kwonlyargcount is not None:
nargs += func.__code__.co_kwonlyargcount
if func.__kwdefaults__ is not None:
nargs -= len(func.__kwdefaults__)
return nargs
[docs]def get_tag_sum_lambda_func(unit_ids):
"""Generate a lambda function string to sum tags
Parameters
----------
unit_ids : list
A list of unit IDs to sum
Returns
-------
str :
A string representation of a lambda function that sums tags
"""
arguments = ",".join(["tag" + str(unit + 1) for unit, _ in enumerate(unit_ids)])
ops = "+".join(["tag" + str(unit + 1) for unit, _ in enumerate(unit_ids)])
return f"lambda {arguments}: {ops}"
[docs]def parse_quantity(value, units):
"""Convert a value and unit string to a Pint quantity
Parameters
----------
value : float
units : str
Returns
-------
pint.Quantity
a Pint Quantity with the given value and units
"""
if value is not None:
if units:
return value * parse_units(units)
else:
return value
else:
return None
[docs]def parse_units(units):
"""Convert a unit string to a Pint Unit object
Parameters
----------
units : str
Returns
-------
Unit
a Pint Unit for the given string
"""
if units is None or units == "" or units.lower() == "none":
return
else:
clean_units = units.lower().replace(" ", "")
try:
return u(clean_units).units
except UndefinedUnitError:
if (
clean_units == "mgd"
or clean_units == "milliongalperday"
or clean_units == "milliongal/day"
or clean_units == "10**6gal/day"
or clean_units == "milliongallonperday"
or clean_units == "milliongallon/day"
or clean_units == "10**6gallon/day"
or clean_units == "milliongallonsperday"
or clean_units == "milliongallons/day"
or clean_units == "10**6gallons/day"
or clean_units == "milliongalperd"
or clean_units == "milliongal/d"
or clean_units == "10**6gal/d"
or clean_units == "milliongallonperd"
or clean_units == "milliongallon/d"
or clean_units == "10**6gallon/d"
or clean_units == "milliongallonsperd"
or clean_units == "milliongallons/d"
or clean_units == "10**6gallons/d"
):
return u.MGD
elif (
clean_units == "cubicmeters"
or clean_units == "cubicmeter"
or clean_units == "m**3"
or clean_units == "m^3"
or clean_units == "m3"
or clean_units == "meter3"
or clean_units == "meter**3"
or clean_units == "meter^3"
or clean_units == "meters3"
or clean_units == "meters**3"
or clean_units == "meters^3"
):
return u.m**3
elif clean_units == "horsepower" or clean_units == "hp":
return u.hp
elif (
clean_units == "scfm"
or clean_units == "cfm"
or clean_units == "cubicfeet/min"
or clean_units == "cubicfoot/min"
or clean_units == "ft3/min"
or clean_units == "ft**3/min"
or clean_units == "ft^3/min"
or clean_units == "foot3/min"
or clean_units == "foot^3/min"
or clean_units == "foot**3/min"
or clean_units == "feet3/min"
or clean_units == "feet**3/min"
or clean_units == "feet^3/min"
or clean_units == "cubicfeet/minute"
or clean_units == "cubicfoot/minute"
or clean_units == "ft3/minute"
or clean_units == "ft**3/minute"
or clean_units == "ft^3/minute"
or clean_units == "foot3/minute"
or clean_units == "foot^3/minute"
or clean_units == "foot**3/minute"
or clean_units == "feet3/minute"
or clean_units == "feet**3/minute"
or clean_units == "feet^3/minute"
):
return u.ft**3 / u.min
elif (
clean_units == "scf"
or clean_units == "cubicfeet"
or clean_units == "cubicfoot"
or clean_units == "ft3"
or clean_units == "ft**3"
or clean_units == "ft^3"
or clean_units == "foot3"
or clean_units == "foot**3"
or clean_units == "foot^3"
or clean_units == "feet3"
or clean_units == "feet**3"
or clean_units == "feet^3"
):
return u.ft**3
elif (
clean_units == "gpm"
or clean_units == "galpermin"
or clean_units == "gallonpermin"
or clean_units == "gallonspermin"
or clean_units == "galperminute"
or clean_units == "gallonperminute"
or clean_units == "gallonsperminute"
or clean_units == "gal/min"
or clean_units == "gal/minute"
or clean_units == "gallon/min"
or clean_units == "gallon/minute"
or clean_units == "gallons/min"
or clean_units == "gallons/minute"
):
return u.gal / u.min
elif (
clean_units == "gal"
or clean_units == "gallon"
or clean_units == "gallons"
):
return u.gal
elif (
clean_units == "gpd"
or clean_units == "galperday"
or clean_units == "gallonperday"
or clean_units == "gallonsperday"
or clean_units == "gal/d"
or clean_units == "gal/day"
or clean_units == "gallon/d"
or clean_units == "gallon/day"
or clean_units == "gallons/d"
or clean_units == "gallons/day"
):
return u.gal / u.day
elif (
clean_units == "m/s"
or clean_units == "meter/s"
or clean_units == "meters/s"
or clean_units == "m/second"
or clean_units == "meter/second"
or clean_units == "meters/second"
):
return u.m / u.s
elif (
clean_units == "cubicmeters/day"
or clean_units == "m3pd"
or clean_units == "cubicmeter/day"
or clean_units == "m**3/day"
or clean_units == "m^3/day"
or clean_units == "m3/day"
or clean_units == "meter3/day"
or clean_units == "meter**3/day"
or clean_units == "meter^3/day"
or clean_units == "meters3/day"
or clean_units == "meters**3/day"
or clean_units == "meters^3/day"
or clean_units == "cubicmeters/d"
or clean_units == "cubicmeter/d"
or clean_units == "m**3/d"
or clean_units == "m^3/d"
or clean_units == "m3/d"
or clean_units == "meter3/d"
or clean_units == "meter**3/d"
or clean_units == "meter^3/d"
or clean_units == "meters3/d"
or clean_units == "meters**3/d"
or clean_units == "meters^3/d"
):
return u.m**3 / u.day
elif (
clean_units == "psi"
or clean_units == "poundspersquareinch"
or clean_units == "poundpersquareinch"
or clean_units == "poundspersquarein"
or clean_units == "poundpersquarein"
or clean_units == "poundspersqin"
or clean_units == "poundpersqin"
or clean_units == "pound/inch**2"
or clean_units == "pounds/inch**2"
or clean_units == "lbs/inch**2"
or clean_units == "lb/inch**2"
or clean_units == "pound/inch^2"
or clean_units == "pounds/inch^2"
or clean_units == "lbs/inch^2"
or clean_units == "lb/inch^2"
or clean_units == "pound/squareinch"
or clean_units == "pounds/squareinch"
or clean_units == "lbs/squareinch"
or clean_units == "lb/squareinch"
or clean_units == "pound/in**2"
or clean_units == "pounds/in**2"
or clean_units == "lbs/in**2"
or clean_units == "lb/in**2"
or clean_units == "pound/in^2"
or clean_units == "pounds/in^2"
or clean_units == "lbs/in^2"
or clean_units == "lb/in^2"
):
return u.force_pound / (u.inch**2)
elif (
clean_units == "btu"
or clean_units == "btus"
or clean_units == "britishthermalunit"
or clean_units == "britishthermalunits"
):
return u.BTU
elif (
clean_units == "btu/scf"
or clean_units == "btus/scf"
or clean_units == "britishthermalunit/scf"
or clean_units == "britishthermalunits/scf"
or clean_units == "btu/cubicfeet"
or clean_units == "btus/cubicfeet"
or clean_units == "britishthermalunit/cubicfeet"
or clean_units == "britishthermalunits/cubicfeet"
or clean_units == "btu/cubicfoot"
or clean_units == "btus/cubicfoot"
or clean_units == "britishthermalunit/cubicfoot"
or clean_units == "britishthermalunits/cubicfoot"
or clean_units == "btu/ft3"
or clean_units == "btus/ft3"
or clean_units == "britishthermalunit/ft3"
or clean_units == "britishthermalunits/ft3"
or clean_units == "btu/ft**3"
or clean_units == "btus/ft**3"
or clean_units == "britishthermalunit/ft**3"
or clean_units == "britishthermalunits/ft**3"
or clean_units == "btu/ft^3"
or clean_units == "btus/ft^3"
or clean_units == "britishthermalunit/ft^3"
or clean_units == "britishthermalunits/ft^3"
or clean_units == "btu/foot3"
or clean_units == "btus/foot3"
or clean_units == "britishthermalunit/foot3"
or clean_units == "britishthermalunits/foot3"
or clean_units == "btu/foot**3"
or clean_units == "btus/foot**3"
or clean_units == "britishthermalunit/foot**3"
or clean_units == "britishthermalunits/foot**3"
or clean_units == "btu/feet3"
or clean_units == "btus/feet3"
or clean_units == "britishthermalunit/feet3"
or clean_units == "britishthermalunits/feet3"
or clean_units == "btu/foot^3"
or clean_units == "btus/foot^3"
or clean_units == "britishthermalunit/foot^3"
or clean_units == "britishthermalunits/foot^3"
or clean_units == "btu/feet**3"
or clean_units == "btus/feet**3"
or clean_units == "britishthermalunit/feet**3"
or clean_units == "britishthermalunits/feet**3"
or clean_units == "btu/feet^3"
or clean_units == "btus/feet^3"
or clean_units == "britishthermalunit/feet^3"
or clean_units == "britishthermalunits/feet^3"
):
return u.BTU / (u.ft**3)
elif (
clean_units == "kw*hour/scfm"
or clean_units == "kwhr/scfm"
or clean_units == "kwh/scfm"
or clean_units == "kilowatthr/scfm"
or clean_units == "kilowatthour/scfm"
or clean_units == "kilowatt*hour/scfm"
or clean_units == "kw*hour/ft**3*min"
or clean_units == "kwhr/ft**3*min"
or clean_units == "kwh/ft**3*min"
or clean_units == "kilowatthr/ft**3*min"
or clean_units == "kilowatthour/ft**3*min"
or clean_units == "kilowatt*hour/ft**3*min"
):
return u.kW * u.hr / u.ft**3 * u.min
elif (
clean_units == "kwh"
or clean_units == "kwhr"
or clean_units == "kilowatthr"
or clean_units == "hour*kilowatt"
or clean_units == "kilowatt*hour"
or clean_units == "kilowatthour"
):
return u.kW * u.hr
elif (
clean_units == "kilowatt*hour/meter**3"
or clean_units == "hour*kilowatt/meter**3"
or clean_units == "kwh/meter**3"
or clean_units == "kwhr/meter**3"
or clean_units == "kilowatthr/meter**3"
or clean_units == "kilowatthour/meter**3"
or clean_units == "kilowatt*hour/m^3"
or clean_units == "hour*kilowatt/m^3"
or clean_units == "kwh/m^3"
or clean_units == "kwhr/m^3"
or clean_units == "kilowatthr/m^3"
or clean_units == "kilowatthour/m^3"
or clean_units == "kilowatt*hour/cubicmeters"
or clean_units == "hour*kilowatt/cubicmeters"
or clean_units == "kwh/cubicmeters"
or clean_units == "kwhr/cubicmeters"
or clean_units == "kilowatthr/cubicmeters"
or clean_units == "kilowatthour/cubicmeters"
or clean_units == "kilowatt*hour/cubicmeter"
or clean_units == "hour*kilowatt/cubicmeter"
or clean_units == "kwh/cubicmeter"
or clean_units == "kwhr/cubicmeter"
or clean_units == "kilowatthr/cubicmeter"
or clean_units == "kilowatthour/cubicmeter"
or clean_units == "kilowatt*hour/m**3"
or clean_units == "hour*kilowatt/m**3"
or clean_units == "kwh/m**3"
or clean_units == "kwhr/m**3"
or clean_units == "kilowatthr/m**3"
or clean_units == "kilowatthour/m**3"
or clean_units == "kilowatt*hour/m3"
or clean_units == "hour*kilowatt/m3"
or clean_units == "kwh/m3"
or clean_units == "kwhr/m3"
or clean_units == "kilowatthr/m3"
or clean_units == "kilowatthour/m3"
or clean_units == "kilowatt*hour/meter3"
or clean_units == "hour*kilowatt/meter3"
or clean_units == "kwh/meter3"
or clean_units == "kwhr/meter3"
or clean_units == "kilowatthr/meter3"
or clean_units == "kilowatthour/meter3"
or clean_units == "kilowatt*hour/meter^3"
or clean_units == "hour*kilowatt/meter^3"
or clean_units == "kwh/meter^3"
or clean_units == "kwhr/meter^3"
or clean_units == "kilowatthr/meter^3"
or clean_units == "kilowatthour/meter^3"
or clean_units == "kilowatt*hour/meters3"
or clean_units == "hour*kilowatt/meters3"
or clean_units == "kwh/meters3"
or clean_units == "kwhr/meters3"
or clean_units == "kilowatthr/meters3"
or clean_units == "kilowatthour/meters3"
or clean_units == "kilowatt*hour/meters**3"
or clean_units == "hour*kilowatt/meters**3"
or clean_units == "kwh/meters**3"
or clean_units == "kwhr/meters**3"
or clean_units == "kilowatthr/meters**3"
or clean_units == "kilowatthour/meters**3"
or clean_units == "kilowatt*hour/meters^3"
or clean_units == "hour*kilowatt/meters^3"
or clean_units == "kwh/meters^3"
or clean_units == "kwhr/meters^3"
or clean_units == "kilowatthr/meters^3"
or clean_units == "kilowatthour/meters^3"
):
return u.kW * u.hr / (u.m**3)
elif clean_units == "kw" or clean_units == "kilowatt":
return u.kW
elif (
clean_units == "meters" or clean_units == "m" or clean_units == "meter"
):
return u.m
elif (
clean_units == "inches" or clean_units == "in" or clean_units == "inch"
):
return u.inch
elif (
clean_units == "hz"
or clean_units == "hertz"
or clean_units == "1/s"
or clean_units == "1/second"
or clean_units == "1/sec"
):
return u.Hz
elif (
clean_units == "lmh"
or clean_units == "liter**2/meters**2/hour"
or clean_units == "liter^2/meters^2/hour"
or clean_units == "liter2/meters2/hour"
or clean_units == "liter^2/hour/meters^2"
or clean_units == "liter**2/hour/meters**2"
or clean_units == "liter2/hour/meters2"
or clean_units == "l**2/m**2/h"
or clean_units == "l^2/m^2/h"
or clean_units == "l2/m2/h"
or clean_units == "l^2/h/m^2"
or clean_units == "l**2/h/m**2"
or clean_units == "l2/h/m2"
):
return u.LMH
elif (
clean_units == "permeability"
or clean_units == "lmh/bar"
or clean_units == "flux_lmh/bar"
or clean_units == "liter**2/meters**2/hour/bar"
or clean_units == "liter^2/meters^2/hour/bar"
or clean_units == "liter2/meters2/hour/bar"
or clean_units == "liter^2/hour/meters^2/bar"
or clean_units == "liter**2/hour/meters**2/bar"
or clean_units == "liter2/hour/meters2/bar"
or clean_units == "l**2/m**2/h/bar"
or clean_units == "l^2/m^2/h/bar"
or clean_units == "l2/m2/h/bar"
or clean_units == "l^2/h/m^2/bar"
or clean_units == "l**2/h/m**2/bar"
or clean_units == "l2/h/m2/bar"
):
return u.LMH / u.bar
elif (
clean_units == "intensity"
or clean_units == "w/m**2"
or clean_units == "w/m^2"
or clean_units == "w/m2"
or clean_units == "w/meter**2"
or clean_units == "w/meter^2"
or clean_units == "w/meter2"
or clean_units == "watt/meter**2"
or clean_units == "watt/meter^2"
or clean_units == "watt/meter2"
):
return u.W / (u.m**2)
else:
raise UndefinedUnitError("Unsupported unit: " + units)
[docs]class ContentsType(Enum):
"""Class to represent any possible contents,
whether they are sludge, water, or gas"""
UntreatedSewage = auto()
PrimaryEffluent = auto()
SecondaryEffluent = auto()
TertiaryEffluent = auto()
TreatedSewage = auto()
DrinkingWater = auto()
PotableReuse = auto()
NonpotableReuse = auto()
Biogas = auto()
NaturalGas = auto()
GasBlend = auto()
FatOilGrease = auto()
PrimarySludge = auto()
TPS = auto() # thickened primary sludge
WasteActivatedSludge = auto()
TWAS = auto() # thickened waste activated sludge
Scum = auto()
FoodWaste = auto()
SludgeBlend = auto()
ThickenedSludgeBlend = auto()
Electricity = auto()
Brine = auto()
Seawater = auto()
SurfaceWater = auto()
Groundwater = auto()
Stormwater = auto()
Heat = auto()
Oil = auto()
Grease = auto()
Air = auto()
Chemical = auto()
Coagulant = auto()
Disinfectant = auto()
Deodorant = auto()
IndustrialWastewater = auto()
MunicipalWastewater = auto()
DisinfectedEffluent = auto()
SolidWaste = auto()
PretreatedWater = auto()
ProductWater = auto()
ChlorinatedSeawater = auto()
CoagulatedWater = auto()
FilterBackwash = auto()
Filtrate = auto()
WFBS = auto() # Water Filter Backwash Solids
ControlSignal = auto()
DataTransfer = auto()
[docs]class PumpType(Enum):
"""Enum to represent constant vs. variable drive pumps"""
Constant = auto()
VFD = auto() # variable frequency drive
ERD = auto() # energy recovery device
AirBlower = auto()
[docs]class DigesterType(Enum):
"""Enum to represent types of digesters"""
Aerobic = auto()
Anaerobic = auto()
[docs]class DosingType(Enum):
"""Enum to represent types of dosing"""
NaOCl = auto() # sodium hypochlorite
FeCl3 = auto() # ferric chloride
Antiscalant = auto()
CO2 = auto() # carbon dioxide
CaOH2 = auto() # calcium hydroxide
OrthoPolyphosphate = auto()
SBS = auto() # sodium bisulphite
Polymer = auto()
UVLight = auto()
[docs]def select_objs_helper(
obj,
obj_source_node=None,
obj_dest_node=None,
obj_source_unit_id=None,
obj_dest_unit_id=None,
obj_exit_point=None,
obj_entry_point=None,
source_id=None,
dest_id=None,
source_unit_id=None,
dest_unit_id=None,
exit_point_id=None,
entry_point_id=None,
source_node_type=None,
dest_node_type=None,
exit_point_type=None,
entry_point_type=None,
tag_type=None,
recurse=False,
):
"""Helper to select from objects which match source/destination node class,
unit ID, and contents
Parameters
----------
obj : [Node, Connection, Tag]
Object to check if it meets the specified filtering criteria
obj_source_node : Node
Optional source `Node` to check the type and id of. None by default
obj_dest_node : Node
Optional destination `Node` to check the type and id of. None by default
obj_source_unit_id : int, str
Object's source unit ID to match against. None by default
obj_dest_unit_id : int, str
Object's destination unit ID to match against. None by default
obj_exit_point : Node
Optional `exit_point` `Node` to check the type and id of. None by default
obj_entry_point : Node
Optional `entry_point` `Node` to check the type and id of. None by default
source_id : str
Optional id of the source node to filter by. None by default
dest_id : str
Optional id of the destination node to filter by. None by default
source_unit_id : int, str
Optional unit id of the source to filter by. None by default
dest_unit_id : int, str
Optional unit id of the destination to filter by. None by default
source_node_type : class
Optional source `Node` subclass to filter by. None by default
dest_node_type : class
Optional destination `Node` subclass to filter by. None by default
tag_type : TagType
Optional tag type to filter by. None by default
recurse : bool
Whether to search for objects within nodes. False by default
Raises
------
ValueError
When a source/destination node type is provided to subset tags
TypeError
When the objects to select among are not of type
{'pype_schema.Tag' or `pype_schema.Connection`}
Returns
-------
bool
True if `obj` fits the filter criteria; False otherwise.
"""
# convert string source and destination unit IDs to integers
if isinstance(source_unit_id, str) and source_unit_id != "total":
source_unit_id = int(source_unit_id)
if isinstance(dest_unit_id, str) and dest_unit_id != "total":
dest_unit_id = int(dest_unit_id)
if source_id is not None and (
not hasattr(obj_source_node, "id") or obj_source_node.id != source_id
):
if (
not recurse
or not hasattr(obj_exit_point, "id")
or obj_exit_point.id != source_id
):
return False
if dest_id is not None and (
not hasattr(obj_dest_node, "id") or obj_dest_node.id != dest_id
):
if (
not recurse
or not hasattr(obj_entry_point, "id")
or obj_entry_point.id != dest_id
):
return False
if source_unit_id is not None and source_unit_id != obj_source_unit_id:
return False
if dest_unit_id is not None and dest_unit_id != obj_dest_unit_id:
return False
if exit_point_id is not None and (
not hasattr(obj_exit_point, "id") or obj_exit_point.id != exit_point_id
):
return False
if entry_point_id is not None and (
not hasattr(obj_entry_point, "id") or obj_entry_point.id != entry_point_id
):
return False
if source_node_type is not None and not isinstance(
obj_source_node, source_node_type
):
if not recurse or not isinstance(obj_exit_point, source_node_type):
return False
if dest_node_type is not None and not isinstance(obj_dest_node, dest_node_type):
if not recurse or not isinstance(obj_entry_point, dest_node_type):
return False
if exit_point_type is not None and not isinstance(obj_exit_point, exit_point_type):
return False
if entry_point_type is not None and isinstance(obj_entry_point, entry_point_type):
return False
if tag_type is not None and (
not hasattr(obj, "tag_type") or obj.tag_type != tag_type
):
return False
return True
[docs]def recursive_get(key, dict_):
"""Recursively search a nested dictionary for a key
Parameters
----------
key : str
Key to search
dict_ : dict
(nested) dictionary to search
Returns : any, None
Value of the key if found, None otherwise
"""
if key in dict_:
return dict_[key]
elif isinstance(dict_, dict):
for k, v in dict_.items():
if isinstance(v, dict):
item = recursive_get(key, v)
if item is not None:
return item