Page Menu
Home
c4science
Search
Configure Global Search
Log In
Files
F64467148
mpc.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Subscribers
None
File Metadata
Details
File Info
Storage
Attached
Created
Mon, May 27, 02:21
Size
15 KB
Mime Type
text/x-python
Expires
Wed, May 29, 02:21 (2 d)
Engine
blob
Format
Raw Data
Handle
17908823
Attached To
R3852 EMS for Smart-Building
mpc.py
View Options
__author__
=
'Olivier Van Cutsem'
from
emscore.ems_main
import
EnergyManagementSystem
from
building_data_management.category_management.category_config
import
*
from
ems_config
import
*
from
building_data_management.interface_management.generic_interfaces
import
EMS_INTERFACE_WINDOW
import
numpy
as
np
import
cvxpy
import
time
## MPC Parameters
MPC_HORIZON
=
2
*
60
# in minutes
## Thermal parameters
OPEN_INTERFACE_R
=
1
class
EMS_MPC
(
EnergyManagementSystem
):
def
__init__
(
self
,
in_q
,
out_q
,
gui_q
):
"""
Initialize ClusteringManagementSystem object.
Read messages from the BMS interface bubble messages through a Queue
"""
super
(
EMS_MPC
,
self
)
.
__init__
(
in_q
,
out_q
,
gui_q
)
def
update
(
self
):
"""
TODO
"""
list_room_ids
=
self
.
building_data
.
room_ids_list
### ---- FORECAST estimation ----
# Time vectors
t_hor
=
MPC_HORIZON
*
60
# time horizon, in seconds
t_step
=
EMS_TIME_STEP
# time step, in seconds
t_cur
=
self
.
current_time
t_pack
=
(
t_cur
,
t_hor
,
t_step
)
# current time [in s], horizon [in s] and time step [in s]
nb_pts_T
=
int
(
t_hor
/
t_step
)
+
1
## Environmental conditions
temp_pred
=
self
.
get_environment_data
(
EMS_ENV_DATA_TEMP
,
t_pack
)
irr_pred
=
self
.
get_environment_data
(
EMS_ENV_DATA_IRR
,
t_pack
)
lum_pred
=
self
.
get_environment_data
(
EMS_ENV_DATA_LUM
,
t_pack
)
lum_factor_pred
=
None
price_signal
=
self
.
get_elec_price
(
t_pack
)
## (non-controllable) Load power consumption
loads_nc
=
self
.
building_data
.
get_entity_list
([
EMS_CATEGORY_ENTITY_LOAD
,
EMS_CATEGORY_ENTITY_LOAD_USER_DRIVEN
])
loads_def
=
self
.
building_data
.
get_entity_list
([
EMS_CATEGORY_ENTITY_LOAD
,
EMS_CATEGORY_ENTITY_LOAD_DEFERRABLE
])
loads_list
=
loads_nc
+
loads_def
loads_power_pred
=
nb_pts_T
*
[
0
]
for
_l
in
loads_list
:
loads_power_pred
=
list
(
np
.
add
(
loads_power_pred
,
_l
.
consumption_forecast
(
t_pack
)))
## Local generation estimation
gen_power_pred
=
nb_pts_T
*
[
0
]
generation_list
=
self
.
building_data
.
get_entity_list
([
EMS_CATEGORY_ENTITY_GENERATION
])
for
_g
in
generation_list
:
gen_power_pred
=
list
(
np
.
add
(
gen_power_pred
,
_g
.
generation_forecast
(
t_pack
,
temp_pred
,
irr_pred
,
lum_pred
)))
## Sun heat gain, per room
sun_heat_gain
=
dict
((
r_id
,
[])
for
r_id
in
list_room_ids
)
for
r_id
in
sun_heat_gain
.
keys
():
if
self
.
building_data
.
room
(
r_id
)
.
has_interface_with
(
neighbor
=
EMS_OUTSIDE_ROOM_NAME
,
type_interface
=
EMS_INTERFACE_WINDOW
):
sun_heat_gain
[
r_id
]
=
self
.
building_data
.
room
(
r_id
)
.
get_solar_heat_gain
(
t_pack
,
irr_pred
)
### ---- CONTROL and MODEL VARIABLES definition ----
# --- Thermal loads
list_therm_loads
=
self
.
building_data
.
get_entity_list
([
EMS_CATEGORY_ENTITY_LOAD
,
EMS_CATEGORY_ENTITY_LOAD_THERM
])
p_load_therm
=
cvxpy
.
Variable
(
len
(
list_therm_loads
),
nb_pts_T
)
# --- Deferrable loads
pass
# --- Local power Storage
list_battery
=
self
.
building_data
.
get_entity_list
([
EMS_CATEGORY_ENTITY_STORAGE
])
p_storage
=
cvxpy
.
Variable
(
len
(
list_battery
),
nb_pts_T
)
energy_storage
=
cvxpy
.
Variable
(
len
(
list_battery
),
nb_pts_T
+
1
)
# --- Rooms temperature
list_room_active_simu
=
[]
list_room_fix_simu
=
[]
for
r_id
in
list_room_ids
:
if
not
(
self
.
building_data
.
room
(
r_id
)
.
has_forecast
(
t_pack
,
EMS_COMFORT_temperature
)):
list_room_active_simu
.
append
(
r_id
)
# Active in the simu: temperature to be simulated
else
:
list_room_fix_simu
.
append
(
r_id
)
# Fix in the simu: temperature forecast given
# interface nodes temperature
nb_interf_nodes
=
self
.
get_nb_interface_nodes
(
list_room_active_simu
)
temp_node
=
cvxpy
.
Variable
(
len
(
list_room_active_simu
)
+
nb_interf_nodes
,
nb_pts_T
+
1
)
# --- Total power asked to the grid
grid_power
=
cvxpy
.
Variable
(
1
,
nb_pts_T
)
### ---- CONSTRAINTS ----
constr
=
[]
# -- 1) TIME INDEPENDENT
# THERMAL LOADS
print
(
"THERMAL LOAD: {0} "
.
format
(
list_therm_loads
))
for
i
in
range
(
len
(
list_therm_loads
)):
print
(
"MAX POWER: {0}"
.
format
(
list_therm_loads
[
i
]
.
max_power
))
constr
+=
[
p_load_therm
[
i
,
:]
<=
list_therm_loads
[
i
]
.
max_power
]
constr
+=
[
p_load_therm
[
i
,
:]
>=
0
]
# BATTERY power
for
i
in
range
(
len
(
list_battery
)):
constr
+=
[
p_storage
[
i
,
:]
<=
list_battery
[
i
]
.
max_power
]
constr
+=
[
p_storage
[
i
,
:]
>=
list_battery
[
i
]
.
min_power
]
constr
+=
[
energy_storage
[
i
,
:]
<=
list_battery
[
i
]
.
capacity
]
constr
+=
[
energy_storage
[
i
,
:]
>=
0
]
# -- 2) TIME DEPENDENT
for
t
in
range
(
nb_pts_T
+
1
):
t_abs
=
t_cur
+
t
*
t_step
# ROOM TEMPERATURE
i
=
0
for
r_id
in
list_room_active_simu
:
temp_constr
=
self
.
building_data
.
room
(
r_id
)
.
get_comfort_constraint
(
EMS_COMFORT_temperature
)
# time dependent
if
temp_constr
is
not
None
:
constr
+=
[
temp_node
[
i
,
t
]
<=
temp_constr
.
get_max
(
t_abs
)]
constr
+=
[
temp_node
[
i
,
t
]
>=
temp_constr
.
get_min
(
t_abs
)]
i
+=
1
# Grid purchase consumption definition
for
t
in
range
(
nb_pts_T
):
constr
+=
[
grid_power
[
0
,
t
]
==
sum
(
p_load_therm
[:,
t
])
+
loads_power_pred
[
t
]
+
sum
(
p_storage
[:,
t
])
+
gen_power_pred
[
t
]]
### ---- MODEL EQUATIONS ----
nb_active_rooms
=
len
(
list_room_active_simu
)
nb_fix_rooms
=
len
(
list_room_fix_simu
)
size_x
=
nb_active_rooms
+
nb_interf_nodes
# #room + #nodes-interface
temp_0_var
=
size_x
*
[
0
]
# Unknown temperature points for the horizon N
idx
=
0
for
r_id
in
list_room_active_simu
:
temp_0_var
[
idx
]
=
self
.
building_data
.
room
(
r_id
)
.
get_comfort_value
(
EMS_COMFORT_temperature
)
idx
+=
1
# Forecast temperature points for the horizon N
temp_fix_forecast
=
list
()
for
r_id
in
list_room_fix_simu
:
temp_fix_forecast
.
append
(
self
.
building_data
.
room
(
r_id
)
.
get_forecast
(
EMS_COMFORT_temperature
,
t_pack
))
# Thermal simulation RC matrix
aa
,
pp
=
self
.
get_thermal_matrix
(
t_step
,
list_room_active_simu
,
list_room_fix_simu
,
nb_interf_nodes
)
# heat gain due to the HVACs
bb
=
np
.
zeros
((
size_x
,
len
(
list_therm_loads
)))
heat_gain_vect
=
[]
for
i
in
range
(
size_x
):
heat_gain_vect
.
append
(
nb_pts_T
*
[
0
])
idx_th
=
0
for
(
l_id
,
l_obj
)
in
list_therm_loads
:
r_id
=
self
.
get_thermal_load_room
(
l_id
)
if
r_id
is
None
or
r_id
not
in
list_room_active_simu
:
# Only the HVACs in simulated room make sense
continue
# Put the right variable in the vector "u"
idx_r
=
list_room_active_simu
.
index
(
r_id
)
bb
[
idx_th
][
idx_r
]
=
l_obj
.
thermal_efficiency
# Get the thermal efficiency (elec -> therm heat) from the load object and place it at the right place
idx_th
+=
1
# heat gain due to the sun
dd
=
np
.
eye
(
size_x
)
idx_r
=
0
for
r_id
in
list_room_active_simu
:
if
self
.
building_data
.
room
(
r_id
)
.
has_interface_with
(
neighbor
=
EMS_OUTSIDE_ROOM_NAME
,
type_interface
=
EMS_INTERFACE_WINDOW
):
heat_gain_vect
[
idx_r
]
=
self
.
building_data
.
room
(
r_id
)
.
get_solar_heat_gain
(
t_pack
,
irr_pred
)
# Equations
constr
+=
[
temp_node
[:,
0
]
==
temp_0_var
]
# load the initial temperature values from the measurements
for
t
in
range
(
0
,
nb_pts_T
):
t_abs
=
t_cur
+
t
*
t_step
# --- Thermal model equations
# Room temp unknown, room temp forecast and interface node temp
d
=
list
(
heat_gain_vect
[
i
][
t
]
for
i
in
range
(
len
(
heat_gain_vect
)))
# sun disturbance
p
=
list
(
temp_fix_forecast
[
i
][
t
]
for
i
in
range
(
len
(
temp_fix_forecast
)))
# forecast temp. influence
if
bb
.
size
>
0
:
u_term
=
bb
*
p_load_therm
[:,
t
]
else
:
u_term
=
np
.
zeros
((
size_x
,
1
))
constr
+=
[
temp_node
[:,
t
+
1
]
==
aa
*
temp_node
[:,
t
]
+
u_term
+
np
.
matmul
(
dd
,
d
)
+
np
.
matmul
(
pp
,
p
)]
# --- Batteries energy equations
for
i
in
range
(
len
(
list_battery
)):
bat
=
list_battery
[
i
]
constr
+=
[
energy_storage
[
i
,
t
+
1
]
==
t_step
*
bat
.
leak_coeff
*
energy_storage
[
i
,
t
]
+
bat
.
efficiency
*
t_step
*
p_storage
[
i
,
t
]]
### ---- Objective
obj_fct
=
0.0
for
t
in
range
(
nb_pts_T
):
obj_fct
+=
price_signal
[
t
]
*
grid_power
[
0
,
t
]
# --- MPC step solve --- #
mpc_build_prob
=
cvxpy
.
Problem
(
cvxpy
.
Minimize
(
obj_fct
),
constr
)
start
=
time
.
time
()
mpc_build_prob
.
solve
(
verbose
=
False
)
elapsed_time
=
time
.
time
()
-
start
print
(
"MPC calculation time: {0} [sec]"
.
format
(
elapsed_time
))
if
mpc_build_prob
.
status
==
cvxpy
.
OPTIMAL
:
print
(
"OK :)"
)
else
:
print
(
"PAS OK :( --- {0}"
.
format
(
mpc_build_prob
.
status
))
return
[]
def
get_thermal_matrix
(
self
,
t_step
,
list_active_room
,
list_fix_room
,
nb_interf_node
):
nb_active_rooms
=
len
(
list_active_room
)
size_x
=
nb_interf_node
+
nb_active_rooms
mat_state
=
np
.
ones
((
size_x
,
size_x
),
dtype
=
np
.
float
)
# A matrix for RC model, initialized with 1 on the diagonal
mat_fix_tmp
=
np
.
zeros
((
size_x
,
len
(
list_fix_room
)),
dtype
=
np
.
float
)
# P matrix for forecast temp
# For matrix creation purpose: link 2 rooms to their interface
room_link_already_visited
=
dict
()
for
r_id
in
list_active_room
:
room_link_already_visited
[
r_id
]
=
list
()
k
=
nb_active_rooms
# the index pointing to the right interface temperature node
for
i
in
range
(
nb_active_rooms
):
r_id
=
list_active_room
[
i
]
c_room
=
self
.
building_data
.
room
(
r_id
)
.
thermal_capa
# Loop over all the interfaces this room has with other rooms
list_room_interface
=
self
.
building_data
.
room
(
r_id
)
.
interfaces
for
n_id
in
list_room_interface
.
keys
():
# This interface has already been handled ?
if
n_id
in
list_active_room
and
r_id
in
room_link_already_visited
[
n_id
]:
continue
else
:
room_link_already_visited
[
r_id
]
.
append
(
n_id
)
# The general case is to consider multiple interface for each neighbor
interf_list
=
list_room_interface
[
n_id
]
if
type
(
interf_list
)
is
not
list
:
interf_list
=
[
interf_list
]
# Go through all the interface objects this room r_id has with the neighbor room n_id
for
i_obj
in
interf_list
:
# RC parameters
rc_model
=
i_obj
.
thermalProp
r_list
=
rc_model
.
resistiveParameters
c_list
=
rc_model
.
capacitiveParam
if
i_obj
.
isOpen
:
r_list
=
[
OPEN_INTERFACE_R
]
c_list
=
[]
# Normal case: multiple intermediate temperature points
if
len
(
c_list
)
>
0
:
# ROOM r_id
coeff_room
=
t_step
/
(
r_list
[
0
]
*
c_room
)
mat_state
[
i
][
i
]
-=
coeff_room
mat_state
[
i
][
k
]
+=
coeff_room
# ROOM n_id
if
n_id
in
list_active_room
:
idx_n
=
list_active_room
.
index
(
n_id
)
dk
=
rc_model
.
orderModel
-
1
coeff_room
=
t_step
/
(
r_list
[
dk
]
*
self
.
building_data
.
room
(
n_id
)
.
thermal_capa
)
mat_state
[
idx_n
][
idx_n
]
-=
coeff_room
mat_state
[
idx_n
][
k
+
dk
]
+=
coeff_room
for
j
in
range
(
rc_model
.
orderModel
):
idx1
=
k
-
1
idx2
=
k
+
1
# Update the influence of room n_id temperature
if
j
==
0
:
idx1
=
i
mat_state
[
k
][
k
]
-=
t_step
*
(
1.0
/
(
r_list
[
j
]
*
c_list
[
j
])
+
1.0
/
(
r_list
[
j
+
1
]
*
c_list
[
j
]))
mat_state
[
k
][
idx1
]
=
t_step
/
(
r_list
[
j
]
*
c_list
[
j
])
# Update the influence of room n_id temperature
if
j
==
rc_model
.
orderModel
-
1
:
if
n_id
in
list_active_room
:
idx2
=
list_active_room
.
index
(
n_id
)
else
:
idx2
=
list_fix_room
.
index
(
n_id
)
if
n_id
in
list_fix_room
and
j
==
rc_model
.
orderModel
-
1
:
# Take the fixed temperature in this case
mat_fix_tmp
[
k
][
idx2
]
=
t_step
/
(
r_list
[
j
+
1
]
*
c_list
[
j
])
else
:
mat_state
[
k
][
idx2
]
=
t_step
/
(
r_list
[
j
+
1
]
*
c_list
[
j
])
k
+=
1
# INCREMENT THE INDEX OF THE INTERFACE TEMPERATURE POINTS
else
:
# Particular case: direct link between two rooms via a resistance
r
=
r_list
[
0
]
coeff1
=
t_step
/
(
r
*
c_room
)
mat_state
[
i
][
i
]
-=
coeff1
# neighbor effect on r_id
if
n_id
in
list_active_room
:
# Interaction with another temperature point to simulate
coeff2
=
t_step
/
(
r
*
self
.
building_data
.
room
(
n_id
)
.
thermal_capa
)
n_index
=
list_active_room
.
index
(
n_id
)
mat_state
[
i
][
n_index
]
+=
coeff1
# neighbor effect on r_id
mat_state
[
n_index
][
n_index
]
-=
coeff2
# effect of r_id on the neighbor
mat_state
[
n_index
][
i
]
+=
coeff2
# effect of r_id on the neighbor
else
:
# Interaction with a fixed temperature: don't
n_fix_index
=
list_fix_room
.
index
(
n_id
)
mat_fix_tmp
[
i
][
n_fix_index
]
+=
coeff1
return
mat_state
,
mat_fix_tmp
def
get_nb_interface_nodes
(
self
,
active_rooms
):
x
=
0
already_visited
=
list
()
for
r_id
in
active_rooms
:
interf
=
self
.
building_data
.
room
(
r_id
)
.
interfaces
for
n_id
in
interf
.
keys
():
# Skip this link if n_id -> r_id has already been taken into account
if
n_id
in
already_visited
:
continue
i_obj
=
interf
[
n_id
]
if
type
(
i_obj
)
is
list
:
for
ii_obj
in
i_obj
:
x
+=
ii_obj
.
thermalProp
.
orderModel
else
:
x
+=
i_obj
.
thermalProp
.
orderModel
already_visited
.
append
(
r_id
)
return
x
def
get_thermal_load_room
(
self
,
th_l_id
):
for
r_id
in
self
.
building_data
.
room_ids_list
:
if
th_l_id
in
self
.
building_data
.
room
(
r_id
)
.
get_comfort_loads
(
EMS_COMFORT_temperature
):
return
r_id
return
None
Event Timeline
Log In to Comment