Source code for pype_schema.epyt_utils

import sys
import json
import numpy as np
from epyt import epanet

content_placeholder = "DrinkingWater"


[docs]class NpEncoder(json.JSONEncoder): """Custom JSON encoder for numpy types"""
[docs] def default(self, obj): if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return super(NpEncoder, self).default(obj)
[docs]def epyt2pypes(inp_file, out_file, add_nodes=False): """Convert an EPANET input file to a PYPES JSON file Parameters ---------- inp_file : str Path to the EPANET input file out_file : str Path to the PYPES JSON file add_nodes : bool Whether to add additional nodes of Pumps """ G = epanet(inp_file) node_ids = {} # EPANET node index to PYPES node id nodes = {} connections = {} obj_counts = { "Joint": 0, "Tank": 0, "Reservoir": 0, "Pipe": 0, "Pump": 0, } for n in G.getNodeIndex(): # Node type is one of: Junction, Reservoir, Tank if G.getNodeType(n).upper() == "JUNCTION": id_str = "Joint" + str(obj_counts["Joint"] + 1) node_obj = { "id": id_str, "type": "Joint", "contents": content_placeholder, "tags": {}, } node_ids[n] = id_str nodes[id_str] = node_obj obj_counts["Joint"] += 1 elif G.getNodeType(n).upper() == "RESERVOIR": id_str = "Reservoir" + str(obj_counts["Reservoir"] + 1) node_obj = { "id": id_str, "type": "Reservoir", "contents": content_placeholder, "levation (meters)": G.getNodeElevations(n), "tags": {}, } node_ids[n] = id_str nodes[id_str] = node_obj obj_counts["Reservoir"] += 1 elif G.getNodeType(n).upper() == "TANK": id_str = "Tank" + str(obj_counts["Tank"] + 1) node_obj = { "id": id_str, "type": "Tank", "contents": content_placeholder, "levation (meters)": G.getNodeElevations(n), "volume (cubic meters)": G.getNodeTankVolume(n), "tags": {}, } node_ids[n] = id_str nodes[id_str] = node_obj obj_counts["Tank"] += 1 else: raise ValueError(f"Node type {G.getNodeType(n)} not recognized") for connection in G.getLinkIndex(): if add_nodes: # Link type is one of: Pipe, Pump, Valve if G.getLinkType(connection).upper() == "PIPE": connection_obj = { "id": "Pipe" + str(obj_counts["Pipe"]), "type": "Pipe", "contents": content_placeholder, "source": node_ids[G.getLinkNodesIndex(connection)[0]], "destination": node_ids[G.getLinkNodesIndex(connection)[1]], "tags": {}, } connections["Pipe" + str(obj_counts["Pipe"])] = connection_obj obj_counts["Pipe"] += 1 elif G.getLinkType(connection).upper() == "PUMP": pump_obj1 = { "id": "Pump" + str(obj_counts["Pump"]), "type": "Pump", "contents": content_placeholder, "tags": {}, } nodes["Pump" + str(obj_counts["Pump"])] = pump_obj1 obj_counts["Pump"] += 1 connection_obj = { "id": "Pipe" + str(obj_counts["Pipe"]), "type": "Pipe", "contents": content_placeholder, "source": node_ids[G.getLinkNodesIndex(connection)[0]], "destination": "Pump" + str(obj_counts["Pump"]), "tags": {}, } connections["Pipe" + str(obj_counts["Pipe"])] = connection_obj obj_counts["Pipe"] += 1 connection_obj = { "id": "Pipe" + str(obj_counts["Pipe"]), "type": "Pipe", "contents": content_placeholder, "source": "Pump" + str(obj_counts["Pump"]), "destination": node_ids[G.getLinkNodesIndex(connection)[1]], "tags": {}, } connections["Pipe" + str(obj_counts["Pipe"])] = connection_obj obj_counts["Pipe"] += 1 elif G.getLinkType(connection).upper() == "VALVE": raise NotImplementedError("Valves not yet supported in PyPES") # TODO: create Valve object # then seperate valve into multiple pipes and a valve # Assign the first node to source, and the other nodes to destination # sources = [] # destinations = [] # for i, linknode in enumerate(G.getLinkNodesIndex(connection)): # if i == 0: # sources.append(node_ids[linknode]) # else: # destinations.append(node_ids[linknode]) # joint_obj = { # "id": "Joint" + str(obj_counts["Joint"]), # "type": "Joint", # "contents": content_placeholder, # "tags": {}, # } # nodes["Joint" + str(obj_counts["Joint"])] = joint_obj # obj_counts["Joint"] += 1 # for source in sources: # connection_obj = { # "id": "Pipe" + str(obj_counts["Pipe"]), # "type": "Pipe", # "contents": content_placeholder, # "source": source, # "destination": "Joint" + str(obj_counts["Joint"] - 1), # "tags": {}, # } # connections["Pipe" + str(obj_counts["Pipe"])] = connection_obj # obj_counts["Pipe"] += 1 # for destination in destinations: # connection_obj = { # "id": "Pipe" + str(obj_counts["Pipe"]), # "type": "Pipe", # "contents": content_placeholder, # "source": "Joint" + str(obj_counts["Joint"] - 1), # "destination": destination, # "tags": {}, # } # connections["Pipe" + str(obj_counts["Pipe"])] = connection_obj # obj_counts["Pipe"] += 1 else: raise ValueError( f"Connection type {G.getLinkType(connection)} not recognized" ) else: type_str = G.getLinkType(connection).upper() type_str = type_str[0].upper() + type_str[1:].lower() id_str = type_str + str(connection) connection_obj = { "id": id_str, "type": "Pipe", "contents": content_placeholder, "source": node_ids[G.getLinkNodesIndex(connection)[0]], "destination": node_ids[G.getLinkNodesIndex(connection)[1]], "tags": {}, } connections[id_str] = connection_obj obj_counts[type_str] += 1 data = { "nodes": list(nodes.keys()), "connections": list(connections.keys()), "virtual_tags": {}, } for node_name, node_obj in nodes.items(): temp_node = node_obj.copy() temp_node.pop("id") data[node_name] = temp_node for connection_name, connection_obj in connections.items(): temp_connection = connection_obj.copy() temp_connection.pop("id") data[connection_name] = temp_connection with open(out_file, "w") as f: json.dump(data, f, indent=2, cls=NpEncoder) return data
if __name__ == "__main__": args = sys.argv[1:] epyt2pypes(args[0], args[1]) print("EPANET to PYPES conversion from {} to {} complete".format(args[0], args[1]))