Source code for dr_simulator.utils

"""This module contains utility functions for DR Simulator"""

# pylint: disable=line-too-long, too-many-arguments, too-many-locals
import re
import json
import pickle
import datetime as dt
from enum import Enum
from warnings import warn
import numpy as np
import pandas as pd


class DistributionTypes(Enum):
    """Enumeration of the probability distributions supported by the simulator."""

    NORMAL = "normal"
    UNIFORM = "uniform"
    POISSON = "poisson"


# Required keyword parameters for each supported distribution type.
distr_param_mapping = {
    DistributionTypes.NORMAL: ["loc", "scale"],
    DistributionTypes.UNIFORM: ["low", "high"],
    DistributionTypes.POISSON: ["lam"],
}
def validate_distribution_params(distr_type, distr_params):
    """Checks that ``distr_params`` supplies every parameter required by ``distr_type``

    Parameters
    ----------
    distr_type : DistributionTypes
        Distribution type
    distr_params : dict
        Distribution parameters

    Returns
    -------
    bool
        True if the parameters are valid, False otherwise
    """
    if distr_type not in DistributionTypes:
        return False
    if not isinstance(distr_params, dict):
        return False
    required = distr_param_mapping[distr_type]
    return all(name in distr_params for name in required)
def parse_freq(freq):
    """Parses a time frequency code string, returning its type and its freq_binsize

    Parameters
    ----------
    freq: str
        string of the form [type][freq_binsize], where type corresponds to a
        numpy.timedelta64 encoding and freq_binsize is an integer giving the number
        of increments of `type` in one binned increment of the time variable
        (for example '6h' means the data are grouped into increments of 6 hours)

    Returns
    -------
    tuple
        tuple of the form (`int`, `str`) giving the binsize and type of the
        time frequency given
    """
    # Separate digit characters (the binsize) from non-digit characters (the unit).
    unit = "".join(ch for ch in freq if not ch.isdigit())
    binsize = int("".join(ch for ch in freq if ch.isdigit()))
    return binsize, unit
def get_freq_binsize_minutes(freq):
    """Gets size of a given time frequency expressed in units of minutes

    Parameters
    ----------
    freq: str
        a string of the form [type][freq_binsize], where type corresponds to a
        numpy.timedelta64 encoding and freq_binsize is an integer giving the number
        of increments of `type` in one binned increment of the time variable
        (for example '6h' means the data are grouped into increments of 6 hours)

    Raises
    ------
    ValueError
        when resolution is not minute, hourly, or daily

    Returns
    -------
    int
        integer giving the number of minutes in the given time frequency unit
    """
    binsize = int(re.sub("[^0-9]", "", freq))
    unit = re.sub("[0-9]", "", freq)
    # Minutes per unit for the supported resolutions.
    minutes_per_unit = {"m": 1, "h": 60, "D": 60 * 24, "d": 60 * 24}
    if unit not in minutes_per_unit:
        raise ValueError(
            "Cannot deal with data that are not in minute, hourly, or daily resolution"
        )
    return minutes_per_unit[unit] * binsize
def text_to_param_dict(distr_type, distr_params_text):
    """Converts distribution parameters entered as text to param_dict

    Parameters
    ----------
    distr_type : DistributionTypes
        Distribution type
    distr_params_text : str
        Comma-separated distribution parameters entered as text

    Raises
    ------
    ValueError
        when the number of values does not match the distribution's parameters

    Returns
    -------
    dict
        Distribution parameters as dictionary
    """
    expected_names = distr_param_mapping[distr_type]
    values = distr_params_text.split(",")
    if len(values) != len(expected_names):
        raise ValueError(f"Invalid number of parameters for {distr_type} distribution")
    # Pair each expected parameter name with its integer value, in order.
    return dict(zip(expected_names, (int(value) for value in values)))
def days_in_year_month(year, month):
    """
    Parameters
    ----------
    year : int
    month : int

    Returns
    -------
    int
        number of days in a `month` of `year`
    """
    if month == 2:
        # Full Gregorian leap-year rule: divisible by 4, except century years
        # that are not divisible by 400. (The previous check used only
        # `year % 4 == 0`, wrongly reporting e.g. 1900 as a leap year.)
        is_leap = year % 4 == 0 and (year % 100 != 0 or year % 400 == 0)
        return 29 if is_leap else 28
    if month in [4, 6, 9, 11]:
        return 30
    return 31
def pickle_load(path):
    """Loads a pickled object (fitted model, dictionary with data, etc)

    Parameters
    ----------
    path : str
        path to object to load

    Returns
    -------
    object
        unpickled object found at the `path`
    """
    # NOTE: pickle deserialization executes arbitrary code; only load trusted files.
    with open(path, "rb") as file_handle:
        return pickle.load(file_handle)
def pickle_dump(object_, path):
    """Pickles an object (fitted model, dictionary with data, etc)

    Parameters
    ----------
    object_
        object to serialize
    path : str
        path where the pickled object is saved
    """
    with open(path, "wb") as file_handle:
        pickle.dump(object_, file_handle)
def json_load(path):
    """Loads a json file into a python object

    Parameters
    ----------
    path : str
        path to object to load

    Returns
    -------
    object_ : object
        python object parsed from the json file
    """
    with open(path, "r", encoding="utf-8") as file_handle:
        return json.load(file_handle)
def json_dump(object_, path):
    """Dumps a python object to a json file

    Parameters
    ----------
    object_ : object
        python object to serialize
    path : str
        path where the json string is saved
    """
    # Write human-readable, UTF-8 json (indent=4, no ASCII escaping).
    with open(path, "w", encoding="utf-8") as file_handle:
        json.dump(object_, file_handle, ensure_ascii=False, indent=4)
def create_calender(dates):
    """
    This function creates a calendar for the given dates

    Parameters
    ----------
    dates : list
        List of dates

    Returns
    -------
    woy : numpy.ndarray
        Week of the year of the dates, shifted so the earliest week is 0
    dow : numpy.ndarray
        Day of the week of the dates (Monday is 0)
    calendar : numpy.ndarray
        Zero-filled array with one row per week and one column per weekday
    """
    week_numbers = []
    weekday_numbers = []
    for date in dates:
        # isocalendar() yields (year, week, weekday); keep week and weekday.
        _, week, weekday = date.isocalendar()
        week_numbers.append(week)
        weekday_numbers.append(weekday)
    woy = np.array(week_numbers) - min(week_numbers)  # make lowest week 0
    dow = np.array(weekday_numbers) - 1  # make Monday 0
    n_weeks = max(woy) + 1  # number of weeks spanned by dates
    calendar = np.zeros((n_weeks, 7))
    return woy, dow, calendar
def get_n_similar_weekdays(date, prev_event_days, n_weekdays=10):
    """
    Collects the most recent `n_weekdays` weekdays before `date`,
    skipping any dates listed in `prev_event_days`

    Parameters
    ----------
    date : datetime.datetime
        Date of the event
    prev_event_days : list
        List of previous event days
    n_weekdays : int (default=10)
        Number of similar weekdays to return

    Returns
    -------
    similar_weekdays : list
        List of length n_weekdays, most recent first, excluding the event days
    """
    one_day = dt.timedelta(days=1)
    collected = []
    current = date
    while len(collected) < n_weekdays:
        current = current - one_day
        # weekday() < 5 means Monday through Friday.
        if current.weekday() < 5 and current not in prev_event_days:
            collected.append(current)
    return collected
def get_date_range_prev_month(date, n_days):
    """
    This function gets the date range for the previous month

    Parameters
    ----------
    date : datetime.datetime
        Date of the event
    n_days : int
        Number of days in the previous month

    Returns
    -------
    date_range : list
        List of length n_days with the dates preceding `date`, most recent first
    """
    return [date - dt.timedelta(days=offset) for offset in range(1, n_days + 1)]
def get_hourly_average_consumption(
    similar_days,
    event_hour,
    output_data,
    electricity_purchase_varnames,
    datetime_varname,
):
    """Get the baseline consumption for a given demand response event

    Parameters
    ----------
    similar_days : np.array of np.datetime64
        List of similar weekdays
    event_hour : int
        Hour of the day of the demand response event
    output_data : pandas.DataFrame
        the output data for baseline calculation
    electricity_purchase_varnames : list
        List of electricity purchase variable names
    datetime_varname : str
        Name of the datetime variable in the output data

    Raises
    ------
    ValueError
        when none of the similar days appear in the output data

    Returns
    -------
    baseline_consumption : float
        Baseline consumption in kWh
    """
    # Match on whole calendar dates. The previous implementation filtered on the
    # independent sets of years, months, and days, which also matched unintended
    # cross-product dates (e.g. similar days Jan 31 and Feb 1 would also match Jan 1).
    similar_dates = pd.to_datetime(pd.Series(similar_days)).dt.normalize()
    timestamps = output_data[datetime_varname]
    filter_mask = timestamps.dt.normalize().isin(set(similar_dates)) & (
        timestamps.dt.hour == event_hour
    )
    if filter_mask.sum() == 0:
        raise ValueError("No similar days found in the output data")
    baseline_consumption = (
        output_data.loc[filter_mask, electricity_purchase_varnames].sum(axis=1).mean()
    )
    return baseline_consumption
def get_day_of_adj_ratio(
    dr_period_details,
    output_data,
    electricity_purchase_varnames,
    datetime_varname,
    day_of_adj_details=None,
):
    """
    Get the day-of-adjustment ratio for a given demand response event.
    The day-of-adjustment ratio is calculated as the average consumption of the
    window hours before the event divided by the average consumption of the same
    hours on similar weekdays. The day-of-adjustment ratio is capped at the
    day_of_adj_max value.

    Parameters
    ----------
    dr_period_details : dict
        Dictionary with the details of the demand response event period and
        baseline_days
    output_data : pandas.DataFrame
        the output data for baseline calculation
    electricity_purchase_varnames : list
        List of electricity purchase variable names
    datetime_varname : str
        Name of the datetime variable in the output data
    day_of_adj_details : dict
        Dictionary with the details of the day-of-adjustment calculation
        (default is { "maximum": 0.4, "hours before": 4, "duration": 3 })
        for PG&E's CBP DR

    Returns
    -------
    day_of_adj_ratio : float
        Day-of-adjustment ratio
    """
    if day_of_adj_details is None:
        # Default values for PG&E's CBP DR
        day_of_adj_details = {"maximum": 0.4, "hours before": 4, "duration": 3}
    doa_max = day_of_adj_details["maximum"]
    doa_hours_before = day_of_adj_details["hours before"]
    doa_duration = day_of_adj_details["duration"]
    baseline_days = dr_period_details["baseline_days"]
    event_start_dt, _ = dr_period_details["event_dts"]
    # The adjustment window covers `doa_duration` whole hours ending
    # `doa_hours_before - doa_duration` hours ahead of the event start.
    window_start = event_start_dt - np.timedelta64(doa_hours_before - 1, "h")
    window_end = window_start + np.timedelta64(doa_duration - 1, "h")
    total_baseline = 0.0
    total_actual = 0.0
    n_hours = 0
    for hour_dt in pd.date_range(window_start, window_end, freq="h"):
        total_baseline += get_hourly_average_consumption(
            baseline_days,
            hour_dt.hour,
            output_data,
            electricity_purchase_varnames,
            datetime_varname,
        )
        # BUG FIX: the original reassigned its accumulator to the hourly value
        # and then added the value to itself, doubling the last hour and
        # discarding all earlier hours of the window.
        total_actual += get_hourly_average_consumption(
            np.array([np.datetime64(hour_dt)]),
            hour_dt.hour,
            output_data,
            electricity_purchase_varnames,
            datetime_varname,
        )
        n_hours += 1
    # Average over the actual window length (the original hard-coded 3 hours).
    total_actual /= n_hours
    total_baseline /= n_hours
    day_of_adj_ratio = total_actual / total_baseline if total_baseline != 0 else 0
    return min(day_of_adj_ratio, doa_max)
def get_hourly_dr_event_arrays(
    event_start_dt, horizon_start_dt, horizon_end_dt, resolution="15m"
):
    """
    Build a step-level indicator array over the optimization horizon with 1 for
    the timesteps of the hour starting at the demand response event and 0 otherwise

    Parameters
    ----------
    event_start_dt : np.datetime64
        Start datetime of the demand response event
    horizon_start_dt : np.datetime64
        Start datetime of the optimization horizon
    horizon_end_dt : np.datetime64
        End datetime of the optimization horizon
    resolution : str
        Resolution of the optimization data

    Returns
    -------
    np.array
        Array with 1 for the timesteps of the event hour and 0 otherwise;
        all zeros when the event start is not found in the horizon
    """
    step_minutes = get_freq_binsize_minutes(resolution)
    steps_per_hour = int(60 / step_minutes)
    step = np.timedelta64(step_minutes, "m")
    n_steps = int((horizon_end_dt - horizon_start_dt) / step)
    timestamps = pd.DataFrame(
        np.array([horizon_start_dt + i * step for i in range(n_steps)]),
        columns=["DateTime"],
    )
    indicator = np.zeros(n_steps)
    matches = np.where(timestamps["DateTime"] == event_start_dt)
    if len(matches[0]) == 0:
        # Event start not in the horizon: leave the indicator all zeros.
        return indicator
    first_step = matches[0][0]
    indicator[first_step : first_step + steps_per_hour] = 1
    return indicator
def get_dr_dates(event_details, horizon_start_dt, horizon_end_dt):
    """
    Get the demand response event dates for the optimization horizon

    Parameters
    ----------
    event_details : list of dict
        Dictionary with the details of the demand response event
    horizon_start_dt : np.datetime64
        Start datetime of the optimization horizon
    horizon_end_dt : np.datetime64
        End datetime of the optimization horizon

    Returns
    -------
    dr_events_dts : dict
        Dictionary keyed ``event_{i}`` with the clipped event datetimes and
        baseline days for every event that falls inside the horizon
    """
    if event_details[0]["day"] is None:
        # No demand response events at all.
        return {}
    dr_events_dts = {}
    for i, event in enumerate(event_details):
        start = np.datetime64(
            dt.datetime(
                event["year"], event["month"], event["day"], event["start_time"], 0, 0
            ),
            "s",
        )
        end = start + np.timedelta64(event["duration"], "h")
        clipped_start, clipped_end = get_start_end_dt(
            start, end, horizon_start_dt, horizon_end_dt
        )
        if clipped_start is None:
            # Event lies entirely outside the horizon.
            continue
        dr_events_dts[f"event_{i}"] = {
            "event_dts": np.array([clipped_start, clipped_end]),
            "baseline_days": event["baseline_days"],
        }
    if len(dr_events_dts) > 1:
        dr_events_dts = combine_overlapping_dr_events(dr_events_dts)
    return dr_events_dts
def combine_overlapping_dr_events(dr_events_dts):
    """
    Sorts the events in chronological order and validates that no two demand
    response (DR) events overlap

    Parameters
    ----------
    dr_events_dts : dict
        A dictionary where values are dicts holding "event_dts" arrays of start
        and end times of DR events.

    Raises
    ------
    ValueError
        when any two DR events overlap in time

    Returns
    -------
    dr_events_dts: dict
        The input dictionary, unchanged, when no events overlap
    """
    event_windows = np.array(
        [val["event_dts"] for val in dr_events_dts.values()], dtype="datetime64"
    )
    # Sort windows chronologically by start time before checking for overlap.
    event_windows = event_windows[np.argsort(event_windows[:, 0])]
    # BUG FIX: the original compared every event only against the FIRST event's
    # end time (its "combined" list was never extended), so overlaps between
    # later events went undetected; it also reordered baseline days with an
    # argsort of the already-sorted array (a no-op) and never used the result.
    for prev_window, next_window in zip(event_windows[:-1], event_windows[1:]):
        if next_window[0] < prev_window[1]:
            raise ValueError("Overlapping DR events are not allowed")
    return dr_events_dts
def get_start_end_dt(dr_start_dt, dr_end_dt, horizon_start_dt, horizon_end_dt):
    """
    Get the start and end datetime of the demand response event within the
    optimization horizon, clipping to the horizon and warning when the event
    is partially or entirely outside it

    Parameters
    ----------
    dr_start_dt : np.datetime64
        Start datetime of the demand response event
    dr_end_dt : np.datetime64
        End datetime of the demand response event
    horizon_start_dt : np.datetime64
        Start datetime of the optimization horizon
    horizon_end_dt : np.datetime64
        End datetime of the optimization horizon

    Returns
    -------
    start_dt : np.datetime64 or None
        Start datetime of the event within the horizon (None if outside)
    end_dt : np.datetime64 or None
        End datetime of the event within the horizon (None if outside)
    """
    if dr_start_dt < horizon_start_dt:
        if dr_end_dt < horizon_start_dt:
            # Event finished before the horizon even starts.
            warn("DR event is before the control horizon")
            return None, None
        if dr_end_dt >= horizon_end_dt:
            warn(
                "DR event engulfs the control horizon. Setting start time "
                "and end time to start and end of horizon respectively"
            )
            return horizon_start_dt, horizon_end_dt
        warn(
            "DR event starts before the control horizon. Setting start time "
            "to the start of the control horizon"
        )
        return horizon_start_dt, dr_end_dt
    if horizon_start_dt <= dr_start_dt < horizon_end_dt:
        if dr_end_dt >= horizon_end_dt:
            warn(
                "DR event ends past the control horizon. Setting end time "
                "to the end of the control horizon"
            )
            return dr_start_dt, horizon_end_dt
        # Event lies fully inside the horizon: no clipping needed.
        return dr_start_dt, dr_end_dt
    warn("DR event is after the control horizon")
    return None, None
def convert_dr_event_details(event_details):
    """
    Converts the demand response event details to the format required by
    the optimizer

    Parameters
    ----------
    event_details : dict
        Dictionary with the details of the demand response event

    Returns
    -------
    event_details : dict
        Dictionary with the details of the demand response event in the format
        required by the optimizer (dates as np.datetime64, numerics as int)
    """
    event_details["baseline_days"] = np.array(
        [np.datetime64(day) for day in event_details["baseline_days"]],
        dtype="datetime64",
    )
    # Coerce all scalar fields to plain ints.
    for field in ("day", "month", "year", "start_time", "duration"):
        event_details[field] = int(event_details[field])
    return event_details
def sanitize_dr_data(dr_data):
    """
    Sanitize the demand response data

    Parameters
    ----------
    dr_data : dict
        Dictionary with the demand response data

    Returns
    -------
    dr_data : dict
        Dictionary with the sanitized demand response data
    """
    if dr_data["name"] is None:
        # No DR program configured: nothing to sanitize.
        return dr_data
    events = dr_data["events detail"]
    for idx, details in enumerate(events):
        events[idx] = convert_dr_event_details(details)
    return dr_data
def get_dr_baseline_dates(dr_data):
    """
    Get the demand response baseline dates

    Parameters
    ----------
    dr_data : dict
        Dictionary with the demand response data

    Returns
    -------
    dr_baseline_dates : array of np.datetime64
        np.array of np.datetime64 with the demand response baseline dates
    """
    all_baseline_days = [
        day
        for event_details in dr_data["events detail"]
        for day in event_details["baseline_days"]
    ]
    return np.array(all_baseline_days, dtype="datetime64")