Page MenuHomec4science

mpc.py
No OneTemporary

File Metadata

Created
Mon, May 27, 02:21
__author__ = 'Olivier Van Cutsem'
from emscore.ems_main import EnergyManagementSystem
from building_data_management.category_management.category_config import *
from ems_config import *
from building_data_management.interface_management.generic_interfaces import EMS_INTERFACE_WINDOW
import numpy as np
import cvxpy
import time
## MPC Parameters
MPC_HORIZON = 2*60 # in minutes
## Thermal parameters
OPEN_INTERFACE_R = 1
class EMS_MPC(EnergyManagementSystem):
def __init__(self, in_q, out_q, gui_q):
"""
Initialize ClusteringManagementSystem object.
Read messages from the BMS interface bubble messages through a Queue
"""
super(EMS_MPC, self).__init__(in_q, out_q, gui_q)
def update(self):
"""
TODO
"""
list_room_ids = self.building_data.room_ids_list
### ---- FORECAST estimation ----
# Time vectors
t_hor = MPC_HORIZON * 60 # time horizon, in seconds
t_step = EMS_TIME_STEP # time step, in seconds
t_cur = self.current_time
t_pack = (t_cur, t_hor, t_step) # current time [in s], horizon [in s] and time step [in s]
nb_pts_T = int(t_hor / t_step) + 1
## Environmental conditions
temp_pred = self.get_environment_data(EMS_ENV_DATA_TEMP, t_pack)
irr_pred = self.get_environment_data(EMS_ENV_DATA_IRR, t_pack)
lum_pred = self.get_environment_data(EMS_ENV_DATA_LUM, t_pack)
lum_factor_pred = None
price_signal = self.get_elec_price(t_pack)
## (non-controllable) Load power consumption
loads_nc = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_LOAD, EMS_CATEGORY_ENTITY_LOAD_USER_DRIVEN])
loads_def = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_LOAD, EMS_CATEGORY_ENTITY_LOAD_DEFERRABLE])
loads_list = loads_nc + loads_def
loads_power_pred = nb_pts_T * [0]
for _l in loads_list:
loads_power_pred = list(np.add(loads_power_pred, _l.consumption_forecast(t_pack)))
## Local generation estimation
gen_power_pred = nb_pts_T * [0]
generation_list = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_GENERATION])
for _g in generation_list:
gen_power_pred = list(np.add(gen_power_pred, _g.generation_forecast(t_pack, temp_pred, irr_pred, lum_pred)))
## Sun heat gain, per room
sun_heat_gain = dict((r_id, []) for r_id in list_room_ids)
for r_id in sun_heat_gain.keys():
if self.building_data.room(r_id).has_interface_with(neighbor=EMS_OUTSIDE_ROOM_NAME, type_interface=EMS_INTERFACE_WINDOW):
sun_heat_gain[r_id] = self.building_data.room(r_id).get_solar_heat_gain(t_pack, irr_pred)
### ---- CONTROL and MODEL VARIABLES definition ----
# --- Thermal loads
list_therm_loads = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_LOAD, EMS_CATEGORY_ENTITY_LOAD_THERM])
p_load_therm = cvxpy.Variable(len(list_therm_loads), nb_pts_T)
# --- Deferrable loads
pass
# --- Local power Storage
list_battery = self.building_data.get_entity_list([EMS_CATEGORY_ENTITY_STORAGE])
p_storage = cvxpy.Variable(len(list_battery), nb_pts_T)
energy_storage = cvxpy.Variable(len(list_battery), nb_pts_T + 1)
# --- Rooms temperature
list_room_active_simu = []
list_room_fix_simu = []
for r_id in list_room_ids:
if not(self.building_data.room(r_id).has_forecast(t_pack, EMS_COMFORT_temperature)):
list_room_active_simu.append(r_id) # Active in the simu: temperature to be simulated
else:
list_room_fix_simu.append(r_id) # Fix in the simu: temperature forecast given
# interface nodes temperature
nb_interf_nodes = self.get_nb_interface_nodes(list_room_active_simu)
temp_node = cvxpy.Variable(len(list_room_active_simu) + nb_interf_nodes, nb_pts_T + 1)
# --- Total power asked to the grid
grid_power = cvxpy.Variable(1, nb_pts_T)
### ---- CONSTRAINTS ----
constr = []
# -- 1) TIME INDEPENDENT
# THERMAL LOADS
print("THERMAL LOAD: {0} ".format(list_therm_loads))
for i in range(len(list_therm_loads)):
print("MAX POWER: {0}".format(list_therm_loads[i].max_power))
constr += [p_load_therm[i, :] <= list_therm_loads[i].max_power]
constr += [p_load_therm[i, :] >= 0]
# BATTERY power
for i in range(len(list_battery)):
constr += [p_storage[i, :] <= list_battery[i].max_power]
constr += [p_storage[i, :] >= list_battery[i].min_power]
constr += [energy_storage[i, :] <= list_battery[i].capacity]
constr += [energy_storage[i, :] >= 0]
# -- 2) TIME DEPENDENT
for t in range(nb_pts_T+1):
t_abs = t_cur + t * t_step
# ROOM TEMPERATURE
i = 0
for r_id in list_room_active_simu:
temp_constr = self.building_data.room(r_id).get_comfort_constraint(EMS_COMFORT_temperature) # time dependent
if temp_constr is not None:
constr += [temp_node[i, t] <= temp_constr.get_max(t_abs)]
constr += [temp_node[i, t] >= temp_constr.get_min(t_abs)]
i += 1
# Grid purchase consumption definition
for t in range(nb_pts_T):
constr += [grid_power[0, t] == sum(p_load_therm[:, t]) + loads_power_pred[t] + sum(p_storage[:, t]) + gen_power_pred[t]]
### ---- MODEL EQUATIONS ----
nb_active_rooms = len(list_room_active_simu)
nb_fix_rooms = len(list_room_fix_simu)
size_x = nb_active_rooms + nb_interf_nodes # #room + #nodes-interface
temp_0_var = size_x * [0]
# Unknown temperature points for the horizon N
idx = 0
for r_id in list_room_active_simu:
temp_0_var[idx] = self.building_data.room(r_id).get_comfort_value(EMS_COMFORT_temperature)
idx += 1
# Forecast temperature points for the horizon N
temp_fix_forecast = list()
for r_id in list_room_fix_simu:
temp_fix_forecast.append(self.building_data.room(r_id).get_forecast(EMS_COMFORT_temperature, t_pack))
# Thermal simulation RC matrix
aa, pp = self.get_thermal_matrix(t_step, list_room_active_simu, list_room_fix_simu, nb_interf_nodes)
# heat gain due to the HVACs
bb = np.zeros((size_x, len(list_therm_loads)))
heat_gain_vect = []
for i in range(size_x):
heat_gain_vect.append(nb_pts_T * [0])
idx_th = 0
for (l_id, l_obj) in list_therm_loads:
r_id = self.get_thermal_load_room(l_id)
if r_id is None or r_id not in list_room_active_simu: # Only the HVACs in simulated room make sense
continue
# Put the right variable in the vector "u"
idx_r = list_room_active_simu.index(r_id)
bb[idx_th][idx_r] = l_obj.thermal_efficiency # Get the thermal efficiency (elec -> therm heat) from the load object and place it at the right place
idx_th += 1
# heat gain due to the sun
dd = np.eye(size_x)
idx_r = 0
for r_id in list_room_active_simu:
if self.building_data.room(r_id).has_interface_with(neighbor=EMS_OUTSIDE_ROOM_NAME, type_interface=EMS_INTERFACE_WINDOW):
heat_gain_vect[idx_r] = self.building_data.room(r_id).get_solar_heat_gain(t_pack, irr_pred)
# Equations
constr += [temp_node[:, 0] == temp_0_var] # load the initial temperature values from the measurements
for t in range(0, nb_pts_T):
t_abs = t_cur + t * t_step
# --- Thermal model equations
# Room temp unknown, room temp forecast and interface node temp
d = list(heat_gain_vect[i][t] for i in range(len(heat_gain_vect))) # sun disturbance
p = list(temp_fix_forecast[i][t] for i in range(len(temp_fix_forecast))) # forecast temp. influence
if bb.size > 0:
u_term = bb * p_load_therm[:, t]
else:
u_term = np.zeros((size_x, 1))
constr += [temp_node[:, t + 1] == aa * temp_node[:, t] + u_term + np.matmul(dd, d) + np.matmul(pp, p)]
# --- Batteries energy equations
for i in range(len(list_battery)):
bat = list_battery[i]
constr += [energy_storage[i, t + 1] == t_step * bat.leak_coeff * energy_storage[i, t] + bat.efficiency * t_step * p_storage[i, t]]
### ---- Objective
obj_fct = 0.0
for t in range(nb_pts_T):
obj_fct += price_signal[t] * grid_power[0, t]
# --- MPC step solve --- #
mpc_build_prob = cvxpy.Problem(cvxpy.Minimize(obj_fct), constr)
start = time.time()
mpc_build_prob.solve(verbose=False)
elapsed_time = time.time() - start
print("MPC calculation time: {0} [sec]".format(elapsed_time))
if mpc_build_prob.status == cvxpy.OPTIMAL:
print("OK :)")
else:
print("PAS OK :( --- {0}".format(mpc_build_prob.status))
return []
def get_thermal_matrix(self, t_step, list_active_room, list_fix_room, nb_interf_node):
nb_active_rooms = len(list_active_room)
size_x = nb_interf_node + nb_active_rooms
mat_state = np.ones((size_x, size_x), dtype=np.float) # A matrix for RC model, initialized with 1 on the diagonal
mat_fix_tmp = np.zeros((size_x, len(list_fix_room)), dtype=np.float) # P matrix for forecast temp
# For matrix creation purpose: link 2 rooms to their interface
room_link_already_visited = dict()
for r_id in list_active_room:
room_link_already_visited[r_id] = list()
k = nb_active_rooms # the index pointing to the right interface temperature node
for i in range(nb_active_rooms):
r_id = list_active_room[i]
c_room = self.building_data.room(r_id).thermal_capa
# Loop over all the interfaces this room has with other rooms
list_room_interface = self.building_data.room(r_id).interfaces
for n_id in list_room_interface.keys():
# This interface has already been handled ?
if n_id in list_active_room and r_id in room_link_already_visited[n_id]:
continue
else:
room_link_already_visited[r_id].append(n_id)
# The general case is to consider multiple interface for each neighbor
interf_list = list_room_interface[n_id]
if type(interf_list) is not list:
interf_list = [interf_list]
# Go through all the interface objects this room r_id has with the neighbor room n_id
for i_obj in interf_list:
# RC parameters
rc_model = i_obj.thermalProp
r_list = rc_model.resistiveParameters
c_list = rc_model.capacitiveParam
if i_obj.isOpen:
r_list = [OPEN_INTERFACE_R]
c_list = []
# Normal case: multiple intermediate temperature points
if len(c_list) > 0:
# ROOM r_id
coeff_room = t_step/(r_list[0]*c_room)
mat_state[i][i] -= coeff_room
mat_state[i][k] += coeff_room
# ROOM n_id
if n_id in list_active_room:
idx_n = list_active_room.index(n_id)
dk = rc_model.orderModel-1
coeff_room = t_step/(r_list[dk]*self.building_data.room(n_id).thermal_capa)
mat_state[idx_n][idx_n] -= coeff_room
mat_state[idx_n][k+dk] += coeff_room
for j in range(rc_model.orderModel):
idx1 = k-1
idx2 = k+1
# Update the influence of room n_id temperature
if j == 0:
idx1 = i
mat_state[k][k] -= t_step * (1.0 / (r_list[j]*c_list[j]) + 1.0 / (r_list[j+1]*c_list[j]))
mat_state[k][idx1] = t_step/(r_list[j]*c_list[j])
# Update the influence of room n_id temperature
if j == rc_model.orderModel-1:
if n_id in list_active_room:
idx2 = list_active_room.index(n_id)
else:
idx2 = list_fix_room.index(n_id)
if n_id in list_fix_room and j == rc_model.orderModel-1: # Take the fixed temperature in this case
mat_fix_tmp[k][idx2] = t_step/(r_list[j+1]*c_list[j])
else:
mat_state[k][idx2] = t_step/(r_list[j+1]*c_list[j])
k += 1 # INCREMENT THE INDEX OF THE INTERFACE TEMPERATURE POINTS
else: # Particular case: direct link between two rooms via a resistance
r = r_list[0]
coeff1 = t_step/(r*c_room)
mat_state[i][i] -= coeff1 # neighbor effect on r_id
if n_id in list_active_room: # Interaction with another temperature point to simulate
coeff2 = t_step/(r*self.building_data.room(n_id).thermal_capa)
n_index = list_active_room.index(n_id)
mat_state[i][n_index] += coeff1 # neighbor effect on r_id
mat_state[n_index][n_index] -= coeff2 # effect of r_id on the neighbor
mat_state[n_index][i] += coeff2 # effect of r_id on the neighbor
else: # Interaction with a fixed temperature: don't
n_fix_index = list_fix_room.index(n_id)
mat_fix_tmp[i][n_fix_index] += coeff1
return mat_state, mat_fix_tmp
def get_nb_interface_nodes(self, active_rooms):
x = 0
already_visited = list()
for r_id in active_rooms:
interf = self.building_data.room(r_id).interfaces
for n_id in interf.keys():
# Skip this link if n_id -> r_id has already been taken into account
if n_id in already_visited:
continue
i_obj = interf[n_id]
if type(i_obj) is list:
for ii_obj in i_obj:
x += ii_obj.thermalProp.orderModel
else:
x += i_obj.thermalProp.orderModel
already_visited.append(r_id)
return x
def get_thermal_load_room(self, th_l_id):
for r_id in self.building_data.room_ids_list:
if th_l_id in self.building_data.room(r_id).get_comfort_loads(EMS_COMFORT_temperature):
return r_id
return None

Event Timeline