Page MenuHomec4science

mpc.py
No OneTemporary

File Metadata

Created
Thu, Jul 25, 23:17
__author__ = 'Olivier Van Cutsem'
import time
import cvxpy
import numpy as np
from building_data_management.category_management.category_config import *
from building_data_management.interface_management.generic_interfaces import EMS_INTERFACE_WINDOW
from ems_config import *
from ems_main import EnergyManagementSystem
## MPC Parameters
MPC_HORIZON = 4*60*60 # in seconds
COEFF_SOFT_CONSTRAINT = 1000
## Thermal parameters
OPEN_INTERFACE_R = 1
THERMAL_MODEL_VALUES = "THERMAL_MODEL_INTERFACE_POINTS_VALUES"
MPC_THERMAL_MODEL_INIT_TEMP = 25
class EMS_MPC(EnergyManagementSystem):
def __init__(self, in_q, out_q, gui_q):
"""
Initialize ClusteringManagementSystem object.
Read messages from the BMS interface bubble messages through a Queue
"""
super(EMS_MPC, self).__init__(in_q, out_q, gui_q)
self.model_variable = dict()
self.model_variable[THERMAL_MODEL_VALUES] = None
def update(self):
"""
TODO
"""
list_room_ids = self.building_data.room_ids_list
### ---- FORECAST estimation ----
# Time vectors
t_hor = MPC_HORIZON # time horizon, in seconds
t_step = EMS_TIME_STEP # time step, in seconds
t_cur = self.current_time
t_pack = (t_cur, t_hor, t_step) # current time [in s], horizon [in s] and time step [in s]
nb_pts_T = int(t_hor / t_step) + 1
print("Current time: {0}s - MPC nb points: {1}".format(t_cur, nb_pts_T))
## Environmental conditions
temp_pred = self.get_environment_data(EMS_ENV_DATA_TEMP, t_pack)
irr_pred = self.get_environment_data(EMS_ENV_DATA_IRR, t_pack)
lum_pred = self.get_environment_data(EMS_ENV_DATA_LUM, t_pack)
lum_factor_pred = None
price_signal = self.get_elec_price(t_pack)
## (non-controllable) Load power consumption
loads_nc = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_LOAD, EMS_CATEGORY_ENTITY_LOAD_USER_DRIVEN])
loads_def = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_LOAD, EMS_CATEGORY_ENTITY_LOAD_DEFERRABLE])
loads_list = loads_nc + loads_def
loads_power_pred = nb_pts_T * [0]
for _l in loads_list:
(_l_id, _l_obj) = _l
loads_power_pred = list(np.add(loads_power_pred, _l_obj.consumption_forecast(t_pack)))
## Local generation estimation
gen_power_pred = nb_pts_T * [0]
generation_list = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_GENERATION])
for _g in generation_list:
(_g_id, _g_obj) = _g
gen_power_pred = list(np.add(gen_power_pred, _g_obj.generation_forecast(t_pack, temp_pred, irr_pred, lum_pred)))
## Sun heat gain, per room
sun_heat_gain = dict((r_id, []) for r_id in list_room_ids)
for r_id in sun_heat_gain.keys():
if self.building_data.room(r_id).has_interface_with(neighbor=EMS_OUTSIDE_ROOM_NAME, type_interface=EMS_INTERFACE_WINDOW):
sun_heat_gain[r_id] = self.building_data.room(r_id).get_solar_heat_gain(t_pack, irr_pred)
### ---- CONTROL and MODEL VARIABLES definition ----
# --- Thermal loads
list_therm_loads = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_LOAD, EMS_CATEGORY_ENTITY_LOAD_THERM])
p_load_therm = cvxpy.Variable(len(list_therm_loads), nb_pts_T)
# --- Deferrable loads
pass
# --- Local power Storage
list_battery = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_STORAGE])
p_storage = cvxpy.Variable(len(list_battery), nb_pts_T)
energy_storage = cvxpy.Variable(len(list_battery), nb_pts_T + 1)
# --- Rooms temperature
list_room_active_simu = []
list_room_fix_simu = []
for r_id in list_room_ids:
if not(self.building_data.room(r_id).has_forecast(t_pack, EMS_COMFORT_temperature)):
list_room_active_simu.append(r_id) # Active in the simu: temperature to be simulated
else:
list_room_fix_simu.append(r_id) # Fix in the simu: temperature forecast given
# interface nodes temperature
nb_interf_nodes = self.get_nb_interface_nodes(list_room_active_simu)
temp_node = cvxpy.Variable(len(list_room_active_simu) + nb_interf_nodes, nb_pts_T + 1)
R_eps = np.diag((len(list_room_active_simu) + nb_interf_nodes) * [COEFF_SOFT_CONSTRAINT])
eps_temp = cvxpy.Variable(len(list_room_active_simu), nb_pts_T)
# --- Total power asked to the grid
grid_power = cvxpy.Variable(1, nb_pts_T)
### ---- CONSTRAINTS ----
constr = []
# -- 1) TIME INDEPENDENT
# THERMAL LOADS
for i in range(len(list_therm_loads)):
(therm_load_id, therm_load_obj) = list_therm_loads[i]
constr += [p_load_therm[i, :] <= therm_load_obj.max_power]
constr += [p_load_therm[i, :] >= 0]
# BATTERY power
for i in range(len(list_battery)):
(batt_id, batt_obj) = list_battery[i]
constr += [p_storage[i, :] <= batt_obj.max_power]
constr += [p_storage[i, :] >= batt_obj.min_power]
constr += [energy_storage[i, 1:-1] <= batt_obj.capacity]
constr += [energy_storage[i, 1:-1] >= 0]
constr += [energy_storage[i, 0] == (batt_obj.current_soc * batt_obj.capacity)]
# -- 2) TIME DEPENDENT
for t in range(1, nb_pts_T+1):
t_abs = t_cur + t * t_step
# ROOM TEMPERATURE
i = 0
for r_id in list_room_active_simu:
temp_constr = self.building_data.room(r_id).get_comfort_constraint(EMS_COMFORT_temperature) # time dependent
if temp_constr is not None:
constr += [temp_node[i, t] <= temp_constr.get_max(t_abs) + eps_temp[i, t-1]]
constr += [temp_node[i, t] >= temp_constr.get_min(t_abs) - eps_temp[i, t-1]]
constr += [eps_temp[i, t-1] >= 0]
i += 1
# Grid purchase consumption definition
for t in range(nb_pts_T):
constr += [grid_power[0, t] >= 0]
constr += [sum(p_storage[:, t]) == grid_power[0, t] - sum(p_load_therm[:, t]) - gen_power_pred[t]] # - loads_power_pred[t]
### ---- MODEL EQUATIONS ----
nb_active_rooms = len(list_room_active_simu)
nb_fix_rooms = len(list_room_fix_simu)
size_x = nb_active_rooms + nb_interf_nodes # #room + #nodes-interface
temp_0_var = size_x * [MPC_THERMAL_MODEL_INIT_TEMP]
# Initial condition of the thermal model
idx = 0
for r_id in list_room_active_simu: # Room temperature points
temp_0_var[idx] = self.building_data.room(r_id).get_comfort_value(EMS_COMFORT_temperature)
idx += 1
# Interface temperature intermediate points = the simulation from last iteration
if self.model_variable[THERMAL_MODEL_VALUES] is not None:
temp_0_var[nb_active_rooms:-1] = self.model_variable[THERMAL_MODEL_VALUES]
# Forecast temperature points for the horizon N
temp_fix_forecast = list()
for r_id in list_room_fix_simu:
temp_fix_forecast.append(self.building_data.room(r_id).get_forecast(EMS_COMFORT_temperature, t_pack))
# Thermal simulation RC matrix
aa, pp = self.get_thermal_matrix(t_step, list_room_active_simu, list_room_fix_simu, nb_interf_nodes)
print(aa)
print(pp)
# heat gain due to the HVACs
bb = np.zeros((size_x, len(list_therm_loads)))
heat_gain_vect = []
for i in range(size_x):
heat_gain_vect.append(nb_pts_T * [0])
idx_th = 0
for (l_id, l_obj) in list_therm_loads:
r_id = self.get_thermal_load_room(l_id)
if r_id is None or r_id not in list_room_active_simu: # Only the HVACs in simulated room make sense
continue
# Put the right variable in the vector "u"
idx_r = list_room_active_simu.index(r_id)
bb[idx_th][idx_r] = l_obj.thermal_efficiency / self.building_data.room(r_id).thermal_capa # Get the thermal efficiency (elec -> therm heat) from the load object and place it at the right place
idx_th += 1
# heat gain due to the sun
dd = np.eye(size_x)
idx_r = 0
for r_id in list_room_active_simu:
if self.building_data.room(r_id).has_interface_with(neighbor=EMS_OUTSIDE_ROOM_NAME, type_interface=EMS_INTERFACE_WINDOW):
heat_gain_vect[idx_r] = self.building_data.room(r_id).get_solar_heat_gain(t_pack, irr_pred)
dd[idx_r, idx_r] /= self.building_data.room(r_id).thermal_capa
# Equations
constr += [temp_node[:, 0] == temp_0_var] # load the initial temperature values from the measurements
for t in range(0, nb_pts_T):
t_abs = t_cur + t * t_step
# --- Thermal model equations
# Room temp unknown, room temp forecast and interface node temp
d = list(heat_gain_vect[i][t] for i in range(len(heat_gain_vect))) # sun disturbance
p = list(temp_fix_forecast[i][t] for i in range(len(temp_fix_forecast))) # forecast temp. influence
if bb.size > 0:
u_term = bb * p_load_therm[:, t]
else:
u_term = np.zeros((size_x, 1))
constr += [temp_node[:, t + 1] == aa * temp_node[:, t] + u_term + np.matmul(dd, d) + np.matmul(pp, p)]
# --- Batteries energy equations
for i in range(len(list_battery)):
(bat_id, bat_obj) = list_battery[i]
constr += [energy_storage[i, t + 1] == t_step * (1-bat_obj.leak_coeff) * energy_storage[i, t] + bat_obj.efficiency * t_step * p_storage[i, t]]
### ---- Objective
obj_fct = 0.0
for t in range(nb_pts_T):
obj_fct += price_signal[t] * grid_power[0, t] + cvxpy.sum_squares(R_eps * eps_temp[:, t])
# --- MPC step solve --- #
mpc_build_prob = cvxpy.Problem(cvxpy.Minimize(obj_fct), constr)
start = time.time()
mpc_build_prob.solve(verbose=False) # solver=cvxpy.CVXOPT
elapsed_time = time.time() - start
print("MPC calculation time: {0} [sec]".format(elapsed_time))
print("Problem status: {0}".format(mpc_build_prob.status))
list_commands = []
if mpc_build_prob.status == cvxpy.OPTIMAL or mpc_build_prob.status == cvxpy.OPTIMAL_INACCURATE:
# The wall temperatures are simulated: store them for the next iteration
self.model_variable[THERMAL_MODEL_VALUES] = temp_node.value[nb_active_rooms:-1, 1]
print('Temperatures:')
print(temp_node.value[:, :])
print('Soft constraints slack variables:')
print(eps_temp.value[:, :])
### Generate commands:
# Storage systems
s_idx = 0
for (s_id, s_obj) in list_battery:
list_commands.append(self.battery_set_point(s_id, p_storage.value[s_idx, 0]))
print("{0}: {1}W".format(s_id, p_storage.value[s_idx, 0]))
s_idx += 1
# HVAC systems
hvac_idx = 0
for (l_id, l_obj) in list_therm_loads:
new_p = p_load_therm.value[hvac_idx, 0]
hvac_max_p = l_obj.max_power
list_commands.append(self.hvac_set_point(l_id, new_p/hvac_max_p))
print("{0}: {1}W".format(l_id, p_load_therm.value[hvac_idx, 0]))
hvac_idx += 1
return list_commands
def get_thermal_matrix(self, t_step, list_active_room, list_fix_room, nb_interf_node):
nb_active_rooms = len(list_active_room)
size_x = nb_interf_node + nb_active_rooms
mat_state = np.diag(size_x * [1.0]) # A matrix for RC model, initialized with 1 on the diagonal
mat_fix_tmp = np.zeros((size_x, len(list_fix_room)), dtype=np.float) # P matrix for forecast temp
# For matrix creation purpose: link 2 rooms to their interface
room_link_already_visited = dict()
for r_id in list_active_room:
room_link_already_visited[r_id] = list()
k = nb_active_rooms # the index pointing to the right interface temperature node
for i in range(nb_active_rooms):
r_id = list_active_room[i]
c_room = self.building_data.room(r_id).thermal_capa
print(c_room)
# Loop over all the interfaces this room has with other rooms
list_room_interface = self.building_data.room(r_id).interfaces
for n_id in list_room_interface.keys():
# This interface has already been handled ?
if n_id in list_active_room and r_id in room_link_already_visited[n_id]:
continue
else:
room_link_already_visited[r_id].append(n_id)
# The general case is to consider multiple interface for each neighbor
interf_list = list_room_interface[n_id]
if type(interf_list) is not list:
interf_list = [interf_list]
# Go through all the interface objects this room r_id has with the neighbor room n_id
for i_obj in interf_list:
# RC parameters
rc_model = i_obj.thermalProp
r_list = rc_model.resistiveParameters
c_list = rc_model.capacitiveParam
if i_obj.isOpen:
r_list = [OPEN_INTERFACE_R]
c_list = []
# Normal case: multiple intermediate temperature points
if len(c_list) > 0:
# ROOM r_id
coeff_room = t_step/(r_list[0]*c_room)
mat_state[i][i] -= coeff_room
mat_state[i][k] += coeff_room
# ROOM n_id
if n_id in list_active_room:
idx_n = list_active_room.index(n_id)
dk = rc_model.orderModel-1
coeff_room = t_step/(r_list[dk]*self.building_data.room(n_id).thermal_capa)
mat_state[idx_n][idx_n] -= coeff_room
mat_state[idx_n][k+dk] += coeff_room
for j in range(rc_model.orderModel):
idx1 = k-1
idx2 = k+1
# Update the influence of room n_id temperature
if j == 0:
idx1 = i
mat_state[k][k] -= t_step * (1.0 / (r_list[j]*c_list[j]) + 1.0 / (r_list[j+1]*c_list[j]))
mat_state[k][idx1] = t_step/(r_list[j]*c_list[j])
# Update the influence of room n_id temperature
if j == rc_model.orderModel-1:
if n_id in list_active_room:
idx2 = list_active_room.index(n_id)
else:
idx2 = list_fix_room.index(n_id)
if n_id in list_fix_room and j == rc_model.orderModel-1: # Take the fixed temperature in this case
mat_fix_tmp[k][idx2] = t_step/(r_list[j+1]*c_list[j])
else:
mat_state[k][idx2] = t_step/(r_list[j+1]*c_list[j])
k += 1 # INCREMENT THE INDEX OF THE INTERFACE TEMPERATURE POINTS
else: # Particular case: direct link between two rooms via a resistance
r = r_list[0]
coeff1 = t_step/(r*c_room)
mat_state[i][i] -= coeff1 # neighbor effect on r_id
if n_id in list_active_room: # Interaction with another temperature point to simulate
coeff2 = t_step/(r*self.building_data.room(n_id).thermal_capa)
n_index = list_active_room.index(n_id)
mat_state[i][n_index] += coeff1 # neighbor effect on r_id
mat_state[n_index][n_index] -= coeff2 # effect of r_id on the neighbor
mat_state[n_index][i] += coeff2 # effect of r_id on the neighbor
else: # Interaction with a fixed temperature: don't
n_fix_index = list_fix_room.index(n_id)
mat_fix_tmp[i][n_fix_index] += coeff1
return mat_state, mat_fix_tmp
def get_nb_interface_nodes(self, active_rooms):
x = 0
already_visited = list()
for r_id in active_rooms:
interf = self.building_data.room(r_id).interfaces
for n_id in interf.keys():
# Skip this link if n_id -> r_id has already been taken into account
if n_id in already_visited:
continue
i_obj = interf[n_id]
if type(i_obj) is list:
for ii_obj in i_obj:
x += ii_obj.thermalProp.orderModel
else:
x += i_obj.thermalProp.orderModel
already_visited.append(r_id)
return x
def get_thermal_load_room(self, th_l_id):
for r_id in self.building_data.room_ids_list:
print(self.building_data.room(r_id).get_comfort_loads(EMS_COMFORT_temperature))
if th_l_id in self.building_data.room(r_id).get_comfort_loads(EMS_COMFORT_temperature):
return r_id
return None

Event Timeline