Source code for pype_schema.epyt_utils

import sys
import json
import warnings
import numpy as np
from epyt import epanet


[docs]class NpEncoder(json.JSONEncoder): """Custom JSON encoder for numpy types"""
[docs] def default(self, obj): if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return super(NpEncoder, self).default(obj)
[docs]def epyt2pypes( inp_file, out_file, add_nodes=False, use_name_as_id=False, content_placeholder="DrinkingWater", ): """Convert an EPANET input file to a PYPES JSON file Parameters ---------- inp_file : str Path to the EPANET input file out_file : str Path to the PYPES JSON file add_nodes : bool Whether to add additional nodes of Pumps use_name_as_id : bool Whether to use the node name as the node id. False by default, and ignored if `add_nodes` is True Returns ------- dict dictionary of `Node`, `Connection`, and `VirtualTag` objects with keys "nodes", "connections", and "virtual_tags" """ if add_nodes and use_name_as_id: warnings.warn("use_name_as_id is ignored if add_nodes is True") G = epanet(inp_file) node_ids = {} # EPANET node index to PYPES node id nodes = {} connections = {} obj_counts = { "Junction": 0, "Tank": 0, "Reservoir": 0, "Pipe": 0, "Pump": 0, "Valve": 0, } for n in G.getNodeIndex(): node_type = G.getNodeType(n).upper() elevation = G.getNodeElevations(n) if isinstance(elevation, np.ndarray): elevation = float(elevation.flat[0]) # Node type is one of: Junction, Reservoir, Tank if node_type == "JUNCTION": if use_name_as_id and not add_nodes: id_str = G.getNodeNameID(n) else: id_str = "Junction" + str(obj_counts["Junction"] + 1) node_obj = { "id": id_str, "type": "Junction", "contents": content_placeholder, "tags": {}, } node_ids[n] = id_str nodes[id_str] = node_obj obj_counts["Junction"] += 1 elif node_type == "RESERVOIR": if use_name_as_id and not add_nodes: id_str = G.getNodeNameID(n) else: id_str = "Reservoir" + str(obj_counts["Reservoir"] + 1) node_obj = { "id": id_str, "type": "Reservoir", "contents": content_placeholder, "elevation (meters)": elevation, "tags": {}, } node_ids[n] = id_str nodes[id_str] = node_obj obj_counts["Reservoir"] += 1 elif node_type == "TANK": volume = G.getNodeTankMaximumWaterVolume(n) if isinstance(volume, np.ndarray): volume = float(volume.flat[0]) if use_name_as_id and not add_nodes: id_str = G.getNodeNameID(n) else: id_str = "Tank" + str(obj_counts["Tank"] + 1) node_obj = { "id": id_str, "type": "Tank", "contents": content_placeholder, "elevation (meters)": elevation, "volume (cubic meters)": volume, "tags": {}, } node_ids[n] = id_str nodes[id_str] = node_obj obj_counts["Tank"] += 1 else: raise ValueError(f"Node type {node_type} not recognized") for connection in G.getLinkIndex(): if add_nodes: conn_type = G.getLinkType(connection).upper() # Link type is one of: Pipe, Pump, Valve if conn_type == "PIPE": connection_obj = { "id": "Pipe" + str(obj_counts["Pipe"] + 1), "type": "Pipe", "contents": content_placeholder, "source": node_ids[G.getLinkNodesIndex(connection)[0]], "destination": node_ids[G.getLinkNodesIndex(connection)[1]], "tags": {}, } connections["Pipe" + str(obj_counts["Pipe"] + 1)] = connection_obj obj_counts["Pipe"] += 1 elif conn_type == "PUMP": pump_obj1 = { "id": "Pump" + str(obj_counts["Pump"] + 1), "type": "Pump", "contents": content_placeholder, "tags": {}, } nodes["Pump" + str(obj_counts["Pump"] + 1)] = pump_obj1 connection_obj = { "id": "Pipe" + str(obj_counts["Pipe"] + 1), "type": "Pipe", "contents": content_placeholder, "source": node_ids[G.getLinkNodesIndex(connection)[0]], "destination": "Pump" + str(obj_counts["Pump"] + 1), "tags": {}, } connections["Pipe" + str(obj_counts["Pipe"] + 1)] = connection_obj obj_counts["Pipe"] += 1 connection_obj = { "id": "Pipe" + str(obj_counts["Pipe"] + 1), "type": "Pipe", "contents": content_placeholder, "source": "Pump" + str(obj_counts["Pump"] + 1), "destination": node_ids[G.getLinkNodesIndex(connection)[1]], "tags": {}, } connections["Pipe" + str(obj_counts["Pipe"] + 1)] = connection_obj obj_counts["Pipe"] += 1 # wait to increment pump count until all pipes are set up correctly obj_counts["Pump"] += 1 # TODO: change PRV to VALVE_TYPE_LIST to support multiple valve types elif conn_type in ["PRV"]: # Separate valve into multiple pipes and a valve # Assign the first node to source, and the other nodes to destination sources = [] destinations = [] for i, linknode in enumerate(G.getLinkNodesIndex(connection)): if i == 0: sources.append(node_ids[linknode]) else: destinations.append(node_ids[linknode]) valve_obj = { "id": "Valve" + str(obj_counts["Valve"] + 1), "type": "PRV", "contents": content_placeholder, "tags": {}, } nodes["Valve" + str(obj_counts["Valve"] + 1)] = valve_obj for source in sources: connection_obj = { "id": "Pipe" + str(obj_counts["Pipe"] + 1), "type": "Pipe", "contents": content_placeholder, "source": source, "destination": "Valve" + str(obj_counts["Valve"] + 1), "tags": {}, } connections["Pipe" + str(obj_counts["Pipe"] + 1)] = connection_obj obj_counts["Pipe"] += 1 for destination in destinations: connection_obj = { "id": "Pipe" + str(obj_counts["Pipe"] + 1), "type": "Pipe", "contents": content_placeholder, "source": "Valve" + str(obj_counts["Valve"] + 1), "destination": destination, "tags": {}, } connections["Pipe" + str(obj_counts["Pipe"] + 1)] = connection_obj obj_counts["Pipe"] += 1 # wait to increment valve count until all pipes are set up correctly obj_counts["Valve"] += 1 else: raise ValueError(f"Connection type {conn_type} not recognized") else: type_str = G.getLinkType(connection).upper() # TODO: change this to VALVE_TYPE_LIST to support multiple valve types if type_str in ["PRV"]: type_str = "Valve" else: type_str = type_str[0].upper() + type_str[1:].lower() if use_name_as_id: id_str = G.getLinkNameID(connection) else: id_str = type_str + str(connection) connection_obj = { "id": id_str, "type": "Pipe", "contents": content_placeholder, "source": node_ids[G.getLinkNodesIndex(connection)[0]], "destination": node_ids[G.getLinkNodesIndex(connection)[1]], "tags": {}, } connections[id_str] = connection_obj obj_counts[type_str] += 1 data = { "nodes": list(nodes.keys()), "connections": list(connections.keys()), "virtual_tags": {}, } for node_name, node_obj in nodes.items(): temp_node = node_obj.copy() temp_node.pop("id") data[node_name] = temp_node for connection_name, connection_obj in connections.items(): temp_connection = connection_obj.copy() temp_connection.pop("id") data[connection_name] = temp_connection with open(out_file, "w") as f: json.dump(data, f, indent=2, cls=NpEncoder) return data
if __name__ == "__main__": # TODO: add flags to command line argument args = sys.argv[1:] epyt2pypes(args[0], args[1]) print("EPANET to PYPES conversion from {} to {} complete".format(args[0], args[1]))