Page MenuHomec4science

bms_util.py
No OneTemporary

File Metadata

Created
Wed, Jun 19, 00:50

bms_util.py

__author__ = 'Olivier Van Cutsem'
from math import sqrt
import numpy
from ve_util import statDistribution
# Keys of the parameter dictionary (`l_param`) that describes a load profile.
VE_PARAM_LP_MODEL = 'LP_model'
# User activities: the entry under VE_PARAM_LP_ACTIVITY is itself a dictionary
# holding the list of activities and the period passed to activitySequence.
VE_PARAM_LP_ACTIVITY = 'LP_activity'
VE_PARAM_LP_ACTIVITY_LIST = 'LP_activity_list'
VE_PARAM_LP_ACTIVITY_PERIOD = 'LP_activity_period'
# States of the load: the entry under VE_PARAM_LP_STATES is a dictionary;
# its 'state_list' entry is iterated by loadProfile to enumerate the states.
VE_PARAM_LP_STATES = 'LP_states'
VE_PARAM_LP_STATES_DEFAULT = 'state_default'
VE_PARAM_LP_STATES_LIST = 'state_list'
# Modes of the load: a list of mode descriptions (one entityMode is built per item).
VE_PARAM_LP_MODES = 'LP_modes'
# Mode sequences, indexed by state; each one holds a list of modes and a type.
VE_PARAM_LP_SEQUENCES = 'LP_seq'
VE_PARAM_LP_SEQUENCES_LIST = 'LP_seq_list'
VE_PARAM_LP_SEQUENCES_TYPE = 'LP_seq_type'
# Possible values of the sequence type; modeSequence treats the "predefined"
# type as deterministic and any other value as stochastic.
VE_PARAM_LP_SEQUENCES_TYPE_PREDEF = 'LP_seq_predefined'
VE_PARAM_LP_SEQUENCES_TYPE_STOCHA = 'LP_seq_stochastic'
# Mode sequences followed during a transition, indexed by [start state][end state].
VE_PARAM_LP_TRANSITION = 'LP_trans'
class loadProfile:
    """Description of a load: user activities, states, modes and mode sequences.

    The profile is built from a parameter dictionary and exposes:

    * ``activity``: an ``activitySequence``;
    * ``states``: a copy of the states dictionary (empty when not provided);
    * ``modes``: ``{mode name: entityMode}``;
    * ``modes_sequence``: ``{start state: {end state: modeSequence}}``; the
      entry ``[s][s]`` is the sequence followed while staying in state ``s``.
    """

    def __init__(self, l_param):
        """Build the profile from the parameter dictionary ``l_param``.

        Every top-level key is optional:

        * ``VE_PARAM_LP_ACTIVITY``: dictionary with the activity list and the
          activity period (ignored when ``None``);
        * ``VE_PARAM_LP_STATES``: dictionary describing the states;
        * ``VE_PARAM_LP_MODES``: list of dictionaries with the keys ``name``,
          ``p_min``, ``p_max``, ``stat_distr_type`` and ``stat_distr_param``;
        * ``VE_PARAM_LP_SEQUENCES``: ``{state: {list, type}}`` (ignored when
          ``None``);
        * ``VE_PARAM_LP_TRANSITION``: ``{start: {end: list of modes}}``
          (ignored when ``None``).
        """
        # User activity
        if VE_PARAM_LP_ACTIVITY in l_param and l_param[VE_PARAM_LP_ACTIVITY] is not None:
            activity_param = l_param[VE_PARAM_LP_ACTIVITY]
            self._activities_sequence = activitySequence(activity_param[VE_PARAM_LP_ACTIVITY_LIST],
                                                         activity_param[VE_PARAM_LP_ACTIVITY_PERIOD])
        else:
            self._activities_sequence = activitySequence([], 0)

        # Internal description
        self._states = {}
        if VE_PARAM_LP_STATES in l_param:
            self._states = dict(l_param[VE_PARAM_LP_STATES])

        # List of modes the load can follow
        self._modes = {}
        if VE_PARAM_LP_MODES in l_param:
            for m in l_param[VE_PARAM_LP_MODES]:  # l_param[VE_PARAM_LP_MODES] is a list
                self._modes[m["name"]] = entityMode(m["name"], m["p_min"], m["p_max"],
                                                    [m["stat_distr_type"], m["stat_distr_param"]])

        # Sequences per state
        self._modes_sequence = {}
        if VE_PARAM_LP_SEQUENCES in l_param and l_param[VE_PARAM_LP_SEQUENCES] is not None:
            sequences = l_param[VE_PARAM_LP_SEQUENCES]
            for s in sequences:
                seq = sequences[s]
                self._modes_sequence[s] = {
                    s: modeSequence(seq[VE_PARAM_LP_SEQUENCES_LIST], seq[VE_PARAM_LP_SEQUENCES_TYPE])
                }

        # Each state must have a sequence: states without one get an empty
        # placeholder. The outer entry has to be created here, since it is
        # precisely the one that is missing.
        for s in self._states.get(VE_PARAM_LP_STATES_LIST, ()):
            if s not in self._modes_sequence:
                self._modes_sequence[s] = {s: {}}

        # Sequences for each possible transition state
        if VE_PARAM_LP_TRANSITION in l_param and l_param[VE_PARAM_LP_TRANSITION] is not None:
            transitions = l_param[VE_PARAM_LP_TRANSITION]
            for s_start in transitions:
                # The start state may have no entry yet (e.g. no states given)
                from_start = self._modes_sequence.setdefault(s_start, {})
                for s_end in transitions[s_start]:
                    from_start[s_end] = modeSequence(transitions[s_start][s_end],
                                                     VE_PARAM_LP_SEQUENCES_TYPE_PREDEF)

    @property
    def activity(self):
        """The sequence of user activities."""
        return self._activities_sequence

    @property
    def modes(self):
        """Dictionary of the modes, indexed by mode name."""
        return self._modes

    @property
    def states(self):
        """Dictionary describing the states of the load."""
        return self._states

    @property
    def modes_sequence(self):
        """Mode sequences, indexed by [start state][end state]."""
        return self._modes_sequence

    def addActivity(self, a):
        """Insert the activity ``a`` in the activity sequence."""
        self._activities_sequence.insertActivity(a)

    def addMode(self, m):
        """Register the mode ``m`` under ``m.name``, replacing any previous one."""
        self._modes[m.name] = m

    def addModeInSequence(self, m, index):
        """Register the mode ``m`` under ``m.name``.

        NOTE(review): ``index`` is currently ignored, so this behaves exactly
        like ``addMode`` and no mode sequence is modified - confirm what was
        intended before relying on it.
        """
        self._modes[m.name] = m

    def max_power(self):
        """Return the largest ``max`` bound over all the modes (0 if none is positive)."""
        return max([0] + [_m.max for _m in self._modes.values()])
class activitySequence:
    """Ordered collection of ``entityActivity`` objects attached to a period."""

    def __init__(self, l_param_activities, duration):
        """Build the sequence.

        ``l_param_activities`` is an iterable of dictionaries with the keys
        ``start_state``, ``stop_state``, ``t_start_stat_model_type``,
        ``t_start_stat_model_param``, ``dur_stat_model_type``,
        ``dur_stat_model_param``, ``proba`` and ``param``.
        ``duration`` is stored as the period of the sequence.
        """
        self._seq_activities = []
        self.dur = duration
        for param_activities in l_param_activities:
            start_state = param_activities["start_state"]
            t_start = statDistribution(param_activities["t_start_stat_model_type"],
                                       param_activities["t_start_stat_model_param"])
            stop_state = param_activities["stop_state"]
            dur = statDistribution(param_activities["dur_stat_model_type"],
                                   param_activities["dur_stat_model_param"])
            prob = param_activities["proba"]
            param = param_activities["param"]
            self.insertActivity(entityActivity(start_state, t_start, stop_state, dur, prob, param))

    def __iter__(self):
        """Iterate over the activities, in their stored order."""
        # __iter__ must return an iterator: returning the list itself makes
        # `for a in sequence` fail with a TypeError.
        return iter(self._seq_activities)

    @property
    def seq_activities(self):
        """The underlying list of activities."""
        return self._seq_activities

    @property
    def size(self):
        """Number of activities in the sequence."""
        return len(self._seq_activities)

    @property
    def period(self):
        """The duration given at construction."""
        return self.dur

    def insertActivity(self, new_a):
        """Insert ``new_a`` before the first activity whose start time is greater.

        Activities with an equal start time keep their insertion order.
        NOTE(review): the ``start_time`` objects are compared directly with
        ``<``; the original comment says the list is sorted by the mean of
        the start-time distribution, which presumably relies on the ordering
        defined by ``statDistribution`` - confirm.
        """
        position = len(self._seq_activities)
        for i, a in enumerate(self._seq_activities):
            if new_a.start_time < a.start_time:
                position = i
                break
        self._seq_activities.insert(position, new_a)
class entityActivity:
    """Plain record describing one user activity.

    It stores the states at the start and at the stop of the activity, its
    start time and duration (built as statistical distributions by
    ``activitySequence``), a probability value and a free-form parameter.
    """

    def __init__(self, start_state, start_time, stop_state, duration, proba, param):
        """Store the given values without any validation or conversion."""
        self._states = dict(START=start_state, STOP=stop_state)
        self._start_time = start_time  # starting time distribution
        self._duration = duration  # activity duration distribution
        self._proba = proba  # probability value attached to the activity
        self._param = param  # activity param

    @property
    def states(self):
        """Dictionary with the keys "START" and "STOP"."""
        return self._states

    @states.setter
    def states(self, new_states):
        self._states = new_states

    @property
    def start_time(self):
        """Start time of the activity."""
        return self._start_time

    @start_time.setter
    def start_time(self, new_start):
        self._start_time = new_start

    @property
    def duration(self):
        """Duration of the activity."""
        return self._duration

    @duration.setter
    def duration(self, new_duration):
        self._duration = new_duration

    @property
    def proba(self):
        """Probability value attached to the activity."""
        return self._proba

    @proba.setter
    def proba(self, new_proba):
        self._proba = new_proba

    @property
    def param(self):
        """Free-form parameter of the activity."""
        return self._param

    @param.setter
    def param(self, new_param):
        self._param = new_param
class modeSequence:
    """Sequence of modes, each entry being ``[label, mode, duration, proba]``.

    An entry can be designated either by its integer position in the sequence
    or by its label. A sequence of the "predefined" type is deterministic
    (the modes follow each other in order); any other type is stochastic
    (the next mode is drawn from the probabilities of the entries).
    """

    # Position of each property inside one entry of the sequence
    label_index = {"label": 0, "mode": 1, "duration": 2, "proba": 3}

    def __init__(self, l_param_mode, type_seq):
        """Build the sequence.

        ``l_param_mode`` is an iterable of dictionaries with the keys
        ``mode``, ``dur_stat_model_type``, ``dur_stat_model_param`` and
        ``proba``, plus ``label`` when ``type_seq`` is the predefined type
        (otherwise the mode itself is used as the label).
        """
        self._seq_len = 0
        self._isDeterministic = (type_seq == VE_PARAM_LP_SEQUENCES_TYPE_PREDEF)
        self._seq_modes = []
        for param_mode in l_param_mode:
            atom_mode = param_mode["mode"]
            if self._isDeterministic:
                atom_label = param_mode["label"]
            else:
                atom_label = atom_mode
            dur_distr = statDistribution(param_mode["dur_stat_model_type"], param_mode["dur_stat_model_param"])
            proba = param_mode["proba"]
            # Inserting at the current length appends the entry
            self.addModeInSequence(self._seq_len, atom_label, atom_mode, dur_distr, proba)

    @property
    def sequence(self):
        """The underlying list of ``[label, mode, duration, proba]`` entries."""
        return self._seq_modes

    @property
    def isDeterministic(self):
        """True when the sequence was built with the predefined type."""
        return self._isDeterministic

    @property
    def size(self):
        """Number of entries in the sequence."""
        return self._seq_len

    def get_cumulative_distr(self):
        """Return the list of the running sums of the entries' probabilities."""
        total = 0
        acc_proba = []
        for i in range(self._seq_len):
            total += self.getProbaInSequence(i)
            acc_proba.append(total)
        return acc_proba

    def select_next_mode(self, current_index):
        """Return the index of the next mode, or None if there is none.

        ``current_index`` is either the position or the label of the current
        entry.

        * Deterministic sequence: one random number in [0, 1) is drawn and
          the entries following the current one are scanned in order,
          skipping those whose probability is lower than the drawn number.
          None is returned once the end of the sequence is reached. A
          KeyError is raised if ``current_index`` is an unknown label.
        * Stochastic sequence: the index is drawn from the cumulative
          distribution of the probabilities, independently of the current
          entry. None is returned if the drawn number is not below the
          total of the probabilities.
        """
        label = current_index
        if not isinstance(current_index, int):
            current_index = self.getIndex(current_index)

        if self.isDeterministic:  # deterministic: follow the order, with some proba to skip some
            if current_index is None:
                raise KeyError(label)
            new_mode = current_index + 1
            r = numpy.random.random_sample()
            while new_mode < self._seq_len and self.getProbaInSequence(new_mode) < r:
                new_mode += 1
            # ">=" so that a start index past the end also yields None
            if new_mode >= self._seq_len:
                return None
            return new_mode

        # stochastic: follow the cumulative distribution
        acc_proba = self.get_cumulative_distr()
        r = numpy.random.random_sample()
        for i, threshold in enumerate(acc_proba):
            if threshold > r:
                return i
        return None

    def getIndex(self, mode_label):
        """Return the position of the first entry labelled ``mode_label``, or None."""
        label_pos = self.label_index["label"]
        for i, mode in enumerate(self._seq_modes):
            if mode[label_pos] == mode_label:
                return i
        return None

    def setProbaInSequence(self, label, proba):
        """Set the probability of the entry designated by ``label`` (position or label)."""
        self.setValue("proba", label, proba)

    def setDurationInSequence(self, label, d):
        """Set the duration of the entry designated by ``label`` (position or label)."""
        self.setValue("duration", label, d)

    def setValue(self, prop, key, v):
        """Set the property ``prop`` of the entry designated by ``key`` to ``v``.

        ``key`` is an integer position, or else the label of the entry.
        Raises KeyError if the label is unknown or the position is past the
        end of the sequence.
        """
        i = key
        if not isinstance(key, int):  # key is therefore the label of the sequence
            i = self.getIndex(key)
        if i is None or i >= self._seq_len:
            raise KeyError(key)
        self._seq_modes[i][self.label_index[prop]] = v

    def getProbaInSequence(self, key):
        """Return the probability of the entry designated by ``key``."""
        return self.getValue("proba", key)

    def getDurationInSequence(self, key):
        """Return the duration of the entry designated by ``key``."""
        return self.getValue("duration", key)

    def getModeInSequence(self, key):
        """Return the mode of the entry designated by ``key``."""
        return self.getValue("mode", key)

    def getLabelInSequence(self, key):
        """Return the label of the entry designated by ``key``."""
        return self.getValue("label", key)

    def getValue(self, prop, key):
        """Return the property ``prop`` of the entry designated by ``key``.

        ``key`` is an integer position, or else the label of the entry.
        Raises KeyError if the label is unknown or the position is past the
        end of the sequence.
        """
        i = key
        if not isinstance(key, int):  # key is therefore the label of the sequence
            i = self.getIndex(key)
        if i is None or i >= self._seq_len:
            raise KeyError(key)
        return self._seq_modes[i][self.label_index[prop]]

    def addModeInSequence(self, index, label, mode, dur, p):
        """Insert a new entry at position ``index`` (append it if ``index`` is negative)."""
        entry = [label, mode, dur, p]
        if index < 0:
            self._seq_modes.append(entry)
        else:
            self._seq_modes.insert(index, entry)
        self._seq_len += 1
class entityMode:
    """A mode of a load: a name, power bounds and a statistical distribution."""

    def __init__(self, name, pmin, pmax, statParam):
        """Store the name and the bounds, and build the distribution.

        ``statParam`` is indexable: its item 0 is the distribution type and
        its item 1 the distribution parameters.
        """
        # Static part: the name and the [min, max] bounds
        self.label = name
        self.modeBounds = [pmin, pmax]

        # Dynamic part: the statistical distribution
        dist_type = statParam[0]
        dist_param = statParam[1]
        self._statDist = statDistribution(dist_type, dist_param)

    @property
    def name(self):
        """Name of the mode."""
        return self.label

    @property
    def bounds(self):
        """The ``[min, max]`` list of bounds."""
        return self.modeBounds

    @property
    def min(self):
        """Lower bound (first item of the bounds)."""
        return self.modeBounds[0]

    @property
    def max(self):
        """Upper bound (second item of the bounds)."""
        return self.modeBounds[1]

    @property
    def statDist(self):
        """Statistical distribution attached to the mode."""
        return self._statDist

    def setStatDistribution(self, type, param):
        """Replace the distribution by a new one of the given type and parameters."""
        new_dist = statDistribution(type, param)
        self._statDist = new_dist
#################################################################
############ TOOL TO GENERATE A LOAD PROFILE ####################
#################################################################
class distributionSample:
    """Sample of numeric values, stored as a histogram of rounded values.

    Each value is rounded to ``precision`` decimals and counted in a
    dictionary ``{rounded value: number of occurrences}``. The mean and the
    standard deviation are cached and recomputed only after a value has been
    added.
    """

    def __init__(self, l, precision):
        """Build the sample from the sized iterable ``l`` of numbers."""
        self.prec = precision
        self._n = len(l)
        self._sample = {}
        self.parseList(l)

        # Init values: flag both caches as stale, then fill each of them
        # through its own property.
        self._change_mean = True
        self._mean = self.mean
        self._change_std = True
        self._std_dev = self.std_dev

    def addValue(self, v):
        """Add the value ``v`` to the sample and invalidate the cached statistics."""
        self.appendVal(v)
        self._n += 1
        self._change_mean = True
        self._change_std = True

    def appendVal(self, e):
        """Count the rounded value of ``e`` in the histogram.

        The size and the cached statistics are NOT updated: use ``addValue``
        to add a value to an existing sample.
        """
        e = round(e, self.prec)
        self._sample[e] = self._sample.get(e, 0) + 1

    def getStatDistribution(self, t):
        """Return a ``statDistribution`` of type ``t`` fitted on the sample.

        A type that is not in ``statDistribution.modelsList`` is replaced by
        the first model of that list. The first model is parametrized by
        ``[mean, std_dev]`` and the second one by ``[min, max]`` of the
        sample (presumably a normal and a uniform distribution - confirm
        against ``ve_util``). None is returned for any other known model.
        The second model requires a non-empty sample.
        """
        models = statDistribution.modelsList
        if t not in models:
            t = models[0]
        if t == models[0]:
            return statDistribution(t, [self.mean, self.std_dev])
        if t == models[1]:
            return statDistribution(t, [min(self._sample), max(self._sample)])
        return None

    @property
    def size(self):
        """Number of values in the sample."""
        return self._n

    @property
    def sample(self):
        """The sample as a flat list, each rounded value repeated as often as it occurs."""
        return [e for e in self._sample for _ in range(self._sample[e])]

    @property
    def mean(self):
        """Mean of the sample (0 for an empty sample)."""
        if self._change_mean:
            self._change_mean = False
            if self._n > 0:
                self._mean = sum(x * self._sample[x] for x in self._sample) / float(self._n)
            else:
                self._mean = 0
        return self._mean

    @property
    def std_dev(self):
        """Sample standard deviation, n - 1 denominator (0 with fewer than 2 values)."""
        if self._change_std:
            self._change_std = False
            if self._n > 1:
                m = self.mean  # read once rather than once per distinct value
                variance = sum(self._sample[x] * (x - m) * (x - m) for x in self._sample) / float(self._n - 1)
                self._std_dev = sqrt(variance)
            else:
                self._std_dev = 0
        return self._std_dev

    def parseList(self, l):
        """Count every value of the iterable ``l`` in the histogram."""
        for e in l:
            self.appendVal(e)

Event Timeline