Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F91058539
bms_util.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Thu, Nov 7, 10:31
Size
13 KB
Mime Type
text/x-python
Expires
Sat, Nov 9, 10:31 (2 d)
Engine
blob
Format
Raw Data
Handle
22187804
Attached To
R3852 EMS for Smart-Building
bms_util.py
View Options
__author__
=
'Olivier Van Cutsem'
from
math
import
sqrt
import
numpy
VE_PARAM_LP_MODEL
=
'LP_model'
VE_PARAM_LP_ACTIVITY
=
'LP_activity'
VE_PARAM_LP_ACTIVITY_LIST
=
'LP_activity_list'
VE_PARAM_LP_ACTIVITY_PERIOD
=
'LP_activity_period'
VE_PARAM_LP_STATES
=
'LP_states'
VE_PARAM_LP_STATES_DEFAULT
=
'state_default'
VE_PARAM_LP_STATES_LIST
=
'state_list'
VE_PARAM_LP_MODES
=
'LP_modes'
VE_PARAM_LP_SEQUENCES
=
'LP_seq'
VE_PARAM_LP_SEQUENCES_LIST
=
'LP_seq_list'
VE_PARAM_LP_SEQUENCES_TYPE
=
'LP_seq_type'
VE_PARAM_LP_SEQUENCES_TYPE_PREDEF
=
'LP_seq_predefined'
VE_PARAM_LP_SEQUENCES_TYPE_STOCHA
=
'LP_seq_stochastic'
VE_PARAM_LP_TRANSITION
=
'LP_trans'
class
loadProfile
:
def
__init__
(
self
,
l_param
):
# User activity
if
VE_PARAM_LP_ACTIVITY
in
l_param
and
l_param
[
VE_PARAM_LP_ACTIVITY
]
is
not
None
:
self
.
_activities_sequence
=
activitySequence
(
l_param
[
VE_PARAM_LP_ACTIVITY
][
VE_PARAM_LP_ACTIVITY_LIST
],
l_param
[
VE_PARAM_LP_ACTIVITY
][
VE_PARAM_LP_ACTIVITY_PERIOD
])
else
:
self
.
_activities_sequence
=
activitySequence
([],
0
)
# Internal description
self
.
_states
=
{}
if
VE_PARAM_LP_STATES
in
l_param
:
self
.
_states
=
dict
(
l_param
[
VE_PARAM_LP_STATES
])
# List of modes the load can follow
self
.
_modes
=
{}
if
VE_PARAM_LP_MODES
in
l_param
:
for
m
in
l_param
[
VE_PARAM_LP_MODES
]:
# l_param[VE_PARAM_LP_MODES] is a list
self
.
_modes
[
m
[
"name"
]]
=
entityMode
(
m
[
"name"
],
m
[
"p_min"
],
m
[
"p_max"
],
[
m
[
"stat_distr_type"
],
m
[
"stat_distr_param"
]])
# Sequences per state
self
.
_modes_sequence
=
{}
if
VE_PARAM_LP_SEQUENCES
in
l_param
and
l_param
[
VE_PARAM_LP_SEQUENCES
]
is
not
None
:
for
s
in
l_param
[
VE_PARAM_LP_SEQUENCES
]:
seq
=
l_param
[
VE_PARAM_LP_SEQUENCES
][
s
]
self
.
_modes_sequence
[
s
]
=
{}
self
.
_modes_sequence
[
s
][
s
]
=
modeSequence
(
seq
[
VE_PARAM_LP_SEQUENCES_LIST
],
seq
[
VE_PARAM_LP_SEQUENCES_TYPE
])
# each state must have a sequence:
if
len
(
self
.
_states
.
keys
())
>
0
:
for
s
in
self
.
_states
[
VE_PARAM_LP_STATES_LIST
]:
if
s
not
in
self
.
_modes_sequence
:
self
.
_modes_sequence
[
s
][
s
]
=
{}
# Sequences for each possible transition state
if
VE_PARAM_LP_TRANSITION
in
l_param
and
l_param
[
VE_PARAM_LP_TRANSITION
]
is
not
None
:
for
s_start
in
l_param
[
VE_PARAM_LP_TRANSITION
]:
for
s_end
in
l_param
[
VE_PARAM_LP_TRANSITION
][
s_start
]:
self
.
_modes_sequence
[
s_start
][
s_end
]
=
modeSequence
(
l_param
[
VE_PARAM_LP_TRANSITION
][
s_start
][
s_end
],
VE_PARAM_LP_SEQUENCES_TYPE_PREDEF
)
@property
def
activity
(
self
):
return
self
.
_activities_sequence
@property
def
modes
(
self
):
return
self
.
_modes
@property
def
states
(
self
):
return
self
.
_states
@property
def
modes_sequence
(
self
):
return
self
.
_modes_sequence
def
addActivity
(
self
,
a
):
self
.
_activities_sequence
.
insertActivity
(
a
)
def
addMode
(
self
,
m
):
self
.
_modes
[
m
.
name
]
=
m
def
addModeInSequence
(
self
,
m
,
index
):
self
.
_modes
[
m
.
name
]
=
m
@property
def
max_power
(
self
):
max_p
=
0
for
_m
in
self
.
_modes
.
values
():
max_p
=
max
(
max_p
,
_m
.
max
)
return
max_p
class
activitySequence
:
def
__init__
(
self
,
l_param_activities
,
duration
):
self
.
_seq_activities
=
[]
self
.
dur
=
duration
for
param_activities
in
l_param_activities
:
start_state
=
param_activities
[
"start_state"
]
t_start
=
statDistribution
(
param_activities
[
"t_start_stat_model_type"
],
param_activities
[
"t_start_stat_model_param"
])
stop_state
=
param_activities
[
"stop_state"
]
dur
=
statDistribution
(
param_activities
[
"dur_stat_model_type"
],
param_activities
[
"dur_stat_model_param"
])
prob
=
param_activities
[
"proba"
]
param
=
param_activities
[
"param"
]
self
.
insertActivity
(
entityActivity
(
start_state
,
t_start
,
stop_state
,
dur
,
prob
,
param
))
def
__iter__
(
self
):
return
self
.
_seq_activities
@property
def
seq_activities
(
self
):
return
self
.
_seq_activities
@property
def
size
(
self
):
return
len
(
self
.
_seq_activities
)
@property
def
period
(
self
):
return
self
.
dur
# list sorted by t_start.mean
def
insertActivity
(
self
,
new_a
):
i
=
0
for
a
in
self
.
_seq_activities
:
if
new_a
.
start_time
<
a
.
start_time
:
break
i
+=
1
self
.
_seq_activities
.
insert
(
i
,
new_a
)
class
entityActivity
:
def
__init__
(
self
,
start_state
,
start_time
,
stop_state
,
duration
,
proba
,
param
):
self
.
_states
=
{
"START"
:
start_state
,
"STOP"
:
stop_state
}
self
.
_start_time
=
start_time
# starting time distribution
self
.
_duration
=
duration
# activity duration distribution
self
.
_proba
=
proba
# activity duration distribution
self
.
_param
=
param
# activity param
@property
def
start_time
(
self
):
return
self
.
_start_time
@start_time.setter
def
start_time
(
self
,
t
):
self
.
_start_time
=
t
@property
def
duration
(
self
):
return
self
.
_duration
@duration.setter
def
duration
(
self
,
d
):
self
.
_duration
=
d
@property
def
proba
(
self
):
return
self
.
_proba
@proba.setter
def
proba
(
self
,
p
):
self
.
_proba
=
p
@property
def
param
(
self
):
return
self
.
_param
@param.setter
def
param
(
self
,
p
):
self
.
_param
=
p
@property
def
states
(
self
):
return
self
.
_states
@states.setter
def
states
(
self
,
s
):
self
.
_states
=
s
class
modeSequence
:
label_index
=
{
"label"
:
0
,
"mode"
:
1
,
"duration"
:
2
,
"proba"
:
3
}
def
__init__
(
self
,
l_param_mode
,
type_seq
):
self
.
_seq_len
=
0
self
.
_isDeterministic
=
False
if
type_seq
==
VE_PARAM_LP_SEQUENCES_TYPE_PREDEF
:
self
.
_isDeterministic
=
True
self
.
_seq_modes
=
[]
for
param_mode
in
l_param_mode
:
if
type_seq
==
VE_PARAM_LP_SEQUENCES_TYPE_PREDEF
:
atom_label
=
param_mode
[
"label"
]
else
:
atom_label
=
param_mode
[
"mode"
]
atom_mode
=
param_mode
[
"mode"
]
dur_distr
=
statDistribution
(
param_mode
[
"dur_stat_model_type"
],
param_mode
[
"dur_stat_model_param"
])
proba
=
param_mode
[
"proba"
]
self
.
addModeInSequence
(
self
.
_seq_len
,
atom_label
,
atom_mode
,
dur_distr
,
proba
)
@property
def
sequence
(
self
):
return
self
.
_seq_modes
@property
def
isDeterministic
(
self
):
return
self
.
_isDeterministic
@property
def
size
(
self
):
return
self
.
_seq_len
def
get_cumulative_distr
(
self
):
sum
=
0
acc_proba
=
[]
for
i
in
range
(
0
,
self
.
_seq_len
):
p
=
self
.
getProbaInSequence
(
i
)
sum
+=
p
acc_proba
.
append
(
sum
)
return
acc_proba
def
select_next_mode
(
self
,
current_index
):
""" return None if last mode """
if
type
(
current_index
)
is
not
int
:
current_index
=
self
.
getIndex
(
current_index
)
if
self
.
isDeterministic
:
# deterministic: follow the order, with some proba to skip some
new_mode
=
current_index
+
1
r
=
numpy
.
random
.
random_sample
()
while
new_mode
<
self
.
_seq_len
and
self
.
getProbaInSequence
(
new_mode
)
<
r
:
new_mode
+=
1
if
new_mode
==
self
.
_seq_len
:
return
None
else
:
return
new_mode
else
:
# stochastic: follow the cumulative distribution
acc_proba
=
self
.
get_cumulative_distr
()
r
=
numpy
.
random
.
random_sample
()
for
i
in
range
(
0
,
len
(
acc_proba
)):
if
acc_proba
[
i
]
>
r
:
return
i
def
getIndex
(
self
,
mode_label
):
i
=
0
for
mode
in
self
.
_seq_modes
:
if
mode
[
self
.
label_index
[
"label"
]]
==
mode_label
:
return
i
i
+=
1
return
None
def
setProbaInSequence
(
self
,
label
,
proba
):
self
.
setValue
(
"proba"
,
label
,
proba
)
def
setDurationInSequence
(
self
,
label
,
d
):
self
.
setValue
(
"duration"
,
label
,
d
)
def
setValue
(
self
,
prop
,
key
,
v
):
i
=
key
if
key
is
not
int
:
i
=
self
.
getIndex
(
key
)
if
i
is
None
:
return
KeyError
self
.
_seq_modes
[
i
][
self
.
label_index
[
prop
]]
=
v
def
getProbaInSequence
(
self
,
key
):
return
self
.
getValue
(
"proba"
,
key
)
def
getDurationInSequence
(
self
,
key
):
return
self
.
getValue
(
"duration"
,
key
)
def
getModeInSequence
(
self
,
key
):
return
self
.
getValue
(
"mode"
,
key
)
def
getLabelInSequence
(
self
,
key
):
return
self
.
getValue
(
"label"
,
key
)
def
getValue
(
self
,
prop
,
key
):
i
=
key
if
type
(
key
)
is
not
int
:
# key is therefore the label of the sequence
i
=
self
.
getIndex
(
key
)
if
i
is
None
or
i
>=
self
.
_seq_len
:
return
KeyError
return
self
.
_seq_modes
[
i
][
self
.
label_index
[
prop
]]
def
addModeInSequence
(
self
,
index
,
label
,
mode
,
dur
,
p
):
if
index
<
0
:
self
.
_seq_modes
.
append
([
label
,
mode
,
dur
,
p
])
else
:
self
.
_seq_modes
.
insert
(
index
,
[
label
,
mode
,
dur
,
p
])
self
.
_seq_len
+=
1
class
entityMode
:
def
__init__
(
self
,
name
,
pmin
,
pmax
,
statParam
):
# Static: min max
self
.
label
=
name
self
.
modeBounds
=
[
pmin
,
pmax
]
# Dynamic
self
.
_statDist
=
statDistribution
(
statParam
[
0
],
statParam
[
1
])
@property
def
name
(
self
):
return
self
.
label
@property
def
bounds
(
self
):
return
self
.
modeBounds
@property
def
max
(
self
):
return
self
.
modeBounds
[
1
]
@property
def
min
(
self
):
return
self
.
modeBounds
[
0
]
@property
def
statDist
(
self
):
return
self
.
_statDist
def
setStatDistribution
(
self
,
type
,
param
):
self
.
_statDist
=
statDistribution
(
type
,
param
)
#################################################################
############ TOOL TO GENERATE A LOAD PROFILE ####################
#################################################################
class
statDistribution
:
modelsList
=
[
'NORM'
,
'UNIF'
]
def
__init__
(
self
,
t
,
param
):
self
.
statModel
=
t
if
t
not
in
self
.
modelsList
:
self
.
statModel
=
self
.
modelsList
[
0
]
self
.
_param
=
param
@property
def
model
(
self
):
return
self
.
statModel
@property
def
param
(
self
):
return
self
.
_param
def
generateValue
(
self
):
if
self
.
statModel
==
self
.
modelsList
[
0
]:
return
self
.
_param
[
0
]
+
self
.
_param
[
1
]
*
numpy
.
random
.
randn
()
elif
self
.
statModel
==
self
.
modelsList
[
1
]:
return
self
.
_param
[
0
]
+
(
self
.
_param
[
1
]
-
self
.
_param
[
0
])
*
numpy
.
random
.
random_sample
()
@property
def
equivalent_quantity
(
self
):
if
self
.
statModel
==
self
.
modelsList
[
0
]:
return
self
.
_param
[
0
]
elif
self
.
statModel
==
self
.
modelsList
[
1
]:
return
(
self
.
_param
[
0
]
+
self
.
_param
[
1
])
/
2
def
__le__
(
self
,
stat_distr
):
return
self
.
equivalent_quantity
<
stat_distr
.
equivalent_quantity
class
distributionSample
:
def
__init__
(
self
,
l
,
precision
):
self
.
prec
=
precision
self
.
_n
=
len
(
l
)
self
.
_sample
=
{}
self
.
parseList
(
l
)
# Init values
self
.
_change_mean
=
True
self
.
_mean
=
self
.
mean
self
.
_change_std
=
True
self
.
_std_dev
=
self
.
mean
def
addValue
(
self
,
v
):
self
.
appendVal
(
v
)
self
.
_n
+=
1
self
.
_change_mean
=
True
self
.
_change_std
=
True
def
appendVal
(
self
,
e
):
e
=
round
(
e
,
self
.
prec
)
if
e
in
self
.
_sample
:
self
.
_sample
[
e
]
+=
1
else
:
self
.
_sample
[
e
]
=
1
def
getStatDistribution
(
self
,
t
):
if
t
not
in
statDistribution
.
modelsList
:
t
=
statDistribution
.
modelsList
[
0
]
if
t
==
statDistribution
.
modelsList
[
0
]:
return
statDistribution
(
t
,
[
self
.
mean
,
self
.
std_dev
])
if
t
==
statDistribution
.
modelsList
[
1
]:
return
statDistribution
(
t
,
[
min
(
self
.
_sample
),
max
(
self
.
_sample
)])
@property
def
size
(
self
):
return
self
.
_n
@property
def
sample
(
self
):
l
=
[]
for
e
in
self
.
_sample
:
for
i
in
range
(
self
.
_sample
[
e
]):
l
.
append
(
e
)
return
l
@property
def
mean
(
self
):
if
self
.
_change_mean
:
self
.
_change_mean
=
False
if
self
.
_n
>
0
:
self
.
_mean
=
sum
(
x
*
self
.
_sample
[
x
]
for
x
in
self
.
_sample
)
/
float
(
self
.
_n
)
else
:
self
.
_mean
=
0
return
self
.
_mean
@property
def
std_dev
(
self
):
if
self
.
_change_std
:
self
.
_change_std
=
False
if
self
.
_n
>
1
:
self
.
_std_dev
=
sqrt
(
sum
(
self
.
_sample
[
x
]
*
(
x
-
self
.
mean
)
*
(
x
-
self
.
mean
)
for
x
in
self
.
_sample
)
/
float
(
self
.
_n
-
1
))
else
:
self
.
_std_dev
=
0
return
self
.
_std_dev
def
parseList
(
self
,
l
):
for
e
in
l
:
self
.
appendVal
(
e
)
Event Timeline
Log In to Comment